<a href="https://colab.research.google.com/github/IfrahHasan/Real-Time-Stock-Alert-System-using-PySpark/blob/main/Real_time_Financial_Data_Analysis_and_Forecasting_using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required packages
!pip install pyspark
!pip install yfinance
!pip install requests
!pip install plotly

# Import required libraries
import os
import time
from datetime import datetime
import requests
import yfinance as yf

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg, stddev, max as max_, min as min_
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define constants
DATA_DIRECTORY = "/content/data"
LOG_DIRECTORY = "/content/logs"
TELEGRAM_BOT_TOKEN = "7757075948:AAHasZ_QFClfsjTTASOWvzq5VTkal8A-o00"
TELEGRAM_CHAT_ID = "7220520286"

# Ensure directories exist
os.makedirs(DATA_DIRECTORY, exist_ok=True)
os.makedirs(LOG_DIRECTORY, exist_ok=True)


Data for AAPL saved at /content/data/AAPL_202501172325.parquet
Data for MSFT saved at /content/data/MSFT_202501172325.parquet
Data for GOOG saved at /content/data/GOOG_202501172325.parquet


In [None]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Stock Analytics Pipeline") \
    .getOrCreate()


In [None]:
STOCK_TICKERS = ["AAPL", "MSFT", "GOOG"]

last_retrieved = {ticker: None for ticker in STOCK_TICKERS}

def fetch_and_store_stock_data(ticker, output_directory):
    """
    Fetches stock data from Yahoo Finance starting from the last retrieved timestamp.
    """
    try:
        # Determine the start time
        last_timestamp = last_retrieved.get(ticker)
        stock = yf.Ticker(ticker)

        if last_timestamp:
            data = stock.history(start=last_timestamp, interval="1m")
        else:
            data = stock.history(period="1d", interval="1m")

        if data.empty:
            print(f"No new data available for {ticker} since {last_timestamp}.")
            return

        data.reset_index(inplace=True)
        data.rename(columns={"Datetime": "timestamp"}, inplace=True)
        data["ticker"] = ticker  # Add a column with the ticker value
        spark_df = spark.createDataFrame(data)

        # Update last retrieved timestamp
        last_retrieved[ticker] = data["timestamp"].iloc[-1]

        file_path = os.path.join(output_directory, f"{ticker}_{datetime.now().strftime('%Y%m%d%H%M')}.parquet")
        spark_df.write.mode("append").parquet(file_path)
        print(f"Data for {ticker} saved at {file_path}")
    except Exception as error:
        print(f"Error fetching data for {ticker}: {error}")


In [None]:
import threading

def stream_stock_data(tickers, output_directory, interval_seconds=60):
    """
    Continuously fetch and store stock data for multiple tickers at a regular interval,
    while allowing other code blocks to run.
    """
    def run_streaming():
        print("Starting data streaming...")
        while True:
            for ticker in tickers:
                fetch_and_store_stock_data(ticker, output_directory)
            time.sleep(interval_seconds)

    # Start the streaming in a separate thread
    thread = threading.Thread(target=run_streaming, daemon=True)
    thread.start()


In [None]:
# Example call
stream_stock_data(STOCK_TICKERS, DATA_DIRECTORY)


Starting data streaming...


In [None]:
spark.read.parquet("/content/data/AAPL_202501172144.parquet").printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)



In [None]:
import os
from pyspark.sql import DataFrame

def debug_parquet_files(parquet_directory: str) -> None:
    """
    List the files in the Parquet directory and show the first few rows of data.
    """
    # List all files in the Parquet directory
    parquet_files = [f for f in os.listdir(parquet_directory) if f.endswith(".parquet")]
    print(f"Found {len(parquet_files)} Parquet files in {parquet_directory}:")
    for file in parquet_files:
        print(f"  - {file}")

    # If there are files, load one and print a sample
    if parquet_files:
        file_to_read = os.path.join(parquet_directory, parquet_files[0])
        try:
            df = spark.read.parquet(file_to_read)
            print("Sample data from the first Parquet file:")
            df.show(5)
        except Exception as e:
            print(f"Error reading Parquet file {file_to_read}: {e}")
    else:
        print("No Parquet files found. Ensure that the streaming function is writing data.")

# Run the debug function
debug_parquet_files(DATA_DIRECTORY)


Found 3 Parquet files in /content/data:
  - MSFT_202501172322.parquet
  - AAPL_202501172322.parquet
  - GOOG_202501172322.parquet
Sample data from the first Parquet file:
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+
|          timestamp|              Open|              High|               Low|             Close|Volume|Dividends|Stock Splits|ticker|
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+
|2025-01-17 17:46:00|430.55999755859375|   430.81201171875|  430.510498046875|   430.81201171875| 11992|      0.0|         0.0|  MSFT|
|2025-01-17 17:47:00|  430.760009765625|430.81561279296875|            430.75|430.80999755859375|  9578|      0.0|         0.0|  MSFT|
|2025-01-17 17:48:00|430.78741455078125| 430.8799133300781| 430.7200012207031| 430.8599853515625| 11102|      0.0|         0.0|  MSFT|
|2025-01-17 17:49:0

