# Data Exploration

---

**Author:** Diego Antonio García Padilla

**Date:** Oct 29, 2025

## Environment setup

In [None]:
#@title Setup & Environment Verification

import os
import sys
import warnings

# Silence library warnings so the notebook output stays readable.
warnings.filterwarnings('ignore')

# Report the interpreter version and the Java/Spark-related environment
# variables; a variable that is not set is printed as "None".
print("=== ENVIRONMENT CHECK ===")
print(f"Python: {sys.version.split()[0]}")
for label, variable in (
    ("JAVA_HOME", "JAVA_HOME"),
    ("SPARK_HOME", "SPARK_HOME"),
    ("Driver Memory", "SPARK_DRIVER_MEMORY"),
    ("Executor Memory", "SPARK_EXECUTOR_MEMORY"),
):
    print(f"{label}: {os.environ.get(variable)}")
print("=" * 50)

In [None]:
#@title Import Libraries

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Data manipulation
import pandas as pd
import numpy as np

# Financial data
import yfinance as yf

# Hugging Face
from huggingface_hub import hf_hub_download

# Utilities
from datetime import datetime, timedelta
import json
import requests
import logging
from tqdm import tqdm
import time

In [None]:
#@title Start Spark session

# Banner marking the start of this cell's pre-flight output.
print("=== PRE-FLIGHT CHECK ===")

# Auto-detect JAVA_HOME if not set properly
def find_java_home():
    """Locate a Java installation directory suitable for JAVA_HOME.

    The ``java`` executable on PATH is tried first: its symlinks are resolved
    and the directory two levels above the real binary (``<home>/bin/java``)
    is returned. When ``java`` is not on PATH, a short list of common Linux
    JVM directories is probed for ``bin/java``.

    Returns:
        The detected Java home directory as a string, or ``None`` when no
        installation could be found.
    """
    # Imported locally so the function does not depend on the notebook's
    # import cell; shutil.which replaces shelling out to `which`.
    import shutil

    # Method 1: find `java` on PATH and follow symlinks to the real binary
    java_path = shutil.which('java')
    if java_path:
        return os.path.dirname(os.path.dirname(os.path.realpath(java_path)))

    # Method 2: Common locations
    common_paths = [
        '/usr/lib/jvm/java-8-openjdk-amd64',
        '/usr/lib/jvm/java-8-openjdk-arm64',
        '/usr/lib/jvm/default-java',
        '/usr/lib/jvm/java-8-openjdk',
    ]
    for path in common_paths:
        if os.path.exists(os.path.join(path, 'bin', 'java')):
            return path
    return None

# `subprocess` is needed by the `java -version` probe below; no import visible
# in the notebook provides it, so it is imported in this cell.
import subprocess

# Check current JAVA_HOME
current_java_home = os.environ.get('JAVA_HOME')
print(f"Current JAVA_HOME: {current_java_home}")

# Verify Java executable exists under the configured JAVA_HOME (if any)
java_bin = os.path.join(current_java_home, 'bin', 'java') if current_java_home else None

if java_bin and os.path.exists(java_bin):
    print(f"‚úÖ Java found at {java_bin}")
else:
    # JAVA_HOME is unset or points at a directory without bin/java:
    # fall back to auto-detection and report the outcome either way.
    if java_bin:
        print(f"‚ö†Ô∏è  Java not found at {java_bin}")
    detected_java_home = find_java_home()
    if detected_java_home:
        os.environ['JAVA_HOME'] = detected_java_home
        print(f"‚úÖ Auto-detected JAVA_HOME: {detected_java_home}")
    else:
        print("‚ùå Could not find Java installation")

print(f"SPARK_HOME: {os.environ.get('SPARK_HOME', 'NOT SET')}")

# Verify Java is accessible (`java -version` writes to stderr, hence the redirect)
try:
    java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT)
    print(f"Java: ‚úÖ Available")
except (OSError, subprocess.CalledProcessError) as e:
    # OSError: `java` is not on PATH; CalledProcessError: it exited non-zero.
    print(f"Java: ‚ùå Not available - {e}")

