In [1]:
#pip install pandas yfinance plotly newsapi-python sqlalchemy snowflake-connector-python prefect requests textblob

Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl.metadata (1.2 kB)
Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.12.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.9/65.9 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting prefect
  Downloading prefect-3.1.6-py3-none-any.whl.metadata (11 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting sortedcontainers>=2.4.0 (from snowflake-connector-python)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Collecting aiosqlite<1.0.0,>=0.17.0 (from prefect)
  Downloading aiosqlite-0.20.0-py3-none-any.whl.metadata (4.3 kB)
Collecting ale

In [26]:
# Importing Libraries
from IPython.display import display
import requests
import pandas as pd
import yfinance as yf
import sqlite3
import plotly.express as px
from textblob import TextBlob
from sqlalchemy import create_engine
from datetime import timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
#from prefect.task_runners import SequentialTaskRunner

In [27]:
# Constants
DATABASE_FILE = "financial_data.sqlite"
NEWS_API_KEY = "371ea8fd8b87444a96fbc0389758ee43"
STOCK_TICKERS = ["AAPL", "GOOGL", "AMZN"]
NEWS_QUERY = "stock market"

SNOWFLAKE_USER = "sangampatil"
SNOWFLAKE_PASSWORD = "Sangam%40123"
SNOWFLAKE_ACCOUNT = "HB38213"
SNOWFLAKE_DATABASE = "financial_news_pipeline"
SNOWFLAKE_SCHEMA = "public"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"

SNOWFLAKE_URL = f"snowflake://{SNOWFLAKE_USER}:{SNOWFLAKE_PASSWORD}@{SNOWFLAKE_ACCOUNT}/{SNOWFLAKE_DATABASE}/{SNOWFLAKE_SCHEMA}?warehouse={SNOWFLAKE_WAREHOUSE}"

In [28]:
# Step 1: Extract Data
@task(retries=3, retry_delay_seconds=10)
def extract_news():
    logger = get_run_logger()
    logger.info("Extracting financial news data...")
    url = f"https://newsapi.org/v2/everything?q={NEWS_QUERY}&apiKey={NEWS_API_KEY}"
    response = requests.get(url)
    response.raise_for_status()
    articles = response.json()["articles"]
    df = pd.DataFrame(articles)[["title", "description", "publishedAt"]]
    logger.info(f"Extracted {len(df)} news articles.")
    #display(df.head())  # Display the extracted news for Colab
    return df

@task(retries=3, retry_delay_seconds=10)
def extract_stock_data(ticker):
    logger = get_run_logger()
    logger.info(f"Extracting stock data for {ticker}...")
    stock_data = yf.download(ticker, period="5d", interval="1h")
    stock_data.reset_index(inplace=True)  # Ensure 'Datetime' is part of the DataFrame
    stock_data["ticker"] = ticker  # Add a column for the ticker
    logger.info(f"Extracted {len(stock_data)} rows of stock data for {ticker}.")
    return stock_data

In [29]:
# Step 2: Transform Data
@task
def analyze_sentiment(df):
    logger = get_run_logger()
    logger.info("Analyzing news sentiment...")
    df["sentiment"] = df["description"].apply(lambda x: TextBlob(str(x)).sentiment.polarity)
    logger.info("Sentiment analysis completed.")
    return df

@task
def calculate_stock_metrics(df):
    logger = get_run_logger()
    logger.info("Calculating stock metrics...")
    tickers = ['AAPL', 'GOOGL', 'AMZN']
    for ticker in tickers:
        close_col = f"Close_{ticker}"
        if close_col in df.columns:
            df[f"price_change_{ticker}"] = df[close_col].pct_change()
            df[f"volatility_{ticker}"] = df[close_col].rolling(window=3).std()
        else:
            raise ValueError(f"The column {close_col} is missing in the DataFrame.")
    logger.info("Stock metrics calculated.")
    return df

In [30]:
# Step 3: Load Data
@task
def load_to_sqlite(df, table_name):
    logger = get_run_logger()
    logger.info(f"Loading data to SQLite table: {table_name}...")
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute(f"DROP TABLE IF EXISTS {table_name}")
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        logger.info(f"Loaded {len(df)} rows into {table_name}.")
    finally:
        conn.close()

@task
def load_to_snowflake(df, table_name):
    engine = create_engine(SNOWFLAKE_URL)
    with engine.connect() as conn:
        df.to_sql(table_name, conn, if_exists="replace", index=False)

In [31]:
# Step 4: Visualize Data
def extract_news_data_from_db():
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        return pd.read_sql("SELECT * FROM news_data", conn)
    finally:
        conn.close()

def extract_stock_data_from_db():
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        return pd.read_sql("SELECT * FROM stock_data", conn)
    finally:
        conn.close()

def visualize_news_sentiment(news_df):
    fig = px.histogram(news_df, x="sentiment", title="News Sentiment Distribution")
    fig.show()

def visualize_stock_prices(stock_df, ticker):
    close_col = f"Close_{ticker}"
    if close_col not in stock_df.columns:
        raise ValueError(f"Column {close_col} is missing.")
    fig = px.line(stock_df, x="Datetime", y=close_col, title=f"{ticker} Stock Prices Over Time")
    fig.show()

def visualize_stock_metrics(stock_df, ticker):
    price_change_col = f"price_change_{ticker}"
    volatility_col = f"volatility_{ticker}"
    if price_change_col not in stock_df.columns or volatility_col not in stock_df.columns:
        raise ValueError(f"Metrics for {ticker} are missing.")
    fig1 = px.line(stock_df, x="Datetime", y=price_change_col, title=f"{ticker} Price Change Over Time")
    fig1.show()
    fig2 = px.line(stock_df, x="Datetime", y=volatility_col, title=f"{ticker} Volatility Over Time")
    fig2.show()

In [32]:
# Step 5: Orchestrate the Pipeline

# Extract news and stock data
news_data = extract_news()
stock_data = pd.concat([extract_stock_data(ticker) for ticker in STOCK_TICKERS], axis=1)

# Flatten MultiIndex columns
stock_data.columns = ['_'.join(col).strip() if col[1] else col[0] for col in stock_data.columns]

# Remove duplicate columns
stock_data = stock_data.loc[:, ~stock_data.columns.duplicated()]

[*********************100%***********************]  1 of 1 completed


[*********************100%***********************]  1 of 1 completed


[*********************100%***********************]  1 of 1 completed


In [33]:
# Transform data
analyzed_news = analyze_sentiment(news_data)
transformed_stock = calculate_stock_metrics(stock_data)

# Load data to SQLite
load_to_sqlite(analyzed_news, "news_data")
load_to_sqlite(transformed_stock, "stock_data")

In [34]:
# Extract data from SQLite
news_data = extract_news_data_from_db()
display(news_data.head())

Unnamed: 0,title,description,publishedAt,sentiment
0,"No Mercy for Samsung, Amazon Slashes the Price...","Rated 4.8/5 on Amazon, it is one of the best i...",2024-12-01T22:05:09Z,0.466667
1,"To Make You Forget About MacBooks, Microsoft S...","Launched in June, Microsoft's Surface Laptops ...",2024-11-27T13:10:14Z,0.4
2,[Removed],[Removed],2024-12-12T14:50:54Z,0.0
3,"Don’t Buy an Old MacBook For Less, The New Mac...","Launched in early November, these MacBook Pro ...",2024-11-27T19:06:51Z,0.295238
4,There are 2 looming risks that could spark a s...,High stock prices mean the risk of a market co...,2024-12-04T17:51:07Z,-0.07625


In [35]:
stock_data = extract_stock_data_from_db()
display(stock_data.head())

Unnamed: 0,Datetime,Adj Close_AAPL,Close_AAPL,High_AAPL,Low_AAPL,Open_AAPL,Volume_AAPL,ticker,Adj Close_GOOGL,Close_GOOGL,...,High_AMZN,Low_AMZN,Open_AMZN,Volume_AMZN,price_change_AAPL,volatility_AAPL,price_change_GOOGL,volatility_GOOGL,price_change_AMZN,volatility_AMZN
0,2024-12-09 14:30:00+00:00,244.509995,244.509995,244.720001,241.759995,241.830002,6763286,AAPL,174.404007,174.404007,...,230.0,227.132004,227.210007,11506240,,,,,,
1,2024-12-09 15:30:00+00:00,246.470001,246.470001,247.2388,244.419998,244.479996,7885176,AAPL,175.499893,175.499893,...,228.990005,226.080002,227.830002,6467006,0.008016,,0.006284,,0.002809,
2,2024-12-09 16:30:00+00:00,246.790298,246.790298,247.240005,246.309998,246.485001,3738978,AAPL,175.979996,175.979996,...,229.720001,228.399994,228.445007,4989866,0.0013,1.234504,0.002736,0.807796,0.005224,0.930518
3,2024-12-09 17:30:00+00:00,246.905396,246.905396,247.015106,245.710007,246.824997,3215218,AAPL,175.945007,175.945007,...,230.080002,228.679993,229.630005,3085767,0.000466,0.225612,-0.000199,0.267659,-0.002279,0.598201
4,2024-12-09 18:30:00+00:00,246.095001,246.095001,246.889999,246.059998,246.889999,2110393,AAPL,175.320007,175.320007,...,229.639297,228.270004,229.080002,2171146,-0.003282,0.438449,-0.003552,0.371356,-0.003535,0.671816


In [36]:
# Visualize the sentiment distribution of financial news
visualize_news_sentiment(news_data)

In [37]:
# Visualize stock prices for AAPL
visualize_stock_prices(stock_data, "AAPL")

In [38]:
# Visualize stock metrics for AAPL
visualize_stock_metrics(stock_data, "AAPL")

In [39]:
# Visualize stock prices for AAPL
visualize_stock_prices(stock_data, "GOOGL")

In [40]:
# Visualize stock metrics for AAPL
visualize_stock_metrics(stock_data, "GOOGL")

In [41]:
# Visualize stock prices for AAPL
visualize_stock_prices(stock_data, "AMZN")

In [42]:
# Visualize stock metrics for AAPL
visualize_stock_metrics(stock_data, "AMZN")