# Environment Setup

In [None]:
import glob
import json
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as F, types as T, Window as W

In [None]:
# Set up the Spark configuration and context.
# JVM options such as the thread stack size (-Xss4m) only take effect if they
# are already present when the driver JVM is launched, i.e. when the first
# SparkContext is created. They therefore have to live on this SparkConf;
# setting them only on the session builder afterwards is too late.
conf = (
    SparkConf()
    .setAppName("MyApp")
    .setMaster("local[*]")
    .set("spark.driver.extraJavaOptions", "-Xss4m")
    .set("spark.executor.extraJavaOptions", "-Xss4m")
)

# getOrCreate() reuses a live context, so re-running this cell does not fail
# with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# Set up the Spark session on top of the context created above
spark = SparkSession.builder \
    .appName("BatchProcessor") \
    .config("spark.driver.extraJavaOptions", "-Xss4m") \
    .config("spark.executor.extraJavaOptions", "-Xss4m") \
    .getOrCreate()

# Data Preparation

In [None]:
# Explicit schema for the news-rating JSON documents.
# Each entry is (field name, Spark type); every field is declared nullable.
_news_fields = [
    ("authors",            T.ArrayType(T.StringType())),
    ("date_google",        T.StringType()),
    ("date_metadata",      T.StringType()),
    ("date_published",     T.StringType()),
    ("date_target",        T.StringType()),
    ("description",        T.StringType()),
    ("explanation",        T.StringType()),
    ("groq_usage",         T.StringType()),
    ("metadata",           T.MapType(T.StringType(), T.StringType())),
    ("rating_democrats",   T.FloatType()),
    ("rating_republicans", T.FloatType()),
    ("source_url",         T.StringType()),
    ("summary",            T.StringType()),
    ("text",               T.StringType()),
    ("title",              T.StringType()),
    ("url",                T.StringType()),
]
schema = T.StructType(
    [T.StructField(field_name, field_type, True) for field_name, field_type in _news_fields]
)

# Read every JSON document in the data directory using the schema above;
# the "multiline" option lets a single record span several lines.
news_df = (
    spark.read
    .schema(schema)
    .option("multiline", "true")
    .json("../news_ratings/data/")
)
news_df.show(5)

In [None]:
# Load the ticker CSV: the first row holds the column names and the column
# types are inferred from the data.
market_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../stocks_data/ticker_data.csv")
)
market_df.show(5)

In [None]:
# Print the column names and types of the market data
# (the types were inferred when the CSV was read).
market_df.printSchema()

# Data Pre-processing

In [None]:
# Publication time: the first non-null of the three date fields, parsed as a timestamp.
_published_at = F.to_timestamp(
    F.coalesce("date_google", "date_metadata", "date_published")
)

# Columns carried forward into the analysis.
_news_keep = [
    "published_at",
    "date_target",
    "rating_democrats",
    "rating_republicans",
    "title",
    "summary",
    "url",
]

news_df = (
    news_df
    .withColumn("published_at", _published_at)
    .withColumn("date_target", F.to_date("date_target"))
    .select(*_news_keep)
    .dropna(subset=["published_at"])   # rows with no usable publication time are discarded
)

# Preview a small random sample of the cleaned rows.
news_df.sample(fraction=0.01).show(5)

In [None]:
# Keep only the JPM rows, then preview them.
# DataFrame.show() prints the rows and returns None, so its result must not be
# assigned back to market_df (that would replace the DataFrame with None and
# break every later cell that uses it).
market_df = market_df.filter(market_df['Ticker'] == "JPM")
market_df.show(20)

# Feature Engineering

In [None]:
# Rolling window lengths, in hours.
# Other sizes that are currently switched off: 1 (1 hour), 5 (5 hours),
# 14*24 (2 weeks), 28*24 (4 weeks).
time_windows = [
    10,        # 10 hours
    24,        # 1 day
    7 * 24,    # 1 week
]

