# Imports

In [None]:
# General Tools/Utilities
from pathlib import Path
import os
import polars as pl
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta, date
from typing import List, Tuple, Dict
import buckaroo
import gc
import talib
# Custom Imports
from data_loader import DataLoader
from technical_indicators import add_all_indicators, filter_buy_signals, get_signals_summary
# Warnings
import warnings
warnings.filterwarnings('ignore')


# Data

In [28]:
# SQL for daily price rows of JMD-denominated ordinary shares.
# :start_date / :end_date are named placeholders; presumably DataLoader.fetch_data
# binds them from its keyword arguments — confirm against data_loader.
# NOTE(review): "open" is derived as closing_price + price_change; confirm the sign
# convention of price_change in old_trade_data.
query = """
SELECT DISTINCT
    trade_date as date,
    clean_symbol as symbol,
    -- open_price,
    daily_range_high as high,
    daily_range_low as low,
    closing_price as close,
    closing_price + price_change as open,
    volume
FROM
	old_trade_data TD
    LEFT JOIN  jse_database.instruments I on (TD.CLEAN_symbol = I.symbol AND TD.CURRENCY = I.CURRENCY)
WHERE
    trade_date BETWEEN :start_date AND :end_date
    AND I.TYPE = 'ORDINARY'
    AND TD.CURRENCY = 'JMD'
ORDER BY
	trade_date ASC,
	clean_symbol ASC
"""

# Inclusive date window handed to the query.
date_window = {"start_date": '2017-01-01', "end_date": '2025-03-31'}

# Load all stocks data
loader = DataLoader()
df = loader.fetch_data(query=query, **date_window)

# Indicators

# Modeling

## Testing

# Sentiment Exploration

In [None]:
# Read the article-level sentiment parquet into a polars DataFrame and preview its first rows.
fileLoc = r"C:\Users\Joshh\Projects\Stocks\Data\sentiment_data_FINAL.parquet"
df2 = pl.read_parquet(fileLoc)
print(df2.head())

