---
# **Dynamic Classification of S&P 500 Stocks Using Distributed Computing and Fundamental Ratios**

---



## **ST446 GROUP PROJECT**
## **CANDIDATE NUMBERS:** 50714, 49775, 49663, 49872

---


**NOTE FOR EVALUATOR:**

*   **The dataset link is provided in the PDF file `ST446 DATASET LINK.pdf`.**
*   **Our final dataset can be found in the three solution notebooks in the folder `ST446 GROUP PROJECT SOLUTION`.**
*   **Data preprocessing is carried out in the three main solution files.**
*   **In this notebook we used API keys and several extraction methods to pull macroeconomic data. The fundamental data was downloaded directly from Bloomberg, with parameters chosen according to our project objective.**

---
## **DATA EXTRACTION**
---

---
### **DATA SOURCES:**


*   **FRED API - MACROECONOMIC DATA**
*   **BLOOMBERG - FUNDAMENTAL DATA**


---

In [None]:
! pip install fredapi
! pip install yfinance
! pip install pandas-datareader


---
# **Cluster set up**
---

```bash
gcloud dataproc clusters create st446-cluster-final \
  --enable-component-gateway \
  --public-ip-address \
  --region=europe-west2 \
  --master-machine-type=n2-standard-2 \
  --master-boot-disk-size=100 \
  --num-workers=2 \
  --worker-machine-type=n2-standard-2 \
  --worker-boot-disk-size=200 \
  --image-version=2.2-debian12 \
  --optional-components=JUPYTER \
  --metadata='PIP_PACKAGES=lightgbm xgboost causalml scikit-learn pandas numpy matplotlib seaborn synapseml==0.11.1' \
  --initialization-actions='gs://st446-assignment-data/my_actions.sh' \
  --properties=^#^spark:spark.dynamicAllocation.enabled=false#spark:spark.jars.packages=ml.dmlc:xgboost4j-spark_2.12:1.6.1,com.microsoft.azure:synapseml_2.12:0.11.1 \
  --project=st446-wt2025-452419
```


### **Notes on Spark Properties**

By default, `--properties` takes a comma-separated list of `prefix:key=value` pairs. Here the value of `spark.jars.packages` itself contains a comma, because it lists two Maven coordinates, so the list is prefixed with `^#^` to switch the delimiter to `#`. This way both the configuration setting (`spark.dynamicAllocation.enabled=false`) and the external packages (XGBoost and SynapseML) are passed correctly at cluster creation. Splitting the properties across several `--properties` flags is not supported; only the last occurrence takes effect.
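As a rough illustration of why the delimiter matters, the sketch below mimics in plain Python how a `^#^`-prefixed value could be split into key/value pairs. The helper `split_properties` is hypothetical and only approximates the escaping rules described under `gcloud topic escaping`; it is not gcloud's actual parser.

```python
def split_properties(value: str) -> dict:
    """Split a --properties value into a dict (hypothetical helper, not gcloud code).

    A leading ^DELIM^ selects a custom delimiter; otherwise a comma is assumed.
    """
    delimiter = ","
    if value.startswith("^"):
        end = value.index("^", 1)  # position of the closing caret
        delimiter = value[1:end]
        value = value[end + 1:]
    pairs = (item.split("=", 1) for item in value.split(delimiter))
    return {key: val for key, val in pairs}


raw = (
    "^#^spark:spark.dynamicAllocation.enabled=false"
    "#spark:spark.jars.packages=ml.dmlc:xgboost4j-spark_2.12:1.6.1,"
    "com.microsoft.azure:synapseml_2.12:0.11.1"
)
props = split_properties(raw)
for key, val in props.items():
    print(key, "->", val)
```

With `#` as the delimiter, the comma between the two Maven coordinates stays inside a single value instead of being read as the start of a new property.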

---
## **IMPORTING LIBRARIES**

---

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, last, row_number
from pyspark.sql.window import Window

import pandas as pd
from functools import reduce
from fredapi import Fred
import sqlite3

---
## **EXTRACTING MACROECONOMIC DATA USING API [FRED]**

---

In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Macroeconomic Indicators from FRED") \
    .getOrCreate()

# Initialize the FRED API client
fred_key = 'e9bc109577b3480299a5db51dcb0969e'
fred = Fred(api_key=fred_key)

# Define indicators and FRED series IDs
series_ids = {
    'GDP Growth': 'A191RL1Q225SBEA',
    'CPI': 'CPALTT01USM657N',
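    'PMI': 'CUUR0000SA0R',  # NOTE: on FRED this ID appears to be the purchasing power of the consumer dollar, not a PMI series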
    'Unemployment Rate': 'UNRATE',
    'Retail Sales': 'RSAFS'
}

start_date = '1992-01-01'
end_date = '2025-01-04'

# Fetch each series and merge
dfs = []

for indicator, series_id in series_ids.items():
    try:
        series_data = fred.get_series(series_id, observation_start=start_date, observation_end=end_date)
        df = series_data.reset_index()
        df.columns = ["date", indicator]
        dfs.append(df)
    except Exception as e:
        print(f"Error fetching data for {indicator}: {e}")

df_combined = reduce(lambda left, right: pd.merge(left, right, on='date', how='outer'), dfs)

# Convert to Spark DataFrame
df_combined["date"] = pd.to_datetime(df_combined["date"])
spark_df = spark.createDataFrame(df_combined)
spark_df = spark_df.withColumn("date", to_date("date"))

spark_df.show(10)

25/05/05 17:27:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+----------+-------------------+----+-----------------+------------+
|      date|GDP Growth|                CPI| PMI|Unemployment Rate|Retail Sales|
+----------+----------+-------------------+----+-----------------+------------+
|1992-01-01|       4.9|  0.145032632342278|72.4|              7.3|    159177.0|
|1992-04-01|       4.4|  0.143575017946859|71.7|              7.4|    159921.0|
|1992-07-01|       4.0|0.21398002853068102|71.2|              7.7|    162855.0|
|1992-10-01|       4.2|  0.353857041755123|70.5|              7.3|    166063.0|
|1993-01-01|       0.7|0.49330514446792795|70.1|              7.3|    169500.0|
|1993-04-01|       2.3|  0.278551532033427|69.5|              7.1|    172102.0|
|1993-07-01|       1.9|                0.0|69.2|              6.9|    175822.0|
|1993-10-01|       5.6|0.41350792556855703|68.6|              6.8|    177588.0|
|1994-01-01|       3.9|  0.274348422496554|68.4|              6.6|    179615.0|
|1994-04-01|       5.5|  0.1358695652173

In [None]:
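# Window frame running from the current row to the last row (ordered by date)
bfill_window = Window.orderBy(col("date").asc()).rowsBetween(0, Window.unboundedFollowing)

# Identify columns to fill (excluding 'date')
cols_to_fill = [c for c in spark_df.columns if c != "date"]

# Intended as a backward fill, applied column by column.
# NOTE: last(..., ignorenulls=True) over this frame returns the final non-null value
# in the remaining rows, not the next one. A true backward fill would use
# first(..., ignorenulls=True) over the same frame.
for c in cols_to_fill:
    spark_df = spark_df.withColumn(c, last(c, ignorenulls=True).over(bfill_window))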

# Drop the last row using row number
rownum_window = Window.orderBy("date")
spark_df_with_rownum = spark_df.withColumn("row_num", row_number().over(rownum_window))

# Get the last row number
max_row = spark_df_with_rownum.agg({"row_num": "max"}).collect()[0][0]

# Filter out the last row
spark_df_trimmed = spark_df_with_rownum.filter(col("row_num") < max_row).drop("row_num")

25/05/05 17:27:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
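The cell above applies `last(c, ignorenulls=True)` over a frame that runs from the current row to the end of the data. As a plain-Python sketch (no Spark required; `fill_with` is a hypothetical helper and the sample series is made up), the following contrasts what `first` and `last` would return over such a frame:

```python
def fill_with(values, pick):
    """For each position i, apply `pick` to the non-null values from i to the end."""
    filled = []
    for i in range(len(values)):
        remaining = [v for v in values[i:] if v is not None]
        filled.append(pick(remaining) if remaining else None)
    return filled


series = [1.0, None, None, 4.0, None, 6.0]

# Analogue of first(c, ignorenulls=True): the next non-null value, i.e. a backward fill
backward_filled = fill_with(series, lambda xs: xs[0])

# Analogue of last(c, ignorenulls=True): the final non-null value of the remainder
last_filled = fill_with(series, lambda xs: xs[-1])

print(backward_filled)  # [1.0, 4.0, 4.0, 4.0, 6.0, 6.0]
print(last_filled)      # [6.0, 6.0, 6.0, 6.0, 6.0, 6.0]
```

Only the `first` variant preserves existing observations while filling gaps; the `last` variant overwrites every row with the most recent non-null value.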


In [None]:
# Write to a single CSV file in GCS
output_path = "gs://st446-data/economic_indicators_spark"

spark_df_trimmed.coalesce(1) \
    .write.option("header", True) \
    .mode("overwrite") \
    .csv(output_path)

print("Successfully saved data to:", output_path)

25/05/05 17:27:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Successfully saved data to: gs://st446-data/economic_indicators_spark


----
## **SQL API EXTRACTION**

Here we fetch a second set of FRED series, store them in a SQLite database, and read them back with a SQL query.

---

In [None]:
# Create Spark session
spark = SparkSession.builder \
    .appName("FRED Macro Data") \
    .getOrCreate()

# Connect to FRED API
fred_key = "e9bc109577b3480299a5db51dcb0969e"
fred = Fred(api_key=fred_key)

# Define series and date range
series_ids = {
    'US Corporate as % of GDP': 'CP',
    'Net Export Growth': 'NETEXP',
    'Industrial Production (IIP)': 'INDPRO',
    '10-Year Government Bond Rate': 'IRLTLT01USM156N',
    'Saving Rate of People': 'PSAVERT',
    'New Home Sales': 'HSN1F',
    'FED Fund Rate': 'FEDFUNDS'
}

start_date = '1992-01-01'
end_date = '2025-01-04'

# Fetch and merge data
dfs = []

for indicator, series_id in series_ids.items():
    try:
        series_data = fred.get_series(series_id, observation_start=start_date, observation_end=end_date)
        df = series_data.reset_index()
        df.columns = ["date", indicator]
        dfs.append(df)
    except Exception as e:
        print(f"Error fetching data for {indicator}: {e}")

# Merge into one pandas DataFrame
df_combined = reduce(lambda left, right: pd.merge(left, right, on='date', how='outer'), dfs)
df_combined["date"] = pd.to_datetime(df_combined["date"])

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df_combined)
spark_df = spark_df.withColumn("date", to_date("date"))


# Show preview
spark_df.show(10)

25/05/05 17:27:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|      date|US Corporate as % of GDP|Net Export Growth|Industrial Production (IIP)|10-Year Government Bond Rate|Saving Rate of People|New Home Sales|FED Fund Rate|
+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|1992-01-01|                 329.803|          -20.536|                    61.4823|                        7.03|                  9.5|         676.0|         4.03|
|1992-04-01|                 344.556|          -32.788|                    62.9199|                        7.48|                  9.8|         546.0|         3.73|
|1992-07-01|                 328.651|          -38.488|                    63.7348|                        6.84|                  9.5|         627.0|         3.25|
|1992-10-01|    

In [None]:
# Convert Spark DataFrame to Pandas
df_pandas = spark_df.toPandas()

# Connect to SQLite
conn = sqlite3.connect("fred_data.db")

# Save to SQLite
df_pandas.to_sql("fred_data", conn, if_exists="replace", index=False)

print("Successfully inserted data into SQLite database.")

conn.close()

Successfully inserted data into SQLite database.
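Column names such as `US Corporate as % of GDP` contain spaces and symbols, so they have to be double-quoted when queried in SQL. A minimal sketch using an in-memory SQLite database (the two rows are copied from the preview above; the table is rebuilt here purely for illustration):

```python
import sqlite3

# In-memory database so the sketch leaves no file behind
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE fred_data (date TEXT, "FED Fund Rate" REAL, "US Corporate as % of GDP" REAL)'
)
rows = [("1992-01-01", 4.03, 329.803), ("1992-04-01", 3.73, 344.556)]
conn.executemany("INSERT INTO fred_data VALUES (?, ?, ?)", rows)

# Double quotes mark identifiers; the ? placeholder binds the threshold safely
query = 'SELECT date, "FED Fund Rate" FROM fred_data WHERE "FED Fund Rate" < ? ORDER BY date'
result = conn.execute(query, (4.0,)).fetchall()
print(result)

conn.close()
```

The same quoting applies to any query run against `fred_data.db`, including filters and aggregations pushed down to SQLite before the data is loaded into pandas.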


In [None]:
# Reconnect to SQLite
conn = sqlite3.connect("fred_data.db")

# SQL query to select desired columns
query = """
SELECT
    date,
    "US Corporate as % of GDP",
    "Net Export Growth",
    "Industrial Production (IIP)",
    "10-Year Government Bond Rate",
    "Saving Rate of People",
    "New Home Sales",
    "FED Fund Rate"
FROM fred_data
"""

# Load into pandas DataFrame
df_extracted = pd.read_sql_query(query, conn)
df_extracted["date"] = pd.to_datetime(df_extracted["date"])

# Convert to Spark DataFrame
spark_df_extracted = spark.createDataFrame(df_extracted)

# Convert 'date' to Spark date type
from pyspark.sql.functions import to_date
spark_df_extracted = spark_df_extracted.withColumn("date", to_date("date"))

# Preview the extracted data
spark_df_extracted.show()

+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|      date|US Corporate as % of GDP|Net Export Growth|Industrial Production (IIP)|10-Year Government Bond Rate|Saving Rate of People|New Home Sales|FED Fund Rate|
+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|1992-01-01|                 329.803|          -20.536|                    61.4823|                        7.03|                  9.5|         676.0|         4.03|
|1992-04-01|                 344.556|          -32.788|                    62.9199|                        7.48|                  9.8|         546.0|         3.73|
|1992-07-01|                 328.651|          -38.488|                    63.7348|                        6.84|                  9.5|         627.0|         3.25|
|1992-10-01|    

In [None]:
spark_df_extracted = spark_df_extracted.withColumn("date", to_date("date"))

In [None]:
# Save to CSV (as a single file) in GCS bucket
output_path = "gs://st446-data/economic_indicators_sql_spark"

spark_df_extracted.coalesce(1) \
    .write.option("header", True) \
    .mode("overwrite") \
    .csv(output_path)

print("Successfully saved this data to:", output_path)

                                                                                

Successfully saved this data to: gs://st446-data/economic_indicators_sql_spark


In [None]:
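# Window frame running from the current row to the last row (ordered by date)
bfill_window = Window.orderBy("date").rowsBetween(0, Window.unboundedFollowing)

# Columns to fill: everything except 'date'
columns_to_bfill = [c for c in spark_df_extracted.columns if c != "date"]

# Intended as a backward fill.
# NOTE: last(..., ignorenulls=True) over this frame returns the final non-null value
# in the remaining rows; a true backward fill would use first(..., ignorenulls=True).
# The NaN in the output shows that NaN values from pandas are not treated as nulls here.
# This step runs after the CSV above was written, so the saved file is unchanged.
for c in columns_to_bfill:
    spark_df_extracted = spark_df_extracted.withColumn(
        c, last(c, ignorenulls=True).over(bfill_window)
    )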

# Show result
spark_df_extracted.show()

25/05/05 17:28:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|      date|US Corporate as % of GDP|Net Export Growth|Industrial Production (IIP)|10-Year Government Bond Rate|Saving Rate of People|New Home Sales|FED Fund Rate|
+----------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|1992-01-01|                     NaN|        -1262.307|                   103.3418|                        4.63|                  3.9|         654.0|         4.33|
|1992-02-01|                     NaN|        -1262.307|                   103.3418|                        4.63|                  3.9|         654.0|         4.33|
|1992-03-01|                     NaN|        -1262.307|                   103.3418|                        4.63|                  3.9|         654.0|         4.33|
|1992-04-01|    

In [None]:
# Read both Spark-written CSVs from GCS
df_fred = spark.read.option("header", True).option("inferSchema", True).csv("gs://st446-data/economic_indicators_spark/")
df_sql = spark.read.option("header", True).option("inferSchema", True).csv("gs://st446-data/economic_indicators_sql_spark/")

df_fred = df_fred.withColumn("date", to_date("date"))
df_sql = df_sql.withColumn("date", to_date("date"))

# Join both DataFrames on 'date' column (since Spark has no index)
df_combined = df_fred.join(df_sql, on="date", how="inner")

# Show result
df_combined.show()

+----------+----------+---+----+-----------------+------------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|      date|GDP Growth|CPI| PMI|Unemployment Rate|Retail Sales|US Corporate as % of GDP|Net Export Growth|Industrial Production (IIP)|10-Year Government Bond Rate|Saving Rate of People|New Home Sales|FED Fund Rate|
+----------+----------+---+----+-----------------+------------+------------------------+-----------------+---------------------------+----------------------------+---------------------+--------------+-------------+
|1992-01-01|      -0.3|NaN|31.5|              4.0|    711461.0|                 329.803|          -20.536|                    61.4823|                        7.03|                  9.5|         676.0|         4.03|
|1992-04-01|      -0.3|NaN|31.5|              4.0|    711461.0|                 344.556|          -32.788|                    62.9199|      

In [None]:
# Save to GCS temporary folder
temp_path = "gs://st446-data/tmp_st446_macro_data"

df_combined.coalesce(1) \
    .write.option("header", True) \
    .mode("overwrite") \
    .csv(temp_path)
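Spark writes a folder containing a `part-*.csv` file and a `_SUCCESS` marker rather than a single named CSV, so the data file has to be located before it can be copied to a fixed name such as `st446_macro_data.csv`. The sketch below assumes that naming pattern and picks the data file out of a listing of object names. `find_part_file` is a hypothetical helper, the listing is made up, and the listing and copy steps themselves (for example with `gsutil`) are assumed to happen separately.

```python
import fnmatch


def find_part_file(object_names):
    """Return the single part-*.csv object among names listed under a Spark output folder.

    Hypothetical helper: assumes coalesce(1) produced exactly one data file.
    """
    matches = [
        name for name in object_names
        if fnmatch.fnmatch(name.rsplit("/", 1)[-1], "part-*.csv")
    ]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]


# Example listing (made-up object names, for illustration only)
listing = [
    "tmp_st446_macro_data/_SUCCESS",
    "tmp_st446_macro_data/part-00000-example-c000.csv",
]
part = find_part_file(listing)
print(part)
```

Raising an error when zero or several part files are found guards against silently copying an incomplete or partitioned output.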

                                                                                

---
## **EXTRACTING FUNDAMENTAL DATA FROM BLOOMBERG**

---

---

#### Here, we select the required parameters based on our objective and download the dataset from Bloomberg.

#### We download `st446_macro_data.csv` from the bucket, upload `st446_trading_and_fundamental_data_bbg.csv` (downloaded from Bloomberg), and merge the two datasets.

---

---
### **MERGING THE DATA**

---

In [None]:
# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Set your bucket name
bucket = 'st446-data'

# Define file paths
fundamental_path = f'gs://{bucket}/st446_trading_and_fundamental_data_bbg.csv'
macro_path = f'gs://{bucket}/st446_macro_data.csv'

# Read CSV files into DataFrames
fundamental_df = spark.read.csv(fundamental_path, header=True, inferSchema=True)
macro_df = spark.read.csv(macro_path, header=True, inferSchema=True)

                                                                                

In [None]:
macro_df = macro_df.withColumnRenamed('date', 'macro_date')

In [None]:
data_df = fundamental_df.join(macro_df, on='month', how='left')
data_df = data_df.drop('macro_date', 'month','quarter','year','px_last','cur_mkt_cap','logret','spx_logret')

In [None]:
# Count rows that have any nulls
null_rows = data_df.filter(
    sum(col(c).isNull().cast("int") for c in data_df.columns) > 0
)

# Show how many rows have at least one NULL
print("Number of rows with at least one NULL:", null_rows.count())




Number of rows with at least one NULL: 91



                                                                                

In [None]:
data = data_df.dropna(how='any')

In [None]:
# Drop the SPX Index rows so that only individual stocks remain
data = data.filter(data.stock != 'SPX Index')

In [None]:

n_rows = data.count()

n_cols = len(data.columns)

print(f"DataFrame shape: ({n_rows}, {n_cols})")



DataFrame shape: (146165, 44)



                                                                                

In [None]:
data.show(5,truncate=False)



+-----+----------+-----+----------------+---------+-----------+----------+-------------------+---------------------+-----------------------+------------+-----------+-----------+---------------+--------------+--------------+-------------+------------------------------+-------------+--------+----------------+-----------------+--------------+---------+---------------+--------------+------------+------------+-------------------+------------------+------------------+----------------+---------------+----------+----+-----------------+------------+-----------------+----------------------------+---------------------+--------------+-------------+------------+----------+
|stock|date      |score|market_cap_score|cur_ratio|quick_ratio|cash_ratio|tot_debt_to_tot_eqy|tot_debt_to_tot_asset|interest_coverage_ratio|gross_margin|oper_margin|prof_margin|return_on_asset|return_com_eqy|asset_turnover|acct_rcv_turn|accounts_payable_turnover_days|acct_rcv_days|pe_ratio|px_to_book_ratio|px_to_sales_ratio|divid


                                                                                