In [1]:
!pip install yfinance --upgrade --quiet

In [27]:
# Step 1: Load actual stock volume from yfinance
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta

# Date to fetch: the local date of the machine running the kernel.
# To backfill a past day, replace with e.g.:
#   today = datetime.strptime("2025-04-30", "%Y-%m-%d").date()
today = datetime.today().date()

# Pull only one day's worth of data (`end` is exclusive in yf.download).
df = yf.download("NVDA", start=str(today), end=str(today + timedelta(days=1)))

# Newer yfinance versions may return (field, ticker) MultiIndex columns even for a
# single ticker; flatten to the field names so later cells can use df["Volume"].
if isinstance(df.columns, pd.MultiIndex):
    df.columns = df.columns.get_level_values(0)

# Move the date index into a regular column for cleaner formatting.
df = df.reset_index()

if df.empty:
    print("No stock data returned from yfinance. Stopping execution.")
    exit_message = "No data found - stopping notebook"
    try:
        # Provided by the Synapse notebook runtime; exits the notebook cleanly there.
        from notebookutils import mssparkutils
    except ImportError:
        # Outside Synapse there is no notebook.exit: raise so later cells never run on empty data.
        raise RuntimeError(exit_message) from None
    mssparkutils.notebook.exit(exit_message)
else:
    print("Stock data successfully retrieved, proceeding with analysis.")

In [28]:
# Step 2: Load the Stream Data from blob
from py4j.java_gateway import java_import
from datetime import datetime
from pyspark.sql.types import *

# 2.1: Only snapshots dated strictly before today are candidates.
today = datetime.today().date()

# 2.2: List the Gold/Stream directory through the Hadoop FileSystem API.
stream_path = "abfss://team12blobcontainer@team12storage.dfs.core.windows.net/Medallion/Gold/Stream/"
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
path = spark._jvm.Path(stream_path)
# Resolve the FileSystem from the path itself: FileSystem.get(conf) returns the
# *default* filesystem, which is not necessarily this abfss account.
fs = path.getFileSystem(spark._jsc.hadoopConfiguration())
files = fs.listStatus(path)

# 2.3: Keep files named YYYY-MM-DD.json whose date is before today.
json_files = [f.getPath().getName() for f in files if f.isFile() and f.getPath().getName().endswith(".json")]

dated_files = []
for fname in json_files:
    try:
        file_date = datetime.strptime(fname[: -len(".json")], "%Y-%m-%d").date()
    except ValueError:
        # Not a dated snapshot file name; skip it.
        continue
    if file_date < today:
        dated_files.append(file_date)

# 2.4: Load the most recent qualifying snapshot.
if dated_files:
    nearest_date = max(dated_files)
    file_path = f"{stream_path}{nearest_date}.json"
    print(f"Loaded file: {file_path}")

    streaming_df = spark.read.option("multiline", "true").json(file_path)
else:
    no_file_message = "No previous date file found."
    print(no_file_message)
    # Every later cell needs streaming_df; stop here instead of failing later with a NameError.
    try:
        from notebookutils import mssparkutils
    except ImportError:
        raise RuntimeError(no_file_message) from None
    mssparkutils.notebook.exit(no_file_message)

In [29]:
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Load previous predictions and input snapshots
container = "team12blobcontainer"
storage_account_name = "team12storage"
abfss_base_path = f"abfss://{container}@{storage_account_name}.dfs.core.windows.net/"

# The prediction folder is keyed by the snapshot's tweet date (taken from its first row).
# NOTE(review): assumes every row of the snapshot shares one tweet_created_at_date — confirm.
first_row = streaming_df.select("tweet_created_at_date").first()
if first_row is None or first_row["tweet_created_at_date"] is None:
    raise ValueError("Streaming snapshot has no tweet_created_at_date; cannot locate its prediction.")
prediction_date = first_row["tweet_created_at_date"]

predicted_df = spark.read.parquet(f"{abfss_base_path}Prediction/Streaming/{prediction_date}/")
display(predicted_df)

In [30]:
from pyspark.sql.functions import col, to_date
# Step 2: Update the batch prediction file
update_batch_prediction_path = f"{abfss_base_path}Prediction/Batch/data"

# Normalise to the batch table's layout: a date-typed tweet date and a double prediction.
aligned_predicted_df = predicted_df.select(
    to_date(col("tweet_created_at_date")).alias("tweet_created_at_date"),
    col("predicted_next_day_volume").cast("double"),
)

# Append mode: each run adds rows (re-running this cell writes them again).
(
    aligned_predicted_df
    .write
    .mode("append")
    .parquet(update_batch_prediction_path)
)

In [33]:
# Load the gold-layer dataset as it is before this run's streaming rows are appended (later cell).
# NOTE(review): origin_gold_df is not referenced by any later cell shown here.
origin_gold_df = spark.read.format("parquet").load(f"{abfss_base_path}Medallion/Gold/dataset_updated/")

In [34]:
from pyspark.sql.functions import col, to_date
# Step 3: Add today's df["Volume"] to streaming_df as a new column, then append it to the gold layer
from pyspark.sql.functions import lit

