## Read in Data

This step can probably be removed once the data is pulled directly.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, first, last, max as Fmax, min as Fmin,
    sum as Fsum, avg as Favg, stddev, lag, col,
    split, explode, count as Fcount
)
from pyspark.sql.window import Window as W
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.stat import Summarizer

# 1) Spark session -- getOrCreate() returns the session that is already running
#    (e.g. in a notebook) or starts a new one named "WindowedAggregation".
spark = (
    SparkSession.builder
    .appName("WindowedAggregation")
    .getOrCreate()
)

# 2) Raw SPY bars from CSV (already batched pull). Column types are inferred,
#    then `timestamp` is cast explicitly so it can drive time-window grouping.
stockDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/mnt/project/spy_snapshot.csv")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# 3) Raw GDELT news from CSV (already batched pull). A `timestamp` column is
#    derived from V2_DATE and only the columns used downstream are kept.
# NOTE(review): if V2_DATE is stored as a yyyyMMddHHmmss number, a plain cast
# to timestamp will not parse it as a date -- confirm the CSV's format.
newsDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/mnt/project/gdelt_news.csv")
    .withColumn("timestamp", col("V2_DATE").cast("timestamp"))
    .select(
        "timestamp",
        col("V1_5_TONE").alias("Tone"),
        "Positive",
        "Negative",
        "Polarity",
        "ActivityRefDensity",
        "SelfGroupDensity",
        "WordCount",
        "GKGRECORDID",
        "V2_ENHANCED_THEMES",
    )
)

## Create Features

In [None]:

# 4) Aggregate SPY into 15-minute windows
from pyspark.sql.functions import struct

# One output row per 15-minute tumbling window `w` (a struct with start/end).
#
# open / close must be the values of the earliest / latest bar in the window.
# first()/last() inside a groupBy depend on whatever row order the shuffle
# happens to produce, so they are not deterministic. Taking min/max over a
# (timestamp, value) struct compares by timestamp first and therefore picks
# the bar by time regardless of physical row order.
stockAgg = (
    stockDF
    .groupBy(window("timestamp", "15 minutes").alias("w"))
    .agg(
        Fmin(struct("timestamp", "open")).getField("open").alias("open"),
        Fmax("high").alias("high"),
        Fmin("low").alias("low"),
        Fmax(struct("timestamp", "close")).getField("close").alias("close"),
        Fsum("volume").alias("volume"),
        Fsum("trade_count").alias("trade_count"),
        # Unweighted mean of the per-bar vwap values in the window.
        Favg("vwap").alias("vwap"),
        # Sample std-dev of the per-bar mid price (high+low)/2.
        stddev((col("high") + col("low")) / 2).alias("volatility"),
    )
)

# 5) Compute log-return vs previous window
from pyspark.sql.functions import log as Flog

# A single global ordering by window start; there is no partition key, so
# Spark evaluates this window on one partition.
winSpec = W.orderBy("w.start")

# Column has no .log() method (attribute access on a Column resolves to a
# field lookup and calling the result raises "'Column' object is not
# callable"), so the natural log comes from pyspark.sql.functions.log.
stockFeat = (
    stockAgg
    .withColumn("prev_close", lag("close").over(winSpec))
    .withColumn("log_return", Flog(col("close") / col("prev_close")))
    # The first window has no predecessor -> null log_return -> treat as 0.
    .na.fill({"log_return": 0.0})
    .drop("prev_close")
)

# 6) Aggregate news into 15-minute windows (numeric only)
# One row per window `w`:
#   article_count - number of rows with a non-null GKGRECORDID
#   avg_tone      - mean of Tone
#   sum_pos/neg   - sums of Positive / Negative
#   avg_pol       - mean of Polarity
newsFeat = (
    newsDF
    .groupBy(window("timestamp", "15 minutes").alias("w"))
    .agg(
        Fcount("GKGRECORDID").alias("article_count"),
        Favg("Tone").alias("avg_tone"),
        Fsum("Positive").alias("sum_pos"),
        Fsum("Negative").alias("sum_neg"),
        Favg("Polarity").alias("avg_pol"),
    )
)

# `numericNews` used to be a second, identical copy of the aggregation above.
# DataFrames are immutable and lazily evaluated, so the name is kept as an
# alias of the same plan instead of repeating the definition.
numericNews = newsFeat

Additional Themes feature

In [None]:
# from pyspark.sql.window import Window as W
# from pyspark.ml.feature import HashingTF, IDF

# # 6) Prepare themes for TF–IDF
# # explode the semicolon-delimited V1_THEMES into tokens
# themesTokens = newsDF \
#   .withColumn("theme", explode(split(col("V1_THEMES"), ";"))) \
#   .withColumn("theme", col("theme")) \
#   .groupBy("GKGRECORDID","timestamp") \
#   .agg(collect_list("theme").alias("themes") )

# # 7) TF–IDF on themes
# htf = HashingTF(inputCol="themes", outputCol="tf", numFeatures=256)
# idf = IDF(inputCol="tf", outputCol="tfidf")
# tf = htf.transform(themesTokens)
# themesVec = idf.fit(tf).transform(tf)