In [None]:
def calculate_all_metrics(data_frame):
    """
    Adds analytics metrics to the DataFrame, with refined conditions for alerts.
    Includes a grace period and adjusted thresholds to reduce noise.
    """
    short_term_window = Window.orderBy("timestamp")
    long_term_window = Window.orderBy("timestamp")
    correlation_window = Window.orderBy("timestamp").rowsBetween(-10, 0)

    # 1. Price Spikes and Dips (use dynamic thresholds)
    data_frame = data_frame.withColumn("prev_close", lag("Close", 1).over(short_term_window))
    data_frame = data_frame.withColumn("pct_change", (col("Close") - col("prev_close")) / col("prev_close"))
    # Use 3x rolling stddev for spike detection
    data_frame = data_frame.withColumn("rolling_close_stddev", stddev("Close").over(long_term_window))
    data_frame = data_frame.withColumn("alert_price_spike", (col("pct_change") > 3 * col("rolling_close_stddev")))
    data_frame = data_frame.withColumn("alert_price_dip", (col("pct_change") < -3 * col("rolling_close_stddev")))

    # 2. Volume Surges (increase multiplier to reduce noise)
    data_frame = data_frame.withColumn("rolling_volume_avg", avg("Volume").over(long_term_window))
    data_frame = data_frame.withColumn("alert_volume_surge", (col("Volume") > 2 * col("rolling_volume_avg")))

    # 3. Volatility Changes (increase threshold)
    data_frame = data_frame.withColumn("volatility", stddev("pct_change").over(long_term_window))
    data_frame = data_frame.withColumn("recent_volatility_avg", avg("volatility").over(long_term_window))
    data_frame = data_frame.withColumn("alert_volatility_change", (col("volatility") > 3 * col("recent_volatility_avg")))

    # 4. Correlated Movements Between Stocks (less frequent checks)
    data_frame = data_frame.withColumn("group_pct_change", avg("pct_change").over(correlation_window))
    data_frame = data_frame.withColumn("alert_correlation", (col("group_pct_change") > 0.9))

    # 5. Moving Averages (require crossovers to persist)
    data_frame = data_frame.withColumn("short_term_avg", avg("Close").over(short_term_window))
    data_frame = data_frame.withColumn("long_term_avg", avg("Close").over(long_term_window))
    data_frame = data_frame.withColumn("alert_bullish_crossover", (col("short_term_avg") > col("long_term_avg")))
    data_frame = data_frame.withColumn("alert_bearish_crossover", (col("short_term_avg") < col("long_term_avg")))

    # 6. RSI (require multiple intervals in overbought/oversold territory)
    data_frame = data_frame.withColumn("gain", F.when(col("pct_change") > 0, col("pct_change")).otherwise(0))
    data_frame = data_frame.withColumn("loss", F.when(col("pct_change") < 0, -col("pct_change")).otherwise(0))
    data_frame = data_frame.withColumn("avg_gain", avg("gain").over(long_term_window))
    data_frame = data_frame.withColumn("avg_loss", avg("loss").over(long_term_window))
    data_frame = data_frame.withColumn("RS", col("avg_gain") / col("avg_loss"))
    data_frame = data_frame.withColumn("RSI", 100 - (100 / (1 + col("RS"))))
    # Require RSI to stay in overbought/oversold for 3 intervals before triggering
    rsi_window = Window.orderBy("timestamp").rowsBetween(-3, 0)
    data_frame = data_frame.withColumn("oversold_count", F.sum(F.when(col("RSI") < 30, 1).otherwise(0)).over(rsi_window))
    data_frame = data_frame.withColumn("overbought_count", F.sum(F.when(col("RSI") > 70, 1).otherwise(0)).over(rsi_window))
    data_frame = data_frame.withColumn("alert_oversold", (col("oversold_count") == 3))
    data_frame = data_frame.withColumn("alert_overbought", (col("overbought_count") == 3))

    # 7. Intraday High/Low Breakouts (keep existing logic)
    data_frame = data_frame.withColumn("intraday_high", max_("Close").over(long_term_window))
    data_frame = data_frame.withColumn("intraday_low", min_("Close").over(long_term_window))
    data_frame = data_frame.withColumn("alert_high_breakout", (col("Close") > col("intraday_high")))
    data_frame = data_frame.withColumn("alert_low_breakout", (col("Close") < col("intraday_low")))

    return data_frame