# 1. Extract today's volume from the yfinance DataFrame (one row expected).
volume_data = df["Volume"]
# With multi-level yfinance columns, df["Volume"] is a one-column DataFrame
# (one column per ticker); reduce it to that single column.
if isinstance(volume_data, pd.DataFrame):
    volume_data = volume_data.iloc[:, 0]
if volume_data.empty or pd.isna(volume_data.iloc[0]):
    raise ValueError("yfinance returned no Volume value; cannot label the streaming rows.")
actual_volume = int(volume_data.iloc[0])

# 2. Attach it as the label column next_available_volume on every streaming row.
streaming_df = streaming_df.withColumn("next_available_volume", lit(actual_volume))

In [35]:
# 3: Project the streaming rows onto the gold-layer schema, in gold column order.
# Each entry is (column name, target type). "date" marks columns stored as strings
# in the stream that are parsed with to_date; every other entry is a plain cast.
# NOTE(review): "int" is Spark's 32-bit integer; confirm volume/count values stay in that range.
GOLD_COLUMN_TYPES = [
    ("user_id", "string"),
    ("tweet_created_at_date", "date"),
    ("is_blue_verified", "boolean"),
    ("account_created_at", "date"),
    ("followers_count", "int"),
    ("friends_count", "int"),
    ("account_favourites_count", "int"),
    ("listed_count", "int"),
    ("media_count", "int"),
    ("account_possibly_sensitive", "boolean"),
    ("rest_id", "string"),
    ("tweet_created_at_time", "string"),
    ("view_count", "string"),
    ("retweet_count", "int"),
    ("reply_count", "int"),
    ("quote_count", "int"),
    ("favorite_count", "int"),
    ("tweet_possibly_sensitive", "boolean"),
    ("full_text", "string"),
    ("sentiment_score", "double"),
    ("interaction_score", "double"),
    ("favorite_ratio", "double"),
    ("reply_ratio", "double"),
    ("account_age_days", "int"),
    ("credibility_score", "double"),
    ("follower_activity_score", "double"),
    ("day_of_week", "string"),
    ("is_viral", "boolean"),
    ("is_new_account", "boolean"),
    ("is_influencer", "boolean"),
    ("current_volume", "int"),
    ("current_open", "double"),
    ("current_close", "double"),
    ("current_high", "double"),
    ("current_low", "double"),
    ("next_available_volume", "int"),
]

aligned_streaming_df = streaming_df.select(*[
    to_date(name).alias(name) if target_type == "date" else col(name).cast(target_type)
    for name, target_type in GOLD_COLUMN_TYPES
])

# 4. Append the labelled rows to the gold dataset (also the training source for retrain()).
gold_output_path = f"{abfss_base_path}Medallion/Gold/dataset_updated"
aligned_streaming_df.write.mode("append").parquet(gold_output_path)

In [36]:
# Step 4: Define the retrain function (called in Step 5 only when the prediction error exceeds the threshold)
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline, PipelineModel

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime
from py4j.java_gateway import java_import

def _prepare_training_data(gold_path, pipeline_path):
    """Load the gold dataset and run it through the saved preprocessing pipeline.

    Args:
        gold_path: Parquet location of the gold-layer dataset.
        pipeline_path: Location of the persisted (already fitted) PipelineModel.

    Returns:
        The transformed Spark DataFrame. The model trained afterwards reads its
        "scaled_features" column, presumably produced by this pipeline.
    """
    gold_df = spark.read.parquet(gold_path)
    preprocessing_pipeline = PipelineModel.load(pipeline_path)

    # Step 1.1: 0/1 integer copies of the boolean flags, named <flag>_int.
    for flag in ("is_viral", "is_new_account", "is_blue_verified", "is_influencer"):
        gold_df = gold_df.withColumn(f"{flag}_int", col(flag).cast("int"))

    # Step 1.2: columns that must be non-null for a row to be usable (features + label).
    required_cols = [
        "sentiment_score", "interaction_score", "favorite_ratio", "reply_ratio",
        "followers_count", "friends_count", "account_favourites_count",
        "listed_count", "media_count", "account_age_days", "credibility_score",
        "follower_activity_score", "current_open", "current_close",
        "current_high", "current_low", "current_volume", "next_available_volume",
        "day_of_week", "is_viral_int", "is_new_account_int", "is_blue_verified_int", "is_influencer_int"
    ]

    # Step 1.3: drop incomplete rows.
    gold_df = gold_df.dropna(subset=required_cols)

    # Step 1.4: transform only (no refit) so features match the original preprocessing.
    processed_df = preprocessing_pipeline.transform(gold_df)
    print("Preprocessing Complete.")
    return processed_df