# # 8) Aggregate theme vectors per window (mean TF–IDF)
# # avg() does not accept ML Vector columns; use Summarizer.mean (imported in the first cell) instead
# themeNews = themesVec.groupBy(window("timestamp","15 minutes").alias("w")) \
#     .agg( Summarizer.mean(col("tfidf")).alias("avg_tfidf") )

First Features

In [None]:
# 7) Join stock + news features
# Left join keeps every stock window; windows without any news rows get
# neutral defaults instead of nulls.
joinedFeat = (
    stockFeat
    .join(newsFeat, on="w", how="left")
    .fillna({
        "article_count": 0,
        "avg_tone": 0.0,
        "sum_pos": 0.0,
        "sum_neg": 0.0,
        "avg_pol": 0.0,
        # "avg_tfidf": Vectors.sparse(256, [])
    })
)

# 8) Preview final feature table (window bounds first, then stock columns,
#    then news columns; add "avg_tfidf" at the end if TF-IDF is enabled)
joinedFeat.select(
    "w.start", "w.end",
    "open", "high", "low", "close", "volume", "trade_count",
    "vwap", "volatility", "log_return",
    "article_count", "avg_tone", "sum_pos", "sum_neg", "avg_pol",
).show(truncate=False)

## Prepare for pipeline

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# 1) Columns that go into the model: stock features first, then news features.
all_feature_cols = [
    # stock
    "open", "high", "low", "close",
    "volume", "trade_count", "vwap",
    "volatility", "log_return",
    # news
    "article_count", "avg_tone", "sum_pos", "sum_neg", "avg_pol",
]

# Add depending if TF-IDF is used
#all_feature_cols = all_feature_cols + ["avg_tfidf"]

# 2) VectorAssembler: packs the columns above into one vector column.
assembler = VectorAssembler(
    outputCol="raw_features",
    inputCols=all_feature_cols,
)

# 3) StandardScaler: centers (withMean) and scales to unit std (withStd)
#    "raw_features" into the final "features" column.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="raw_features",
    outputCol="features",
)

## Next Steps:
-  Decide on model
    - Classification: binary up or down
    - Regression

- Create graphs and metrics for evaluation and visualization

Optional:
- Create feature selection




In [None]:
import mlflow
import mlflow.spark
from pyspark.sql.window import Window
from pyspark.sql.functions import lead
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Assume `joinedFeat` from before is in scope:
# columns: w.start, open, high, low, close, volume, trade_count, vwap,
# volatility, log_return, article_count, avg_tone, sum_pos, sum_neg, avg_pol
from pyspark.sql.functions import col, percent_rank

# 1) Build the labeled DataFrame with next-window close as label.
#    The last window has no successor, so its null label row is dropped.
wSpec = Window.orderBy("w.start")
df = (
    joinedFeat
    .withColumn("label", lead("close", 1).over(wSpec))
    .na.drop(subset=["label"])
)

# 2) Assemble & scale features
feature_cols = [
    "open", "high", "low", "close", "volume", "trade_count", "vwap",
    "volatility", "log_return", "article_count", "avg_tone", "sum_pos", "sum_neg", "avg_pol",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)

# 3) Split into train/test chronologically.
#    The label of window t is the close of window t+1, which is itself a
#    feature of window t+1. A random split scatters neighbouring windows across
#    train and test, so the model would be evaluated on targets it has already
#    seen as inputs. Splitting by time (oldest 80% train, newest 20% test)
#    avoids that look-ahead leakage.
TRAIN_FRACTION = 0.8
_ranked = df.withColumn("_time_rank", percent_rank().over(wSpec))
train_df = _ranked.filter(col("_time_rank") <= TRAIN_FRACTION).drop("_time_rank")
test_df = _ranked.filter(col("_time_rank") > TRAIN_FRACTION).drop("_time_rank")

# 4) Evaluator: RMSE between "label" and "prediction"
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# ----------------------------------------------------
# 5) GBT run
# ----------------------------------------------------
# Turn on MLflow's Spark autologging once; it stays active for later runs.
# NOTE(review): confirm this is the intended autolog flavor -- per the MLflow
# docs, mlflow.spark.autolog() records datasource reads, while estimator
# parameters are covered by mlflow.pyspark.ml.autolog().
mlflow.spark.autolog()

with mlflow.start_run(run_name="GBTRegressor") as run:
    # Gradient-boosted trees on the scaled feature vector.
    gbt = GBTRegressor(
        featuresCol="features",
        labelCol="label",
        maxIter=50,
        maxDepth=5,
    )

    # assemble -> scale -> regress, fitted on the training split only
    pipeline_gbt = Pipeline(stages=[assembler, scaler, gbt])
    model_gbt = pipeline_gbt.fit(train_df)

    # Score the held-out split and record its RMSE on the active run.
    preds_gbt = model_gbt.transform(test_df)
    rmse_gbt = evaluator.evaluate(preds_gbt)
    mlflow.log_metric("rmse", rmse_gbt)
    print(f"GBT RMSE = {rmse_gbt:.4f}")