print("=" * 50)

from pyspark.sql import SparkSession

# Stop a session that is already running so the settings below apply to a
# fresh one. getActiveSession() only looks up an existing session; calling
# builder.getOrCreate() here instead would first start a JVM with default
# settings, and launch-time options such as spark.driver.memory cannot be
# changed once the driver JVM is up.
existing_session = SparkSession.getActiveSession()
if existing_session is not None:
    existing_session.stop()

# Create new session
spark = (
    SparkSession.builder
    .appName("SAPS_Portfolio_Optimizer")
    .master("local[*]")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.ui.port", "4040")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "3g")
    .config("spark.driver.maxResultSize", "3g")
    .config("spark.sql.shuffle.partitions", "60")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    .config("spark.local.dir", "/tmp/spark")
    .config("spark.eventLog.enabled", "false")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print(f"\n‚úÖ Spark {spark.version} initialized successfully")
print(f"   Master: {spark.sparkContext.master}")
print(f"   App Name: {spark.sparkContext.appName}")
print(f"   Available cores: {spark.sparkContext.defaultParallelism}")
# Report the URL Spark actually bound: if port 4040 is taken, Spark moves on
# to the next free port, so a hard-coded address could be wrong.
print(f"   Spark UI: {spark.sparkContext.uiWebUrl}")

## Download data

### Stock Prices from `yfinance`

In [None]:
#@title Get S&P 500 companies and select diversified portfolio

from io import StringIO

TICKERS_NUMBER = 100
tickers_file = '../data/selected_tickers.txt'

# Check if tickers file already exists
if os.path.exists(tickers_file):
    print(f"‚úÖ File '{tickers_file}' already exists. Loading tickers...")
    with open(tickers_file, 'r') as f:
        # Skip blank lines (e.g. a trailing newline) so no empty ticker is loaded
        selected_tickers = [line.strip() for line in f if line.strip()]
    print(f"Loaded {len(selected_tickers)} tickers from file")
else:
    print("Fetching S&P 500 companies from Wikipedia...")

    # URL of the Wikipedia page listing S&P 500 companies
    url = "https://en.wikipedia.org/wiki/List_of_S&P_500_companies"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    # Fetch the HTML content; the timeout keeps the cell from hanging and an
    # HTTP error status fails here instead of as an obscure parsing error
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    # Read the S&P 500 table (first table on the page). The HTML is wrapped
    # in StringIO because passing a literal HTML string to read_html is
    # deprecated in recent pandas versions.
    sp500_data = pd.read_html(StringIO(response.text))[0]
    print(f"Successfully retrieved {len(sp500_data)} S&P 500 companies")

    # Define sector allocation (100 stocks total)
    sector_allocation = {
        'Information Technology': 20,
        'Financials': 15,
        'Health Care': 15,
        'Consumer Discretionary': 15,
        'Energy': 10,
        'Industrials': 10,
        'Consumer Staples': 5,
        'Real Estate': 5,
        'Utilities': 5
    }

    # Tickers to exclude
    excluded_tickers = ['BRK.B', 'BF.B']

    # Select diversified stocks by sector: the first `count` non-excluded
    # symbols of each sector, in the order they appear in the table
    selected_tickers = []
    for sector, count in sector_allocation.items():
        sector_stocks = sp500_data[sp500_data['GICS Sector'] == sector]['Symbol'].tolist()
        # Filter out excluded tickers
        sector_stocks = [ticker for ticker in sector_stocks if ticker not in excluded_tickers]
        selected_tickers.extend(sector_stocks[:count])

    print(f"\nTotal tickers selected: {len(selected_tickers)}")

    # Save for later use (create the data directory on a fresh checkout)
    os.makedirs(os.path.dirname(tickers_file), exist_ok=True)
    with open(tickers_file, 'w') as f:
        f.write('\n'.join(selected_tickers))
    print(f"‚úÖ Tickers saved to '{tickers_file}'")
    print("Tickers: ", len(selected_tickers))

In [None]:
#@title Get stock prices from yfinance

# Download price data for selected tickers
start_date = '2015-01-01'
end_date = '2025-01-01'

# Check if file already exists
csv_file = '../data/raw/sp500_prices_raw.csv'

if os.path.exists(csv_file):
    # NOTE: a CSV written by an earlier run without the Date index has to be
    # deleted manually so that it is regenerated by the branch below.
    print(f"‚úÖ File '{csv_file}' already exists. Skipping download.")
else:
    # Get selected tickers first, so the count printed below is correct and
    # the cell does not depend on a variable left over from another cell.
    # Plain text parsing is used so symbols are never coerced (e.g. to NaN).
    with open('../data/selected_tickers.txt', 'r') as f:
        selected_tickers = [line.strip() for line in f if line.strip()]

    print(f"Downloading data for {len(selected_tickers)} tickers...")

    # Download all data at once
    all_data = yf.download(
        selected_tickers,
        start=start_date,
        end=end_date,
        group_by='ticker',
        threads=True,
        progress=True
    )

    # Refuse to cache an empty result: the file-exists check above would
    # otherwise skip the download on every later run.
    if all_data.empty:
        raise RuntimeError(f"yfinance returned no data; '{csv_file}' was not written")

    print(f"Downloaded data for {len(selected_tickers)} tickers")

    # Save to CSV for PySpark processing. The index holds the trading dates
    # and must be written: the conversion cell reads this file with
    # index_col=0 and parse_dates=True.
    os.makedirs(os.path.dirname(csv_file), exist_ok=True)
    all_data.to_csv(csv_file)

In [None]:
#@title Convert into PySpark dataframe (Part 1)

# Input CSV (written by the download cell) and output parquet locations
csv_file = '../data/raw/sp500_prices_raw.csv'
parquet_file = '../data/raw/sp500_prices_parquet.parquet'

# The CSV carries two header rows (ticker, price field); the first column is
# used as the index and parsed as dates
price_df_pandas = pd.read_csv(csv_file, header=[0,1], index_col=0, parse_dates=True)

# Price field names as they appear in the CSV -> lowercase names
lowercase_names = {
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
}
# Columns kept for every ticker, in output order
long_columns = ['date', 'ticker', 'open', 'high', 'low', 'close', 'volume']

# Build one long-format frame per ticker; a ticker that cannot be processed
# (for example because a required column is missing) is reported and skipped
price_data_list = []
for ticker in price_df_pandas.columns.get_level_values(0).unique():
    try:
        ticker_df = (
            price_df_pandas[ticker]
            .rename(columns=lowercase_names)
            .assign(ticker=ticker, date=price_df_pandas.index)
        )[long_columns]
        price_data_list.append(ticker_df)
    except Exception as e:
        print(f"Error processing {ticker}: {e}")

# Stack the per-ticker frames, then:
#   - add adj_close as a copy of close (placeholder for now)
#   - store the date as a 'YYYY-MM-DD' string to avoid timestamp precision
#     issues when the parquet file is read back
price_df_long = pd.concat(price_data_list, ignore_index=True)
price_df_long = price_df_long.assign(
    adj_close=price_df_long['close'],
    date=price_df_long['date'].dt.strftime('%Y-%m-%d'),
)

print(f"Long format created: {len(price_df_long)} rows")
print(f"Unique tickers: {price_df_long['ticker'].nunique()}")

# Persist as parquet (date column is a string at this point)
price_df_long.to_parquet(parquet_file, index=False, engine='pyarrow')
print(f"Saved to {parquet_file}")

In [None]:
#@title Convert into PySpark dataframe (Part 2)

from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType, LongType

# Read the long-format prices back; "date" is stored as a string in the file
price_spark_df = spark.read.parquet('../data/raw/sp500_prices_parquet.parquet')

# Parse the date string, cast every price column to double and volume to
# long, drop rows without a close price, and sort by date then ticker
price_spark_df = price_spark_df.withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd"))
for price_column in ("open", "high", "low", "close", "adj_close"):
    price_spark_df = price_spark_df.withColumn(price_column, F.col(price_column).cast("double"))
price_spark_df = (
    price_spark_df
    .withColumn("volume", F.col("volume").cast("long"))
    .filter(F.col("close").isNotNull())
    .orderBy("date", "ticker")
)

# Mark the DataFrame for caching; the count() below is the first action on it
price_spark_df.cache()

print(f"PySpark DataFrame loaded: {price_spark_df.count():,} rows")
print("\nSchema:")
price_spark_df.printSchema()
print("\nSample data:")
price_spark_df.show(10, truncate=False)

### Stock News from `Hugging Face`

In [None]:
#@title Login into Hugging Face

from huggingface_hub import login
import os

# Resolve the Hugging Face token by trying each source in turn and moving on
# whenever a source is unavailable OR yields no token:
#   1. .env file via python-dotenv (local development)
#   2. Google Colab secrets
#   3. plain environment variable
hf_token = None

# 1. .env file (python-dotenv is optional)
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv()
    hf_token = os.getenv('HF_TOKEN')

# 2. Google Colab secrets (only importable when running on Colab)
if not hf_token:
    try:
        from google.colab import userdata
    except ImportError:
        pass
    else:
        try:
            hf_token = userdata.get('HF_TOKEN')
        except Exception as exc:
            # The lookup may raise (e.g. secret missing or access not
            # granted); report it and keep going to the next source.
            print(f"Colab secret lookup failed: {exc}")

# 3. Fall back to environment variable
if not hf_token:
    hf_token = os.getenv('HF_TOKEN')

# Log in using the retrieved token
if hf_token:
    login(token=hf_token)
    print("‚úÖ Logged in to Hugging Face")
else:
    print("‚ö†Ô∏è  HF_TOKEN not found. Please set it in .env file or environment variables.")

In [None]:
#@title Download FNSPID from Hugging Face (Part 1: Download & Filter)

banner = "=" * 60
print(banner)
print("DOWNLOADING FNSPID - OPTIMIZED VERSION")
print(banner)

# Locations of the raw CSV and of the filtered parquet produced by this cell
raw_csv_file = '../data/raw/fnspid_news_raw.csv'
parquet_file = '../data/raw/fnspid_news_filtered.parquet'

# The filtered parquet acts as a cache: when present, nothing is downloaded
if os.path.exists(parquet_file):
    print(f"‚úÖ Filtered parquet file already exists: {parquet_file}")
    print("Skipping download and filtering...")
else:
    # ---- Step 1: fetch the news CSV from the Hugging Face Hub ----
    print("\nüì• Step 1: Downloading from Hugging Face...")
    print("(This may take several minutes for large files)")

    file_path = hf_hub_download(
        repo_id="Zihan1004/FNSPID",
        filename="Stock_news/All_external.csv",
        repo_type="dataset",
        resume_download=True,  # continue a partial download if interrupted
    )

    print(f"‚úÖ Downloaded to: {file_path}")

    # Report the size of the downloaded file in MB
    file_size_mb = os.path.getsize(file_path) / (1024**2)
    print(f"File size: {file_size_mb:.2f} MB")

    # ---- Step 2: stream the CSV in chunks, keeping only selected tickers ----
    print("\nüìä Step 2: Loading and filtering data in chunks...")
    print("This prevents memory overflow by processing incrementally")

    # Tickers to keep, one per line in the selection file
    with open('../data/selected_tickers.txt', 'r') as f:
        selected_tickers = [line.strip() for line in f]

    selected_tickers_set = set(selected_tickers)
    print(f"Filtering for {len(selected_tickers)} tickers")

    # Chunking parameters and running totals
    chunk_size = 50000  # rows read per chunk
    filtered_chunks = []
    total_rows_processed = 0
    total_rows_kept = 0

    # Read a handful of rows just to learn the column names
    sample = pd.read_csv(file_path, nrows=5)
    print(f"\nAvailable columns: {sample.columns.tolist()}")

    # The ticker column is the first of these candidates present in the file
    candidate_columns = ('Stock_symbol', 'ticker', 'symbol')
    ticker_col = next(
        (name for name in candidate_columns if name in sample.columns),
        None,
    )

    if ticker_col is None:
        # Without a ticker column no filtering is possible: read everything
        print("‚ö†Ô∏è  Could not find ticker column. Loading all rows...")
        fnspid_df_pandas = pd.read_csv(file_path)
    else:
        print(f"Found ticker column: '{ticker_col}'")
        print("\nProcessing chunks...")

        # Stream the file, retaining rows whose ticker is in the selection
        with tqdm(desc="Processing", unit=" rows") as progress:
            for batch in pd.read_csv(file_path, chunksize=chunk_size):
                batch_rows = len(batch)
                total_rows_processed += batch_rows

                matches = batch[batch[ticker_col].isin(selected_tickers_set)]
                matched_rows = len(matches)
                if matched_rows:
                    filtered_chunks.append(matches)
                    total_rows_kept += matched_rows

                progress.update(batch_rows)
                progress.set_postfix(
                    kept=total_rows_kept,
                    processed=total_rows_processed,
                )

        # Merge the retained rows; with no match at all, fall back to the
        # first 10,000 rows of the file as a sample
        if filtered_chunks:
            fnspid_df_pandas = pd.concat(filtered_chunks, ignore_index=True)
            print(f"\n‚úÖ Filtered: {total_rows_kept:,} / {total_rows_processed:,} rows")
            print(f"   Reduction: {(1 - total_rows_kept/total_rows_processed)*100:.1f}%")
        else:
            print("\n‚ö†Ô∏è  No matching data found. Loading sample...")
            fnspid_df_pandas = pd.read_csv(file_path, nrows=10000)

    # ---- Step 3: persist the result as parquet for Spark ----
    print(f"\nüíæ Saving filtered data to: {parquet_file}")
    fnspid_df_pandas.to_parquet(parquet_file, index=False, engine='pyarrow')
    print("‚úÖ Saved!")

    print(f"\nFiltered Pandas DataFrame: {len(fnspid_df_pandas):,} rows")
    print(f"Columns: {fnspid_df_pandas.columns.tolist()}")

In [None]:
#@title Load FNSPID with PySpark (Part 2: Convert to Spark DataFrame)

# Read the filtered news parquet written by the previous cell
fnspid_spark_df = spark.read.parquet('../data/raw/fnspid_news_filtered.parquet')

# Expose the symbol column under the name 'ticker' when the file still uses
# 'Stock_symbol'
if 'Stock_symbol' in fnspid_spark_df.columns:
    fnspid_spark_df = fnspid_spark_df.withColumnRenamed('Stock_symbol', 'ticker')

# Parse 'date' into a date only when that column exists and holds strings.
# NOTE(review): this is an exact, case-sensitive name match, so a column
# spelled e.g. 'Date' would be left unparsed here - confirm the dataset's
# actual column name.
if dict(fnspid_spark_df.dtypes).get('date') == 'string':
    fnspid_spark_df = fnspid_spark_df.withColumn("date", F.to_date(F.col("date")))

# Drop rows without a ticker, then sort by date and ticker
fnspid_spark_df = (
    fnspid_spark_df
    .filter(F.col("ticker").isNotNull())
    .orderBy("date", "ticker")
)

# Mark the DataFrame for caching; the count() below is the first action on it
fnspid_spark_df.cache()

# Summary output
banner = "=" * 60
print(banner)
print("FNSPID DATA LOADED WITH PYSPARK")
print(banner)
print(f"PySpark DataFrame loaded: {fnspid_spark_df.count():,} rows")
print("\nSchema:")
fnspid_spark_df.printSchema()
print("\nUnique tickers:")
fnspid_spark_df.select("ticker").distinct().orderBy("ticker").show(10)
print("\nSample data:")
fnspid_spark_df.show(5, truncate=False)

## Data Exploration

Let's use Cisco (`CSCO`) stock to illustrate.