PolarsBuckarooWidget(buckaroo_options={'sampled': ['random'], 'auto_clean': ['aggressive', 'conservative'], 'p…

In [73]:
# Display the column names of the sentiment frame (notebook cell output).
df2.columns

['filename',
 'title',
 'text',
 'publication_date',
 'keywords',
 'authors',
 'title_sentiment_positive',
 'title_sentiment_negative',
 'title_sentiment_neutral',
 'title_sentiment_compound',
 'text_sentiment_positive',
 'text_sentiment_negative',
 'text_sentiment_neutral',
 'text_sentiment_compound',
 'combined_sentiment_positive',
 'combined_sentiment_negative',
 'combined_sentiment_neutral',
 'combined_sentiment_compound',
 'organizations',
 'jse_symbols',
 'entity_specific_sentiments']

In [76]:
# Enhanced stock and sentiment integration script with entity-specific sentiment, market sentiment, and temporal features

import polars as pl
import numpy as np
import datetime as dt
import json
from typing import List, Tuple, Dict, Optional
from data_loader import DataLoader
import os

# --- Configuration ---
# NOTE(review): STOCK_DATA_PATH is passed to load_stock_data, which does not read it
# (stock prices come from the database via DataLoader).
STOCK_DATA_PATH = "C:/Users/Joshh/Projects/Stocks/Data/stock_data.parquet"
# Article-level sentiment parquet read by load_sentiment_data.
SENTIMENT_DATA_PATH = "C:/Users/Joshh/Projects/Stocks/Data/sentiment_data_FINAL.parquet"
# Directory that receives the parquet files written by the main block.
OUTPUT_DIR = "C:/Users/Joshh/Projects/Stocks/Data/Integrated"

# Market sentiment keywords for overall market indicators
# NOTE(review): not referenced by any function visible in this cell.
MARKET_KEYWORDS = ['trading', 'market', 'stocks', 'securities', 'exchange', 'index', 'volume', 'shares']

# --- Loaders ---
def load_stock_data(
    file_path: str,
    *,
    start_date: str = '2017-01-01',
    end_date: str = '2025-03-31',
) -> pl.DataFrame:
    """Fetch daily price rows for JMD ordinary shares and normalise their dtypes.

    Args:
        file_path: Unused. Kept so existing callers that pass a path keep
            working; the data is queried through ``DataLoader`` instead.
        start_date: First trade date to include (bound to ``:start_date``).
        end_date: Last trade date to include (bound to ``:end_date``).

    Returns:
        A frame with ``date``, ``symbol``, ``high_price``, ``low_price``,
        ``closing_price``, ``opening_price`` and ``volume``, sorted by
        symbol then date. Missing volume is replaced with 0.
    """
    print("Loading stock data...")
    loader = DataLoader()

    # The WHERE filter on I.TYPE discards rows without a matching instrument,
    # so the LEFT JOIN behaves like an inner join here.
    # NOTE(review): opening_price is derived as closing_price + price_change;
    # confirm the sign convention of price_change in old_trade_data.
    query = """
    SELECT DISTINCT
        trade_date as date,
        clean_symbol as symbol,
        daily_range_high as high_price,
        daily_range_low as low_price,
        closing_price,
        closing_price + price_change as opening_price,
        volume
    FROM old_trade_data TD
    LEFT JOIN jse_database.instruments I ON (TD.CLEAN_symbol = I.symbol AND TD.CURRENCY = I.CURRENCY)
    WHERE trade_date BETWEEN :start_date AND :end_date
      AND I.TYPE = 'ORDINARY'
      AND TD.CURRENCY = 'JMD'
    ORDER BY trade_date ASC, clean_symbol ASC
    """

    df = loader.fetch_data(query=query, start_date=start_date, end_date=end_date)

    # Normalise dtypes so the later join on (date, symbol) and the numeric
    # indicators work on consistent types.
    df = df.with_columns([
        pl.col("date").dt.date(),
        pl.col("symbol").cast(pl.Utf8),
        pl.col("high_price").cast(pl.Float64),
        pl.col("low_price").cast(pl.Float64),
        pl.col("closing_price").cast(pl.Float64),
        pl.col("opening_price").cast(pl.Float64),
        pl.col("volume").cast(pl.Float64).fill_null(0)
    ])

    print(f"Loaded stock data: {df.height:,} records for {df.select('symbol').n_unique()} symbols")
    return df.sort(["symbol", "date"])

def load_sentiment_data(file_path: str) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Build daily per-symbol and market-wide sentiment features from article data.

    Args:
        file_path: Parquet file of article-level sentiment. The code reads the
            columns ``publication_date``, ``jse_symbols``, ``title``,
            ``title_sentiment_compound``, ``text_sentiment_compound`` and the
            four ``combined_sentiment_*`` columns.

    Returns:
        ``(company_sentiment, market_sentiment)``:

        * ``company_sentiment`` has one row per (date, symbol) with at least
          one article that lists the symbol, sorted by symbol then date. The
          symbol ``"JSE"`` is excluded.
        * ``market_sentiment`` has one row per date with at least one article
          that lists ``"JSE"``, sorted by date.
    """
    print("Loading and processing sentiment data...")

    # Load raw sentiment data and inspect the structure
    df = pl.read_parquet(file_path)
    print(f"Schema: {df.schema}")

    df = df.with_columns([
        pl.col("publication_date").alias("date"),
        pl.col("jse_symbols").cast(pl.List(pl.Utf8)),
    ])

    compound = pl.col("combined_sentiment_compound")
    title_vs_text = pl.col("title_sentiment_compound") - pl.col("text_sentiment_compound")

    print("Creating market sentiment indicators...")
    # Market sentiment comes only from articles that list JSE as a symbol.
    market_sentiment = (
        df.filter(pl.col("jse_symbols").list.contains("JSE"))
        .group_by("date")
        .agg([
            pl.count("title").alias("market_news_count"),
            compound.mean().alias("market_sentiment"),
            compound.std().fill_null(0).alias("market_sentiment_volatility"),
            pl.col("combined_sentiment_positive").mean().alias("market_positive"),
            pl.col("combined_sentiment_negative").mean().alias("market_negative"),
            pl.col("combined_sentiment_neutral").mean().alias("market_neutral"),
            # NOTE(review): multiplying by the group count here and dividing by
            # it below cancels out, so this equals the per-day SUM of compound
            # scores (when no title is null). Kept as-is; confirm the intent.
            (compound * pl.count("title")).sum().alias("volume_weighted_market_sentiment"),
        ])
        .with_columns([
            (pl.col("volume_weighted_market_sentiment") / pl.col("market_news_count"))
            .alias("volume_weighted_market_sentiment")
        ])
        # group_by does not guarantee row order; sort so the saved file is deterministic.
        .sort("date")
    )

    print("Processing entity-specific sentiments...")
    # One row per (article, symbol). Only the columns that the aggregation
    # below reads are kept, so the article text and the wide
    # entity_specific_sentiments struct are not duplicated for every symbol.
    df_exploded = df.select([
        "date", "title",
        "title_sentiment_compound", "text_sentiment_compound", "combined_sentiment_compound",
        "combined_sentiment_positive", "combined_sentiment_negative", "combined_sentiment_neutral",
        "jse_symbols",
    ]).explode("jse_symbols").filter(
        pl.col("jse_symbols").is_not_null()
        & (pl.col("jse_symbols") != "")
        & (pl.col("jse_symbols") != "JSE")  # JSE feeds the market frame, not individual stocks
    )

    # The entity_* columns are currently plain copies of the combined_* scores
    # (the entity_specific_sentiments struct is not parsed), so every
    # entity_* feature below duplicates its combined counterpart and
    # entity_general_divergence is always 0.
    df_with_entity = df_exploded.with_columns([
        pl.col("combined_sentiment_positive").alias("entity_positive"),
        pl.col("combined_sentiment_negative").alias("entity_negative"),
        pl.col("combined_sentiment_neutral").alias("entity_neutral"),
        compound.alias("entity_compound"),
    ])
    entity = pl.col("entity_compound")

    print("Aggregating company-specific sentiment features...")
    company_sentiment = df_with_entity.group_by(["date", "jse_symbols"]).agg([
        # Basic counts and averages
        pl.count("title").alias("article_count"),
        compound.mean().alias("avg_sentiment"),
        entity.mean().alias("entity_specific_sentiment"),

        # Sentiment distributions
        pl.col("combined_sentiment_positive").mean().alias("avg_positive"),
        pl.col("combined_sentiment_negative").mean().alias("avg_negative"),
        pl.col("combined_sentiment_neutral").mean().alias("avg_neutral"),
        pl.col("entity_positive").mean().alias("entity_avg_positive"),
        pl.col("entity_negative").mean().alias("entity_avg_negative"),
        pl.col("entity_neutral").mean().alias("entity_avg_neutral"),

        # Sentiment variability and extremes (std of a single article is null -> 0)
        compound.std().fill_null(0).alias("sentiment_volatility"),
        entity.std().fill_null(0).alias("entity_sentiment_volatility"),
        compound.min().alias("min_sentiment"),
        compound.max().alias("max_sentiment"),
        entity.min().alias("entity_min_sentiment"),
        entity.max().alias("entity_max_sentiment"),

        # Title vs text sentiment divergence
        title_vs_text.mean().alias("headline_bias"),
        title_vs_text.abs().mean().alias("headline_text_divergence"),

        # Entity vs general sentiment divergence
        (entity - compound).mean().alias("entity_general_divergence"),

        # Sentiment concentration: 1 - distinct/total for groups with more than
        # one article, 1.0 for a single article. Native expressions replace the
        # former per-group Python lambda (same formula, no Python callback).
        pl.when(compound.len() > 1)
        .then(1.0 - compound.n_unique() / compound.len())
        .otherwise(1.0)
        .alias("sentiment_concentration"),

        # NOTE(review): as with the market frame, the count factor cancels in
        # the normalisation below, leaving the per-group sum of compound scores.
        (compound * pl.count("title")).sum().alias("volume_weighted_sentiment"),
    ]).with_columns([
        (pl.col("volume_weighted_sentiment") / pl.col("article_count")).alias("volume_weighted_sentiment")
    ]).sort(["jse_symbols", "date"])  # the window functions below rely on date order within a symbol

    print("Calculating temporal sentiment features...")
    # NOTE: rows exist only for dates on which a symbol had articles, so the
    # "3d"/"7d" windows span the previous 3/7 news days for that symbol, not
    # calendar days.
    avg_sentiment = pl.col("avg_sentiment")
    entity_sentiment = pl.col("entity_specific_sentiment")
    article_count = pl.col("article_count")
    company_sentiment = company_sentiment.with_columns([
        # Sentiment momentum (change versus the symbol's previous row)
        avg_sentiment.diff().over("jse_symbols").alias("sentiment_momentum"),
        entity_sentiment.diff().over("jse_symbols").alias("entity_sentiment_momentum"),

        # Rolling sentiment averages
        avg_sentiment.rolling_mean(3).over("jse_symbols").alias("sentiment_3d_ma"),
        avg_sentiment.rolling_mean(7).over("jse_symbols").alias("sentiment_7d_ma"),
        entity_sentiment.rolling_mean(3).over("jse_symbols").alias("entity_sentiment_3d_ma"),
        entity_sentiment.rolling_mean(7).over("jse_symbols").alias("entity_sentiment_7d_ma"),

        # Rolling sentiment volatility
        avg_sentiment.rolling_std(7).over("jse_symbols").alias("sentiment_7d_volatility"),
        entity_sentiment.rolling_std(7).over("jse_symbols").alias("entity_sentiment_7d_volatility"),

        # Article volume momentum
        article_count.diff().over("jse_symbols").alias("news_volume_momentum"),
        article_count.rolling_mean(7).over("jse_symbols").alias("news_volume_7d_ma"),
    ])

    return company_sentiment.rename({"jse_symbols": "symbol"}), market_sentiment

# --- Technical Indicators ---
def calculate_vwap(df: pl.DataFrame, symbol_col: str = "symbol") -> pl.DataFrame:
    """Add a ``vwap`` column: volume-weighted typical price per (symbol, date).

    The typical price is ``(high_price + low_price + closing_price) / 3``.

    Args:
        df: Frame with ``high_price``, ``low_price``, ``closing_price``,
            ``volume`` and ``date`` columns plus the symbol column.
        symbol_col: Name of the symbol column used for grouping.

    Returns:
        ``df`` with an extra ``vwap`` column. Groups whose total volume is
        zero get a null VWAP instead of a 0/0 result.
    """
    print("Calculating daily VWAP...")
    group_keys = [symbol_col, "date"]
    typical_price = (pl.col("high_price") + pl.col("low_price") + pl.col("closing_price")) / 3
    traded_value = (typical_price * pl.col("volume")).sum().over(group_keys)
    total_volume = pl.col("volume").sum().over(group_keys)

    # The alias must wrap the whole quotient. Attached only to the denominator,
    # the result is named after the left-most column ("high_price"), which
    # overwrites that column and never creates "vwap".
    vwap = (
        pl.when(total_volume > 0)
        .then(traded_value / total_volume)
        .otherwise(None)
        .alias("vwap")
    )
    return df.with_columns([vwap])

def calculate_technical_indicators(df: pl.DataFrame, symbol_col: str = "symbol") -> pl.DataFrame:
    """Append per-symbol technical indicators and boolean trading signals.

    The frame is sorted by symbol and date, then extended in stages (returns
    and EMAs, RSI inputs, moving averages / MACD / RSI, signal line and
    volatility, volume ratio and MACD histogram, VWAP, signals). The helper
    columns ``price_diff``, ``avg_gain``, ``avg_loss``, ``ema12`` and
    ``ema26`` are dropped before returning.

    Args:
        df: Price frame with ``date``, ``closing_price``, ``opening_price``,
            ``high_price``, ``low_price`` and ``volume`` columns.
        symbol_col: Column that identifies each instrument.

    Returns:
        The sorted frame with the indicator and signal columns added.
    """
    print("Calculating technical indicators and signals...")

    close = pl.col("closing_price")
    open_ = pl.col("opening_price")
    high = pl.col("high_price")
    low = pl.col("low_price")
    volume = pl.col("volume")

    def _safe_divisor(expr: pl.Expr) -> pl.Expr:
        # Swap a zero denominator for 1 so the ratio stays finite.
        return pl.when(expr == 0).then(1).otherwise(expr)

    frame = df.sort([symbol_col, "date"])

    # Stage 1: returns, raw price change and the two EMAs behind MACD.
    frame = frame.with_columns([
        close.pct_change().over(symbol_col).fill_null(0).alias("returns"),
        close.diff().over(symbol_col).alias("price_diff"),
        close.ewm_mean(span=12).over(symbol_col).alias("ema12"),
        close.ewm_mean(span=26).over(symbol_col).alias("ema26"),
    ])

    # Stage 2: 14-row average gain and average loss (RSI inputs).
    change = pl.col("price_diff")
    up_move = pl.when(change > 0).then(change).otherwise(0)
    down_move = pl.when(change < 0).then(-change).otherwise(0)
    frame = frame.with_columns([
        up_move.rolling_mean(14).over(symbol_col).alias("avg_gain"),
        down_move.rolling_mean(14).over(symbol_col).alias("avg_loss"),
    ])

    # Stage 3: ranges, moving averages, MACD and RSI.
    mean_gain = pl.col("avg_gain")
    mean_loss = pl.col("avg_loss")
    rsi = (
        pl.when(mean_loss == 0)
        .then(100)
        .otherwise(100 - (100 / (1 + mean_gain / mean_loss)))
        .fill_null(50)  # warm-up rows without a full window default to 50
    )
    frame = frame.with_columns([
        ((close - open_) / _safe_divisor(open_)).alias("intraday_return"),
        ((high - low) / _safe_divisor(low)).alias("daily_range"),
        close.rolling_mean(20).over(symbol_col).alias("sma_20"),
        close.rolling_mean(50).over(symbol_col).alias("sma_50"),
        volume.rolling_mean(10).over(symbol_col).alias("volume_ma10"),
        volume.rolling_mean(20).over(symbol_col).alias("volume_ma20"),
        (pl.col("ema12") - pl.col("ema26")).alias("macd"),
        rsi.alias("rsi_14"),
    ])

    # Stage 4: MACD signal line and rolling return volatility.
    macd = pl.col("macd")
    daily_returns = pl.col("returns")
    frame = frame.with_columns([
        macd.ewm_mean(span=9).over(symbol_col).alias("signal_line"),
        daily_returns.rolling_std(10).over(symbol_col).fill_null(0).alias("volatility_10d"),
        daily_returns.rolling_std(20).over(symbol_col).fill_null(0).alias("volatility_20d"),
    ])

    # Stage 5: volume relative to its 20-row mean, and the MACD histogram.
    signal_line = pl.col("signal_line")
    frame = frame.with_columns([
        (volume / _safe_divisor(pl.col("volume_ma20"))).alias("volume_ratio"),
        (macd - signal_line).alias("macd_histogram"),
    ])

    # Stage 6: VWAP
    frame = calculate_vwap(frame, symbol_col)

    # Stages 7 and 8: individual signals followed by the combined ones, added
    # in a single pass with the same column order.
    sma_fast = pl.col("sma_20")
    sma_slow = pl.col("sma_50")
    rsi_value = pl.col("rsi_14")
    ma_buy = sma_fast > sma_slow
    ma_sell = sma_fast < sma_slow
    macd_buy = macd > signal_line
    macd_sell = macd < signal_line
    frame = frame.with_columns([
        ma_buy.alias("ma_buy_signal"),
        ma_sell.alias("ma_sell_signal"),
        macd_buy.alias("macd_buy_signal"),
        macd_sell.alias("macd_sell_signal"),
        (rsi_value < 30).alias("rsi_buy_signal"),
        (rsi_value > 70).alias("rsi_sell_signal"),
        (ma_buy & macd_buy).alias("combined_buy_signal"),
        (ma_sell & macd_sell).alias("combined_sell_signal"),
    ])

    # Remove the helper columns that only fed later stages.
    return frame.drop(["price_diff", "avg_gain", "avg_loss", "ema12", "ema26"])

# --- Integration ---
def integrate_data(stock_df: pl.DataFrame, sentiment_df: pl.DataFrame, market_sentiment_df: pl.DataFrame) -> pl.DataFrame:
    """Combine price indicators with company and market sentiment features.

    Technical indicators are computed on ``stock_df``. Company sentiment is
    then left-joined on (date, symbol) and market sentiment on date. Rows
    without sentiment receive fixed defaults (0 for counts and scores,
    0.33/0.33/0.34 for the positive/negative/neutral shares) before the
    interaction features are derived.

    Args:
        stock_df: Daily price frame accepted by
            ``calculate_technical_indicators``.
        sentiment_df: Per (date, symbol) sentiment features.
        market_sentiment_df: Per-date market sentiment features.

    Returns:
        The merged frame sorted by symbol then date.
    """
    def _fill_defaults(frame: pl.DataFrame, defaults: Dict) -> pl.DataFrame:
        # Fill nulls only for the default columns that exist in the frame.
        present = set(frame.columns)
        return frame.with_columns([
            pl.col(name).fill_null(default)
            for name, default in defaults.items()
            if name in present
        ])

    print("Calculating stock technical indicators...")
    stock_with_features = calculate_technical_indicators(stock_df)

    print("Merging stock and company sentiment data...")
    merged_df = stock_with_features.join(sentiment_df, on=["date", "symbol"], how="left")

    # Defaults applied to days on which a symbol had no articles.
    sentiment_fill = {
        "article_count": 0,
        "avg_sentiment": 0.0,
        "entity_specific_sentiment": 0.0,
        "sentiment_volatility": 0.0,
        "entity_sentiment_volatility": 0.0,
        "min_sentiment": 0.0,
        "max_sentiment": 0.0,
        "entity_min_sentiment": 0.0,
        "entity_max_sentiment": 0.0,
        "avg_positive": 0.33,
        "avg_negative": 0.33,
        "avg_neutral": 0.34,
        "entity_avg_positive": 0.33,
        "entity_avg_negative": 0.33,
        "entity_avg_neutral": 0.34,
        "headline_bias": 0.0,
        "headline_text_divergence": 0.0,
        "entity_general_divergence": 0.0,
        "sentiment_concentration": 0.0,
        "volume_weighted_sentiment": 0.0,
        "sentiment_momentum": 0.0,
        "entity_sentiment_momentum": 0.0,
        "sentiment_3d_ma": 0.0,
        "sentiment_7d_ma": 0.0,
        "entity_sentiment_3d_ma": 0.0,
        "entity_sentiment_7d_ma": 0.0,
        "sentiment_7d_volatility": 0.0,
        "entity_sentiment_7d_volatility": 0.0,
        "news_volume_momentum": 0.0,
        "news_volume_7d_ma": 0.0,
    }
    merged_df = _fill_defaults(merged_df, sentiment_fill)

    print("Adding market sentiment indicators...")
    merged_df = merged_df.join(market_sentiment_df, on="date", how="left")

    # Defaults applied to days without market-level articles.
    market_fill = {
        "market_news_count": 0,
        "market_sentiment": 0.0,
        "market_sentiment_volatility": 0.0,
        "market_positive": 0.33,
        "market_negative": 0.33,
        "market_neutral": 0.34,
        "volume_weighted_market_sentiment": 0.0,
    }
    merged_df = _fill_defaults(merged_df, market_fill)

    print("Calculating advanced interaction features...")
    returns = pl.col("returns")
    volume_ratio = pl.col("volume_ratio")
    price_vol = pl.col("volatility_10d")
    company = pl.col("avg_sentiment")
    entity = pl.col("entity_specific_sentiment")
    market = pl.col("market_sentiment")
    articles = pl.col("article_count")

    merged_df = merged_df.with_columns([
        # Sentiment x returns
        (company * returns).alias("sentiment_returns_interaction"),
        (entity * returns).alias("entity_sentiment_returns_interaction"),
        (market * returns).alias("market_sentiment_returns_interaction"),

        # Sentiment x relative trading volume
        (company * volume_ratio).alias("sentiment_volume_interaction"),
        (entity * volume_ratio).alias("entity_sentiment_volume_interaction"),
        (market * volume_ratio).alias("market_sentiment_volume_interaction"),

        # Sentiment dispersion x price volatility
        (pl.col("sentiment_volatility") * price_vol).alias("sentiment_volatility_interaction"),
        (pl.col("entity_sentiment_volatility") * price_vol).alias("entity_sentiment_volatility_interaction"),
        (pl.col("market_sentiment_volatility") * price_vol).alias("market_sentiment_volatility_interaction"),

        # Sentiment momentum x returns
        (pl.col("sentiment_momentum") * returns).alias("sentiment_momentum_returns_interaction"),
        (pl.col("entity_sentiment_momentum") * returns).alias("entity_sentiment_momentum_returns_interaction"),

        # News volume x relative trading volume
        (articles * volume_ratio).alias("news_volume_trading_interaction"),
        (pl.col("market_news_count") * volume_ratio).alias("market_news_volume_interaction"),

        # Company sentiment relative to the market
        (company - market).alias("relative_sentiment_vs_market"),
        (entity - market).alias("entity_relative_sentiment_vs_market"),

        # Article count scaled by absolute sentiment
        (articles * company.abs()).alias("sentiment_signal_strength"),
        (articles * entity.abs()).alias("entity_sentiment_signal_strength"),

        # 60/40 blend of entity-specific and general sentiment
        (0.6 * entity + 0.4 * company).alias("combined_sentiment_score"),
    ])

    return merged_df.sort(["symbol", "date"])

# --- Main Execution ---
if __name__ == "__main__":
    # End-to-end run: load both sources, integrate them, report a summary and
    # write the integrated frame and the market sentiment frame as parquet.
    print("=== Enhanced Stock-Sentiment Integration Pipeline ===")

    # Inputs
    stock_df = load_stock_data(STOCK_DATA_PATH)
    sentiment_df, market_sentiment_df = load_sentiment_data(SENTIMENT_DATA_PATH)

    # Feature integration
    integrated_df = integrate_data(stock_df, sentiment_df, market_sentiment_df)

    # Schema report
    print("\n--- Final Data Schema ---")
    for column_name, column_dtype in integrated_df.schema.items():
        print(f"- {column_name}: {column_dtype}")

    # Summary statistics
    date_column = integrated_df.select('date')
    first_date = date_column.min().item()
    last_date = date_column.max().item()
    symbol_total = integrated_df.select('symbol').n_unique()
    print(f"\nFinal dataset shape: {integrated_df.shape}")
    print(f"Date range: {first_date} to {last_date}")
    print(f"Unique symbols: {symbol_total}")

    # Preview of a few key columns
    print("\n--- Sample Output ---")
    sample_cols = [
        'symbol',
        'date',
        'closing_price',
        'returns',
        'avg_sentiment',
        'entity_specific_sentiment',
        'market_sentiment',
        'sentiment_signal_strength',
        'combined_sentiment_score',
    ]
    print(integrated_df.select(sample_cols).head(10))

    # Persist the integrated features, then the market sentiment on its own
    # so it can be analysed separately.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_file = os.path.join(OUTPUT_DIR, "enhanced_integrated_stock_sentiment_features.parquet")
    market_output_file = os.path.join(OUTPUT_DIR, "market_sentiment_indicators.parquet")
    integrated_df.write_parquet(output_file)
    market_sentiment_df.write_parquet(market_output_file)

    print("\nData successfully saved to:")
    print(f"- Main dataset: {output_file}")
    print(f"- Market sentiment: {market_output_file}")
    print("\n=== Pipeline Complete ===")

=== Enhanced Stock-Sentiment Integration Pipeline ===
Loading stock data...
Loaded stock data: 156,385 records for 99 symbols
Loading and processing sentiment data...
Schema: Schema({'filename': String, 'title': String, 'text': String, 'publication_date': Date, 'keywords': List(String), 'authors': List(String), 'title_sentiment_positive': Float64, 'title_sentiment_negative': Float64, 'title_sentiment_neutral': Float64, 'title_sentiment_compound': Float64, 'text_sentiment_positive': Float64, 'text_sentiment_negative': Float64, 'text_sentiment_neutral': Float64, 'text_sentiment_compound': Float64, 'combined_sentiment_positive': Float64, 'combined_sentiment_negative': Float64, 'combined_sentiment_neutral': Float64, 'combined_sentiment_compound': Float64, 'organizations': List(String), 'jse_symbols': List(String), 'entity_specific_sentiments': Struct({'the Jamaica Stock Exchange': Struct({'positive': Float64, 'negative': Float64, 'neutral': Float64, 'compound': Float64}), 'the Investor’s C