In [None]:
def send_telegram_alert(message):
    """
    Sends a message to a Telegram chat using the bot's API.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
    response = requests.post(url, data=payload)
    if response.status_code == 200:
        print("Telegram alert sent.")
    else:
        print("Failed to send Telegram alert:", response.text)

def send_alerts_for_anomalies(anomalies_df):
    """
    Sends alerts for each anomaly in the DataFrame.
    """
    for row in anomalies_df.collect():
        message = f"ALERT: {row['ticker']} at {row['timestamp']}\n"
        message += f"Price: {row['Close']}, Volume: {row['Volume']}\n"
        if row['alert_price_spike']: message += "Price Spike, "
        if row['alert_price_dip']: message += "Price Dip, "
        if row['alert_volume_surge']: message += "Volume Surge, "
        if row['alert_volatility_change']: message += "Volatility Change, "
        if row['alert_bullish_crossover']: message += "Bullish Crossover, "
        if row['alert_bearish_crossover']: message += "Bearish Crossover, "
        if row['alert_oversold']: message += "Oversold, "
        if row['alert_overbought']: message += "Overbought, "
        if row['alert_high_breakout']: message += "High Breakout, "
        if row['alert_low_breakout']: message += "Low Breakout, "
        send_telegram_alert(message.rstrip(", "))


In [None]:
def log_anomalies(data_frame, log_directory):
    """
    Filters for rows that trigger alert conditions and saves them to a log file.
    """
    alert_conditions = (
        col("alert_price_spike") |
        col("alert_price_dip") |
        col("alert_volume_surge") |
        col("alert_volatility_change") |
        col("alert_correlation") |
        col("alert_bullish_crossover") |
        col("alert_bearish_crossover") |
        col("alert_oversold") |
        col("alert_overbought") |
        col("alert_high_breakout") |
        col("alert_low_breakout")
    )
    anomalies_df = data_frame.filter(alert_conditions)
    anomalies_df.write.mode("append").parquet(os.path.join(log_directory, "anomalies.parquet"))
    print("Logged anomalies:")
    anomalies_df.show()
    return anomalies_df


In [None]:
# Load latest data
parquet_files = [os.path.join(DATA_DIRECTORY, f) for f in os.listdir(DATA_DIRECTORY) if f.endswith(".parquet")]
if parquet_files:
    df = spark.read.parquet(*parquet_files)
    df_with_metrics = calculate_all_metrics(df)
    anomalies = log_anomalies(df_with_metrics, LOG_DIRECTORY)
    send_alerts_for_anomalies(anomalies)
else:
    print("No data available to process.")


Logged anomalies:
+-------------------+------------------+------------------+------------------+------------------+------+---------+------------+------+------------------+--------------------+--------------------+-----------------+---------------+------------------+------------------+-------------------+---------------------+-----------------------+--------------------+-----------------+------------------+------------------+-----------------------+-----------------------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+--------------+----------------+--------------+----------------+-----------------+-----------------+-------------------+------------------+
|          timestamp|              Open|              High|               Low|             Close|Volume|Dividends|Stock Splits|ticker|        prev_close|          pct_change|rolling_close_stddev|alert_price_spike|alert_price_dip|rolling_volume_avg|alert_volume_surge|

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def visualize_dashboard(data_frame):
    """
    Generates a real-time dashboard for stock metrics.

    Args:
        data_frame (DataFrame): Input Spark DataFrame containing stock metrics and alerts.
    """
    # Convert Spark DataFrame to Pandas for plotting
    pd_df = data_frame.toPandas()

    # Create subplots for multiple metrics
    fig = make_subplots(rows=2, cols=2,
                        subplot_titles=("Price with Anomalies", "Volume with Anomalies",
                                        "Volatility", "RSI"))

    # 1. Price plot with anomalies
    fig.add_trace(go.Scatter(
        x=pd_df["timestamp"],
        y=pd_df["Close"],
        mode="lines",
        name="Close Price"
    ), row=1, col=1)

    # Highlight price spikes
    spikes = pd_df[pd_df["alert_price_spike"] == True]
    fig.add_trace(go.Scatter(
        x=spikes["timestamp"],
        y=spikes["Close"],
        mode="markers",
        marker=dict(color="red", size=8),
        name="Price Spike"
    ), row=1, col=1)

    # 2. Volume plot with anomalies
    fig.add_trace(go.Bar(
        x=pd_df["timestamp"],
        y=pd_df["Volume"],
        name="Volume"
    ), row=1, col=2)

    # Highlight volume surges
    volume_surges = pd_df[pd_df["alert_volume_surge"] == True]
    fig.add_trace(go.Scatter(
        x=volume_surges["timestamp"],
        y=volume_surges["Volume"],
        mode="markers",
        marker=dict(color="blue", size=8),
        name="Volume Surge"
    ), row=1, col=2)

    # 3. Volatility plot
    fig.add_trace(go.Scatter(
        x=pd_df["timestamp"],
        y=pd_df["volatility"],
        mode="lines",
        name="Volatility"
    ), row=2, col=1)

    # 4. RSI plot
    fig.add_trace(go.Scatter(
        x=pd_df["timestamp"],
        y=pd_df["RSI"],
        mode="lines",
        name="RSI"
    ), row=2, col=2)

    # Add layout details
    fig.update_layout(height=600, width=800, title_text="Real-Time Stock Dashboard")
    fig.show()

# Example usage:
visualize_dashboard(df_with_metrics)