# ----------------------------------------------------
# 6) XGBoost run
# ----------------------------------------------------
# no need to call autolog again—the same setting applies
with mlflow.start_run(run_name="XGBRegressor") as run:
    # SparkXGBRegressor follows the xgboost sklearn-style parameter names: the
    # number of boosting rounds is `n_estimators`. `num_round` is not one of
    # its parameters, so passing it did not set the number of rounds.
    xgb = SparkXGBRegressor(
        features_col="features",
        label_col="label",
        objective="reg:squarederror",
        n_estimators=100,
        max_depth=5,
        eta=0.1,  # booster-level alias of learning_rate
    )

    # Same assemble -> scale front end as the GBT run, different regressor.
    pipeline_xgb = Pipeline(stages=[assembler, scaler, xgb])
    model_xgb = pipeline_xgb.fit(train_df)

    # Score the held-out split and record its RMSE on the active run.
    preds_xgb = model_xgb.transform(test_df)
    rmse_xgb = evaluator.evaluate(preds_xgb)
    mlflow.log_metric("rmse", rmse_xgb)
    print(f"XGB RMSE = {rmse_xgb:.4f}")

# ----------------------------------------------------
# 7) Compare in MLflow UI
# ----------------------------------------------------
print("Two runs have been logged.  Navigate to the MLflow Experiment UI to compare their RMSE and parameters side-by-side.")


## Custom DBSCAN Algorithm for Clustering News

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from math import sqrt, exp

# 1.1 Key every article by the start of its window.
# NOTE(review): `themesVec` is only produced by the commented-out TF-IDF cell;
# this code reads the fields `w` (with a .start), `GKGRECORDID` and `tfidf`
# from it -- confirm that DataFrame really carries a `w` column.
articleRDD = (
    themesVec.rdd
    .keyBy(lambda row: row['w'].start)                       # window_start timestamp
    .mapValues(lambda row: (row['GKGRECORDID'], row['tfidf']))  # article id + vector
)

# 1.2 Collect all articles of one window together.
# Result: RDD[(window_start, Iterable[(id, tfidf_vec)])]
byWindow = articleRDD.groupByKey()

# 1.3 A simple density-based clusterer (DBSCAN-lite)
def dbscan_cluster(points, eps=0.5, min_pts=3):
    """Cluster points with a small density-based (DBSCAN-style) algorithm.

    Args:
        points: iterable of ``(id, vector)`` pairs. Each vector must provide
            ``toArray()`` returning a numeric array that supports element-wise
            ``-`` and ``*`` and ``.sum()``. Ids must be hashable.
        eps: maximum Euclidean distance at which two points are neighbours.
        min_pts: minimum neighbourhood size (the point itself included) for a
            point to start or extend a cluster.

    Returns:
        dict mapping each id to a cluster id (0, 1, ...) or ``-1`` for noise.
        Empty input yields an empty dict.
    """
    noise = -1

    # Materialise once so generators work, and densify every vector a single
    # time instead of on every distance computation.
    points = list(points)
    ids = [pid for pid, _ in points]
    dense = [vec.toArray() for _, vec in points]
    # For duplicate ids the last vector wins, as with a plain dict(points).
    dense_by_id = dict(zip(ids, dense))

    def neighbours_of(center):
        # Naive linear scan -> O(n^2) overall; fine for small per-window sets.
        found = []
        for other_id, other in zip(ids, dense):
            diff = center - other
            if sqrt((diff * diff).sum()) <= eps:
                found.append(other_id)
        return found

    clusters = {}
    visited = set()
    cid = 0
    for pid, center in zip(ids, dense):
        if pid in visited:
            continue
        visited.add(pid)

        neigh = neighbours_of(center)
        if len(neigh) < min_pts:
            # Provisional: may still be absorbed as a border point later.
            clusters[pid] = noise
            continue

        # pid is a core point: grow a new cluster from its neighbourhood.
        clusters[pid] = cid
        stack = neigh
        while stack:
            q = stack.pop()
            if q not in visited:
                visited.add(q)
                qneigh = neighbours_of(dense_by_id[q])
                if len(qneigh) >= min_pts:
                    stack.extend(qneigh)
            # Unlabelled points join the cluster, and so do points previously
            # marked as noise: being reachable from a core point makes them
            # border points of this cluster.
            if clusters.get(q, noise) == noise:
                clusters[q] = cid
        cid += 1
    return clusters  # dict article_id -> cluster_id

# 1.4 Cluster each window's articles independently.
# Cluster ids restart at 0 in every window, so they are only meaningful
# together with the window start.
clustered = byWindow.mapValues(
    lambda articles: dbscan_cluster(list(articles), eps=0.3, min_pts=2)
)
# clustered: RDD[(window_start, {article_id: cluster_id, ...}), ...]

# 1.5 Flatten to one (window_start, article_id, cluster_id) record per article
#     and turn that into a DataFrame for per-window cluster statistics.
rows = (
    clustered
    .flatMapValues(lambda assignment: assignment.items())
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
)
clusterDF = spark.createDataFrame(rows, schema=["window_start","GKGID","cluster_id"])
