### Time Aggregation

- Take our thematically tagged gold Delta table and aggregate it by day to build sentiment and activity features for each theme.


In [1]:


from pathlib import Path
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

#paths
NB_DIR = Path.cwd()
ROOT = NB_DIR.parent
GOLD_DELTA = ROOT / "pipelines" / "gold" / "gold_delta"
THEMATIC_DELTA = ROOT / "clustering" / "thematic_clusters"   
print("Thematic Cluster Delta:", THEMATIC_DELTA)

#spark
builder = (
    SparkSession.builder
        .appName("time_agg")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.driver.memory", "10g")
        .config("spark.sql.shuffle.partitions", "16")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()



# Load the gold table and the thematic cluster assignments
df_gold = spark.read.format("delta").load(str(GOLD_DELTA))
df_theme = spark.read.format("delta").load(str(THEMATIC_DELTA))

# Remove the old macro column if present
if "macro_cluster" in df_gold.columns:
    df_gold = df_gold.drop("macro_cluster")

# Inner join on url: only articles present in both tables are kept
df = df_gold.join(df_theme, on="url", how="inner")
df.show(5, truncate=False)
print("Joined rows:", df.count())


Thematic Cluster Delta: /home/david/School/CapStone/clustering/thematic_clusters


Joined rows: 1949081


                                                                                

In [2]:
df.printSchema()

from pyspark.sql.functions import min as Fmin, max as Fmax, to_date

# Convert timestamp → date if not already
df_dates = df.withColumn("date_key", to_date("date"))

date_range = (
    df_dates
        .agg(
            Fmin("date_key").alias("start_date"),
            Fmax("date_key").alias("end_date")
        )
        .collect()[0]
)

print("Start date:", date_range["start_date"])
print("End date:", date_range["end_date"])


root
 |-- url: string (nullable = true)
 |-- date: timestamp_ntz (nullable = true)
 |-- text: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- text_type: string (nullable = true)
 |-- time_precision: string (nullable = true)
 |-- date_trading: timestamp_ntz (nullable = true)
 |-- tz_hint: string (nullable = true)
 |-- dataset: string (nullable = true)
 |-- dataset_source: string (nullable = true)
 |-- source: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- len_text: integer (nullable = true)
 |-- silver_ingestion_ts: timestamp_ntz (nullable = true)
 |-- emb_0: float (nullable = true)
 |-- emb_1: float (nullable = true)
 |-- emb_2: float (nullable = true)
 |-- emb_3: float (nullable = true)
 |-- emb_4: float (nullable = true)
 |-- emb_5: float (nullable = true)
 |-- emb_6: float (nullable = true)
 |-- emb_7: float (nullable = true)
 |-- emb_8: float (nullable = true)
 |-- emb_9: float (nullable = true



Start date: 2016-01-01
End date: 2020-04-01


                                                                                

### Curse of Dimensionality  
With only **~1024 trading days**, adding too many themes (each with its own sentiment, activity, and lag features) makes the feature space **explode** relative to the number of observations.  
To avoid overfitting and keep the system universal across tickers, we retain only broad, economically meaningful themes.

| Theme | Why It Stays (Universal Economic Impact) |
|-------|-------------------------------------------|
| **1. Corporate Earnings & Financial Performance** | Earnings season drives broad sentiment; aggregate corporate health affects indices. |
| **2. Financial Markets & Trading** | Captures liquidity, flows, and market-wide sentiment. Relevant to every ticker. |
| **3. Macroeconomic Trends & Outlook** | Inflation, growth, and interest rates drive systematic returns across all assets. |
| **4. Energy & Commodities Markets** | Oil, gas, and metals impact inflation, supply chains, and sector costs. |
| **5. Tech Industry & Innovation** | Tech dominates global market cap and risk appetite in the modern era. |
| **6. International Trade & Relations** | Tariffs, supply chain shocks, and trade tensions affect most equities globally. |
| **7. Global Geopolitics & Security** | Conflicts, sanctions, and geopolitical risk feed directly into volatility and returns. |


### Columns Kept for Daily Aggregation  
To keep the feature space compact and statistically reliable (~1024 trading days), we retain only columns that contribute meaningful, broad-impact market signals.  
All embedding vectors, PCA dimensions, raw text, and metadata fields are dropped.

| Column / Group | Example Columns | Why We Keep It |
|----------------|-----------------|----------------|
| **Date** | `date` → `date_key` | Needed for grouping news into daily aggregates. |
| **Theme Labels** | `thematic_cluster` | Core explanatory variable: theme counts + theme shocks. |
| **Lexicon Sentiment** | `lm_pos`, `lm_neg`, `lm_unc`, `lm_lit`, `lm_con` | Captures tone, uncertainty, and sentiment widely shown to influence market returns. |
| **Entity Mentions** | `ner_org`, `ner_gpe`, `ner_person` | High-level indicators of economic, geopolitical, or corporate focus in the news flow. |
| **Text Complexity Metrics** | `len_text`, `num_words`, `avg_word_len` | Cheap proxies for article depth, often correlated with event severity or informational richness. |
| **Identifiers** | `url` | Only used for joins and diagnostics; not part of modelling. |

### Columns Dropped  
All high-dimensional or non-economic fields are removed to avoid exploding feature space:  
- **Embeddings:** `emb_0`–`emb_383`  
- **PCA vectors:** `pca_0`–`pca_49`  
- **Raw Text:** `text`  
- **Author / publication metadata:** `author`, `publication`, `source_file`  
- **Silver ingestion timestamps:** `silver_ingestion_ts`  
- **Mixed / noisy theme clusters**  
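
The notebook selects an explicit allow-list of columns in the next cell. As an alternative view of the same rule, the sketch below splits a column list by the prefixes and names listed above. `split_columns` is a hypothetical helper written for this illustration, and the example column list is abbreviated.

```python
# Hypothetical helper: split a column list into kept and dropped names
# using the prefixes and exact names from the "Columns Dropped" list.
DROP_PREFIXES = ("emb_", "pca_")
DROP_EXACT = {"text", "author", "publication", "source_file", "silver_ingestion_ts"}


def split_columns(columns):
    """Return (kept, dropped), preserving the original column order."""
    kept, dropped = [], []
    for name in columns:
        if name.startswith(DROP_PREFIXES) or name in DROP_EXACT:
            dropped.append(name)
        else:
            kept.append(name)
    return kept, dropped


# Abbreviated example; the real table has hundreds of emb_* columns.
example_cols = ["url", "date", "text", "emb_0", "emb_383", "pca_49",
                "lm_pos", "thematic_cluster"]
kept, dropped = split_columns(example_cols)
print("kept:   ", kept)
print("dropped:", dropped)
```

With a Spark DataFrame, the same helper could be applied to `df.columns` and the result passed to `df.select(*kept)`.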


In [3]:
from pyspark.sql.functions import to_date

# Columns needed for daily aggregation; embeddings, PCA vectors, raw text,
# and metadata are left behind
relevant_cols = [
    "url", "date", "thematic_cluster",
    "lm_pos", "lm_neg", "lm_unc", "lm_lit", "lm_con",
    "ner_org", "ner_gpe", "ner_person",
    "len_text", "num_words", "avg_word_len"
]

df_slim = df.select(*relevant_cols)
df_slim = df_slim.withColumn("date_key", to_date("date"))

# Cluster IDs of the seven retained themes
relevant_thematic_clusters = [1, 2, 3, 4, 7, 11, 12]

df_slim = df_slim.filter(df_slim.thematic_cluster.isin(relevant_thematic_clusters))


In [4]:
from pyspark.sql.functions import count, sum as Fsum

# 1. Daily article counts per theme
df_theme_daily = (
    df_slim
        .groupBy("date_key", "thematic_cluster")
        .agg(count("*").alias("count_articles"))
)

df_theme_pivot = (
    df_theme_daily
        .groupBy("date_key")
        .pivot("thematic_cluster")
        .agg(Fsum("count_articles"))
        .na.fill(0)
)

# 2. Sentiment + NER + text metrics per date × theme
df_theme_extra = (
    df_slim
        .groupBy("date_key", "thematic_cluster")
        .agg(
            Fsum("lm_pos").alias("pos_sum"),
            Fsum("lm_neg").alias("neg_sum"),
            Fsum("lm_unc").alias("unc_sum"),
            Fsum("ner_org").alias("org_sum"),
            Fsum("ner_gpe").alias("gpe_sum"),
            Fsum("ner_person").alias("person_sum"),
            Fsum("len_text").alias("total_text_len"),
            Fsum("num_words").alias("total_words"),
        )
)

df_theme_extra_pivot = (
    df_theme_extra
        .groupBy("date_key")
        .pivot("thematic_cluster")
        .agg(
            Fsum("pos_sum").alias("pos_sum"),
            Fsum("neg_sum").alias("neg_sum"),
            Fsum("unc_sum").alias("unc_sum"),
            Fsum("org_sum").alias("org_sum"),
            Fsum("gpe_sum").alias("gpe_sum"),
            Fsum("person_sum").alias("person_sum"),
            Fsum("total_text_len").alias("total_text_len"),
            Fsum("total_words").alias("total_words")
        )
        # A theme with no articles on a day pivots to null; treat it as 0 so
        # that later sums and ratios across themes do not become null
        .na.fill(0)
)

# 3. Rename pivoted columns from "<cluster id>_<metric>" to "<theme>_<metric>"
#    using the chosen 1-word theme labels
theme_map = {
    "1": "earnings",
    "2": "markets",
    "3": "macro",
    "4": "energy",
    "7": "tech",
    "11": "trade",
    "12": "geopol"
}

metrics = ["pos_sum", "neg_sum", "unc_sum", "org_sum",
           "gpe_sum", "person_sum", "total_text_len", "total_words"]

for old, new in theme_map.items():
    for metric in metrics:
        old_col = f"{old}_{metric}"
        new_col = f"{new}_{metric}"
        if old_col in df_theme_extra_pivot.columns:
            df_theme_extra_pivot = df_theme_extra_pivot.withColumnRenamed(old_col, new_col)

# Note: the article-count columns in df_theme_pivot are not renamed here and
# keep their numeric cluster IDs ("1", "2", ...) as column names.
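
The pivot in this cell turns one row per (date, theme) into one row per date. When several aliased aggregations are passed to `pivot(...).agg(...)`, Spark names each output column `<pivot value>_<alias>`, which is the pattern the rename loop matches. The plain-Python sketch below mimics that reshaping on invented numbers. Filling absent themes with 0 is a choice made in this sketch; in Spark an absent (date, theme) pair yields null unless it is filled explicitly.

```python
from collections import defaultdict

THEME_MAP = {1: "earnings", 2: "markets"}  # subset of the notebook's theme_map

# Long format: (date_key, thematic_cluster, pos_sum, neg_sum). Values are invented.
long_rows = [
    ("2016-01-04", 1, 12, 5),
    ("2016-01-04", 2, 3, 9),
    ("2016-01-05", 1, 7, 2),
]


def pivot_wide(rows, theme_map):
    """Return {date: {"<theme>_<metric>": value}}, filling absent themes with 0."""
    wide = defaultdict(dict)
    for date_key, cluster, pos, neg in rows:
        theme = theme_map[cluster]
        wide[date_key][f"{theme}_pos_sum"] = pos
        wide[date_key][f"{theme}_neg_sum"] = neg
    # Make every date carry every theme column, as a wide table would
    for record in wide.values():
        for theme in theme_map.values():
            record.setdefault(f"{theme}_pos_sum", 0)
            record.setdefault(f"{theme}_neg_sum", 0)
    return dict(wide)


wide = pivot_wide(long_rows, THEME_MAP)
for date_key, record in sorted(wide.items()):
    print(date_key, record)
```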


                                                                                

In [5]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ---- 4. Merge article-count pivot and sentiment/NER/words pivot ----

df_daily = (
    df_theme_pivot
        .join(df_theme_extra_pivot, on="date_key", how="inner")
        .orderBy("date_key")
)

# List of themes
themes = ["earnings", "markets", "macro", "energy", "tech", "trade", "geopol"]

# ---- 5. Total words across ALL themes per day ----
# Python's built-in sum() works on Columns because Column supports "+".
# A null in any theme column makes the whole total null.

total_words_expr = sum(F.col(f"{t}_total_words") for t in themes)

df_daily = df_daily.withColumn("allthemes_total_words", total_words_expr)

# ---- 6. Share of news volume for each theme (+1 avoids division by zero) ----

for theme in themes:
    df_daily = df_daily.withColumn(
        f"{theme}_activity_share",
        F.col(f"{theme}_total_words") / (F.col("allthemes_total_words") + F.lit(1))
    )

# ---- 7. Rolling window: the 7 preceding rows, current row excluded ----
# rowsBetween counts rows, not calendar days, so gaps in date_key are skipped.

w7 = Window.orderBy("date_key").rowsBetween(-7, -1)

# ---- 8. Rolling z-score of activity share ----
# Note: shares lie in [0, 1], so the +1 in the denominator is much larger than
# the rolling std and pulls the score toward the raw deviation from the mean.

for theme in themes:
    base = f"{theme}_activity_share"
    zname = f"{theme}_activity_share_z7"

    df_daily = df_daily.withColumn(
        zname,
        (F.col(base) - F.avg(base).over(w7)) /
        (F.stddev(base).over(w7) + F.lit(1))
    )

# ---- 9. Sentiment balance and sentiment intensity ----

for theme in themes:

    pos = f"{theme}_pos_sum"
    neg = f"{theme}_neg_sum"

    # Sentiment balance: (pos - neg) / (pos + neg + 1)
    df_daily = df_daily.withColumn(
        f"{theme}_sentiment_balance",
        (F.col(pos) - F.col(neg)) / (F.col(pos) + F.col(neg) + F.lit(1))
    )

    # Sentiment intensity: |pos - neg|
    df_daily = df_daily.withColumn(
        f"{theme}_sentiment_intensity",
        F.abs(F.col(pos) - F.col(neg))
    )

df_daily.show(10, truncate=False)

25/12/08 18:40:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
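
The rolling z-score in this cell compares each day with the 7 preceding rows and adds 1 to the rolling standard deviation. The plain-Python sketch below reproduces that calculation on an invented share series and, for comparison only, repeats it with a tiny offset. The `denom_offset` parameter and the sample values are assumptions made for illustration; the notebook itself uses an offset of 1.

```python
from statistics import mean, stdev


def rolling_z(values, window=7, denom_offset=1.0):
    """Z-score of each value against the preceding `window` rows.

    Mirrors Window.rowsBetween(-7, -1): the current row is excluded and early
    rows use however many prior rows exist. Returns None where fewer than two
    prior rows are available (the sample std is undefined there).
    """
    out = []
    for i, x in enumerate(values):
        prior = values[max(0, i - window):i]
        if len(prior) < 2:
            out.append(None)
            continue
        out.append((x - mean(prior)) / (stdev(prior) + denom_offset))
    return out


# Invented activity shares: seven quiet days, then a spike.
shares = [0.10, 0.12, 0.11, 0.09, 0.10, 0.11, 0.10, 0.30]

z_plus_one = rolling_z(shares)                    # offset used in the notebook
z_eps = rolling_z(shares, denom_offset=1e-9)      # near-standard z-score

print("spike with +1 offset:  ", round(z_plus_one[-1], 3))
print("spike with tiny offset:", round(z_eps[-1], 3))
```

Because shares are bounded by 1, the +1 offset dominates the denominator, so the spike scores well below 1 instead of many standard deviations. Whether that damping is wanted is a modelling decision; the sketch only makes the difference visible.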

In [6]:
# Per-theme uncertainty ratio, sentiment volatility, and uncertainty z-score
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w7 = Window.orderBy("date_key").rowsBetween(-7, -1)

for theme in themes:
    unc = f"{theme}_unc_sum"
    words = f"{theme}_total_words"
    bal = f"{theme}_sentiment_balance"
    
    # --- 1. Normalised uncertainty ---
    df_daily = df_daily.withColumn(
        f"{theme}_uncertainty_ratio",
        F.col(unc) / (F.col(words) + F.lit(1))
    )
    
    # --- 2. Sentiment volatility (rolling 7d std) ---
    df_daily = df_daily.withColumn(
        f"{theme}_sentiment_volatility",
        F.stddev(bal).over(w7)
    )
    
    # --- 3. Surprise uncertainty (rolling z-score) ---
    df_daily = df_daily.withColumn(
        f"{theme}_uncertainty_z7",
        (F.col(f"{theme}_uncertainty_ratio")
         - F.avg(f"{theme}_uncertainty_ratio").over(w7))
        /
        (F.stddev(f"{theme}_uncertainty_ratio").over(w7) + F.lit(1))
    )
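
As a worked example of the per-theme formulas, the sketch below evaluates sentiment balance, sentiment intensity, and the uncertainty ratio for a single theme on a single day, using the same +1 smoothing as the notebook. `sentiment_features` is a hypothetical helper and the input counts are invented.

```python
def sentiment_features(pos: int, neg: int, unc: int, words: int) -> dict:
    """Per-theme daily features with the notebook's +1 smoothing."""
    return {
        # Bounded in (-1, 1); the +1 keeps days with no sentiment words at 0
        "sentiment_balance": (pos - neg) / (pos + neg + 1),
        # Absolute gap between positive and negative word counts
        "sentiment_intensity": abs(pos - neg),
        # Uncertainty words per word of text
        "uncertainty_ratio": unc / (words + 1),
    }


# Invented counts for a busy day and for a day with no articles in the theme.
busy_day = sentiment_features(pos=120, neg=80, unc=45, words=19999)
quiet_day = sentiment_features(pos=0, neg=0, unc=0, words=0)

print("busy: ", busy_day)
print("quiet:", quiet_day)
```

The smoothing term matters mainly on quiet days: without it, a theme with zero words would produce a division by zero instead of a neutral 0.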


In [7]:
df_daily.printSchema()

root
 |-- date_key: date (nullable = true)
 |-- 1: long (nullable = true)
 |-- 2: long (nullable = true)
 |-- 3: long (nullable = true)
 |-- 4: long (nullable = true)
 |-- 7: long (nullable = true)
 |-- 11: long (nullable = true)
 |-- 12: long (nullable = true)
 |-- earnings_pos_sum: long (nullable = true)
 |-- earnings_neg_sum: long (nullable = true)
 |-- earnings_unc_sum: long (nullable = true)
 |-- earnings_org_sum: long (nullable = true)
 |-- earnings_gpe_sum: long (nullable = true)
 |-- earnings_person_sum: long (nullable = true)
 |-- earnings_total_text_len: long (nullable = true)
 |-- earnings_total_words: long (nullable = true)
 |-- markets_pos_sum: long (nullable = true)
 |-- markets_neg_sum: long (nullable = true)
 |-- markets_unc_sum: long (nullable = true)
 |-- markets_org_sum: long (nullable = true)
 |-- markets_gpe_sum: long (nullable = true)
 |-- markets_person_sum: long (nullable = true)
 |-- markets_total_text_len: long (nullable = true)
 |-- markets_total_words: long 

In [8]:
# Collect the daily feature table to pandas and write it to CSV
pdf = df_daily.toPandas()
out_path = ROOT / "modelling" / "features"
out_path.mkdir(parents=True, exist_ok=True)
pdf.to_csv(out_path / "daily_features.csv", index=False)
print("Saved:", out_path / "daily_features.csv")


25/12/08 18:42:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Saved: /home/david/School/CapStone/modelling/features/daily_features.csv