def _fit_best_model(processed_df):
    """Grid-search a RandomForestRegressor with 3-fold CV and report held-out metrics.

    An 80/20 random split (seed 42) holds out a test set; cross-validation,
    scored by RMSE, runs on the training part only.

    Returns:
        The best model selected by cross-validation.
    """
    # Step 2.1: Split the data
    train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=42)

    # Step 2.2: Define the model
    rf = RandomForestRegressor(
        featuresCol="scaled_features",
        labelCol="next_available_volume",
        predictionCol="prediction",
        seed=42
    )

    # Step 2.3: 3 x 3 x 2 = 18 parameter combinations
    param_grid = (
        ParamGridBuilder()
        .addGrid(rf.numTrees, [20, 50, 100])
        .addGrid(rf.maxDepth, [5, 10, 15])
        .addGrid(rf.minInstancesPerNode, [1, 5])
        .build()
    )

    # Step 2.4: RMSE drives model selection
    evaluator = RegressionEvaluator(
        labelCol="next_available_volume",
        predictionCol="prediction",
        metricName="rmse"
    )

    # Step 2.5 / 2.6: cross-validate on the training split
    cv = CrossValidator(
        estimator=rf,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3,
        parallelism=2
    )
    cv_model = cv.fit(train_df)

    # Step 2.7: Evaluate the winner on the held-out test split
    best_model = cv_model.bestModel
    predictions = best_model.transform(test_df)

    rmse = evaluator.evaluate(predictions)
    r2 = RegressionEvaluator(
        labelCol="next_available_volume",
        predictionCol="prediction",
        metricName="r2"
    ).evaluate(predictions)

    print("Best model params:")
    print(f" - numTrees: {best_model.getNumTrees}")
    print(f" - maxDepth: {best_model.getOrDefault('maxDepth')}")
    print(f" - minInstancesPerNode: {best_model.getOrDefault('minInstancesPerNode')}")
    print(f"New RMSE: {rmse}")
    print(f"New R²: {r2}")
    return best_model


def _archive_and_save_model(model, model_base_path):
    """Move the currently saved model into <model_base_path>/archive/, then save `model` in its place.

    Archiving is best-effort: failures are reported and the new model is still saved
    (overwriting the old one if it could not be moved).
    """
    # Local import: the notebook rebinds the global name `datetime` between the
    # module and the class in different cells, so don't depend on which one it is.
    from datetime import datetime as _datetime

    timestamp = _datetime.now().strftime("%Y%m%d_%H%M%S")
    current_model_path = f"{model_base_path}/stock_volume_rf_model"
    archived_model_path = f"{model_base_path}/archive/stock_volume_rf_model_{timestamp}"

    java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
    src_path = spark._jvm.Path(current_model_path)
    dst_path = spark._jvm.Path(archived_model_path)
    # Resolve the FileSystem from the model path: FileSystem.get(conf) returns the
    # default filesystem, which is not necessarily this abfss account.
    fs = src_path.getFileSystem(spark._jsc.hadoopConfiguration())

    try:
        if fs.exists(src_path):
            # Hadoop rename() reports many failures by returning False instead of
            # raising, e.g. when the destination's parent directory is missing.
            fs.mkdirs(dst_path.getParent())
            if fs.rename(src_path, dst_path):
                print(f"Archived old model to: {archived_model_path}")
            else:
                print(f"Failed to archive old model: rename to {archived_model_path} returned False; "
                      "the old model will be overwritten.")
        else:
            print("No existing model to archive.")
    except Exception as e:
        print(f"Failed to archive old model: {e}")

    model.write().overwrite().save(current_model_path)
    print(f"New model saved to: {current_model_path}")


def retrain():
    """Retrain the stock-volume random-forest model on the updated gold dataset.

    Reads the gold data at the global `gold_output_path`, applies the saved
    preprocessing pipeline from Model/preprocess_pipeline, grid-searches a
    RandomForestRegressor with cross-validation, archives the currently saved
    model (if any) under Model/archive/, and saves the new best model to
    Model/stock_volume_rf_model.

    Returns:
        The newly trained best model.
    """
    model_base_path = f"abfss://{container}@{storage_account_name}.dfs.core.windows.net/Model"

    processed_df = _prepare_training_data(gold_output_path, f"{model_base_path}/preprocess_pipeline")
    new_best_model = _fit_best_model(processed_df)
    _archive_and_save_model(new_best_model, model_base_path)

    return new_best_model

In [37]:
# Step 5: Compare the actual stock volume with the prediction
first_prediction = predicted_df.select("predicted_next_day_volume").first()
predicted_volume = None if first_prediction is None else first_prediction["predicted_next_day_volume"]
if predicted_volume is None:
    raise ValueError("No predicted_next_day_volume found in predicted_df; cannot evaluate the model.")
if actual_volume <= 0:
    # Relative error divides by the actual volume, so it is undefined here.
    raise ValueError(f"Actual volume is {actual_volume}; relative prediction error is undefined.")

# Relative error, so the threshold does not depend on the stock's typical volume level.
error = abs(actual_volume - predicted_volume) / actual_volume

# Retrain when the prediction misses by more than 10% of the actual volume.
RETRAIN_THRESHOLD = 0.10

should_retrain = error > RETRAIN_THRESHOLD
print(f"Relative prediction error: {error:.2%} (threshold {RETRAIN_THRESHOLD:.0%})")

if should_retrain:
    print("Significant prediction error — retraining required.")
    trained_model = retrain()
else:
    print("Prediction within acceptable range — no retraining.")