# Statistics computed for every (window, column) pair.
statistics = ["count", "mean", "std", "min", "max", "median", "spread"]

# Market data columns that receive rolling statistics.
ticker_cols = ["Open", "High", "Low", "Close", "Adj Close", "Volume"]

# News rating columns that receive rolling statistics.
news_cols = ["rating_republicans", "rating_democrats"]

In [None]:
# Define a helper function for rolling window calculations
def calculate_rolling_stats(df, cols, datetime_col, partition_col=None, windows=None, stats=None):
    """
    Add rolling statistics of the given columns over trailing time windows.

    For every window length, every column in `cols` and every statistic, a new
    column named ``rolling_{hours}h_{col}_{stat}`` is added. Each window covers
    the rows whose `datetime_col` value lies between `hours` hours before the
    current row and the current row itself (inclusive), within the same
    partition when `partition_col` is given.

    Parameters
    ----------
    df : DataFrame
        Input Spark DataFrame.
    cols : list of str
        Names of the columns to aggregate.
    datetime_col : str
        Name of the column used to order rows; it is cast to a timestamp and
        then to epoch seconds.
    partition_col : str, optional
        Column to partition the window by. When omitted the window is not
        partitioned.
    windows : iterable of int, optional
        Window lengths in hours. Defaults to the module-level `time_windows`.
    stats : iterable of str, optional
        Statistic names, any of "count", "mean", "std", "min", "max", "median",
        "spread". Defaults to the module-level `statistics`.

    Returns
    -------
    DataFrame
        `df` with the rolling statistic columns added.

    Raises
    ------
    ValueError
        If `stats` contains a name that is not one of the supported statistics.
    """
    windows = time_windows if windows is None else windows
    stats = statistics if stats is None else stats

    # Single-aggregate statistics; "spread" (max - min) is handled separately.
    aggregates = {
        "count":  lambda c: F.count(c),
        "mean":   lambda c: F.mean(c),
        "std":    lambda c: F.stddev(c),
        "min":    lambda c: F.min(c),
        "max":    lambda c: F.max(c),
        # Approximate median; the third argument is the accuracy setting.
        "median": lambda c: F.approx_percentile(c, 0.5, 10),
    }
    unknown = [s for s in stats if s != "spread" and s not in aggregates]
    if unknown:
        raise ValueError(f"Unsupported rolling statistics: {unknown}")

    # Order rows by epoch seconds so the range frame can be expressed in seconds.
    order_key = F.col(datetime_col).cast("timestamp").cast("long")

    new_columns = {}
    for window_hours in windows:
        # Define the window range in seconds
        window_range = window_hours * 3600
        # Define a rolling window spec
        window_spec = (
            W
            .partitionBy(partition_col if partition_col else [])
            .orderBy(order_key)
            .rangeBetween(-window_range, 0)
        )
        for col in cols:
            for stat in stats:
                if stat == "spread":
                    expr = F.max(col).over(window_spec) - F.min(col).over(window_spec)
                else:
                    expr = aggregates[stat](col).over(window_spec)
                new_columns[f"rolling_{window_hours}h_{col}_{stat}"] = expr

    if not new_columns:
        return df
    # Add every column in one projection. Calling withColumn once per feature
    # inside the loops stacks one projection per call and produces a very deep
    # plan, which is slow to analyse and can overflow the JVM stack.
    return df.withColumns(new_columns)

In [None]:
# %%script false --no-raise-error
# Rolling statistics of the price/volume columns, computed separately per ticker.
market_rollstats_df = calculate_rolling_stats(
    df=market_df,
    cols=ticker_cols,
    datetime_col="Date",
    partition_col="Ticker",
)
# Rolling statistics of the news ratings, over all articles (no partitioning).
news_rollstats_df = calculate_rolling_stats(
    df=news_df,
    cols=news_cols,
    datetime_col="published_at",
)

# Compute Correlations

In [None]:
# %%script false --no-raise-error
sc.stop()       # Stop the Spark context