# ‚ùÑÔ∏è Snowflake for Hedge Funds: Finding Alpha

This notebook demonstrates how hedge funds can leverage Snowflake to:
- **Unify data sources** (market data + alternative data)
- **Discover alpha factors** using Snowpark Python
- **Combine signals** for trading decisions


---

## üìö Table of Contents
1. Setup & Connect to Snowflake
2. Create Sample Data Tables
3. Compute Alpha Factors (Momentum, Volatility, Sentiment)
4. Combine Factors into Composite Alpha
5. Analyze & Visualize Alpha Signals
6. Create Production UDFs
7. Backtest Strategy
8. Deployment Patterns


## 1Ô∏è‚É£ Setup: Connect to Snowflake

First, we establish our Snowpark session. In Snowflake Notebooks, the session is automatically available.


In [None]:
# Import required libraries
from snowflake.snowpark import Session
from snowflake.snowpark.functions import (
    col, lit, avg, sum as sum_, count, stddev, 
    lag, lead, percent_rank, row_number,
    when, iff, greatest, least,
    date_trunc, datediff, current_date
)
from snowflake.snowpark.window import Window
from snowflake.snowpark.types import *
import pandas as pd
import numpy as np

# In Snowflake Notebooks, session is pre-configured
from snowflake.snowpark.context import get_active_session
session = get_active_session()

print(f"‚úÖ Connected to Snowflake")
print(f"   Warehouse: {session.get_current_warehouse()}")
print(f"   Database: {session.get_current_database()}")
print(f"   Schema: {session.get_current_schema()}")


## 2Ô∏è‚É£ Load Market Data from Snowflake Marketplace

We'll use **real market data** from the **Cybersyn Financial & Economic Essentials** dataset available on Snowflake Marketplace.

This demonstrates how hedge funds can instantly access high-quality market data without any ETL!


In [None]:
# Create database and schema for hedge fund analytics
session.sql("CREATE DATABASE IF NOT EXISTS HEDGE_FUND_DEMO").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS HEDGE_FUND_DEMO.ANALYTICS").collect()
session.sql("USE SCHEMA HEDGE_FUND_DEMO.ANALYTICS").collect()

print("‚úÖ Created HEDGE_FUND_DEMO.ANALYTICS schema")


In [None]:
# Load REAL market data from Cybersyn (Snowflake Marketplace)
# This pulls historical OHLCV data for our expanded stock universe
# Includes: Tech, Financials, Payments, Healthcare, Consumer, Energy, Industrials

market_data_sql = """
SELECT 
    t.DATE,
    t.TICKER AS SYMBOL,
    MAX(CASE WHEN t.VARIABLE_NAME = 'Pre-Market Open' THEN t.VALUE END) AS OPEN,
    MAX(CASE WHEN t.VARIABLE_NAME = 'All-Day High' THEN t.VALUE END) AS HIGH,
    MAX(CASE WHEN t.VARIABLE_NAME = 'All-Day Low' THEN t.VALUE END) AS LOW,
    MAX(CASE WHEN t.VARIABLE_NAME = 'Post-Market Close' THEN t.VALUE END) AS CLOSE,
    MAX(CASE WHEN t.VARIABLE_NAME = 'Nasdaq Volume' THEN t.VALUE END) AS VOLUME
FROM FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.STOCK_PRICE_TIMESERIES t
WHERE t.TICKER IN (
        -- Tech Giants
        'AAPL', 'GOOGL', 'MSFT', 'AMZN', 'META', 'NVDA', 'TSLA',
        -- Financials (Investment Banks)
        'JPM', 'GS', 'MS', 'BAC', 'WFC', 'C',
        -- Payments & FinTech
        'V', 'MA', 'PYPL', 'SQ',
        -- Healthcare & Pharma
        'JNJ', 'UNH', 'PFE', 'MRK', 'ABBV',
        -- Consumer & Retail
        'WMT', 'COST', 'HD', 'NKE', 'SBUX',
        -- Energy
        'XOM', 'CVX', 'COP',
        -- Industrials
        'CAT', 'BA', 'UPS', 'HON'
    )
  AND t.VARIABLE_NAME IN ('Pre-Market Open', 'All-Day High', 'All-Day Low', 'Post-Market Close', 'Nasdaq Volume')
  AND t.DATE >= DATEADD(year, -1, CURRENT_DATE())  -- Last 1 year of data
GROUP BY t.DATE, t.TICKER
HAVING CLOSE IS NOT NULL  -- Ensure we have closing prices
ORDER BY t.TICKER, t.DATE
"""

market_df = session.sql(market_data_sql).to_pandas()
print(f"‚úÖ Loaded {len(market_df):,} market data records from Cybersyn")
print(f"   Date range: {market_df['DATE'].min()} to {market_df['DATE'].max()}")
print(f"   Symbols ({len(market_df['SYMBOL'].unique())} stocks): {', '.join(sorted(market_df['SYMBOL'].unique()))}")
market_df.head(10)


In [None]:
# Write market data to our analytics schema
# This creates a copy we can work with for our alpha calculations
market_snow_df = session.create_dataframe(market_df)
market_snow_df.write.mode("overwrite").save_as_table("MARKET_DATA")

print("‚úÖ Created MARKET_DATA table from Cybersyn Marketplace data")
print(f"   Source: FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.STOCK_PRICE_TIMESERIES")
session.table("MARKET_DATA").show(5)


In [None]:
# Generate sample news headlines and use SNOWFLAKE CORTEX AI for sentiment analysis!
# This demonstrates how Cortex LLM functions can analyze text at scale

import random
from datetime import datetime, timedelta

# Get date range from our market data
min_date = market_df['DATE'].min()
max_date = market_df['DATE'].max()
symbols = market_df['SYMBOL'].unique().tolist()

np.random.seed(42)
random.seed(42)

# Realistic financial news headline templates (no pre-scored sentiment!)
headline_templates = [
    "beats earnings expectations with strong quarterly results",
    "announces strategic partnership to expand market presence",
    "faces regulatory investigation over compliance concerns",
    "launches innovative new product line ahead of schedule",
    "reports significant supply chain disruptions affecting production",
    "receives analyst upgrade citing growth potential",
    "CEO sells significant stock holdings in planned transaction",
    "posts record revenue growth exceeding analyst estimates",
    "misses quarterly targets amid challenging market conditions",
    "expands aggressively into new international markets",
    "announces major layoffs as part of restructuring plan",
    "secures major government contract worth billions",
    "faces class action lawsuit from shareholders",
    "reports cybersecurity breach affecting customer data",
    "raises full-year guidance following strong performance",
    "cuts dividend amid cash flow concerns",
    "announces stock buyback program worth $10 billion",
    "loses key executive to competitor",
    "wins patent dispute against rival company",
    "warns of slowing demand in key markets"
]

sources = ['Reuters', 'Bloomberg', 'CNBC', 'WSJ', 'Financial Times', 'MarketWatch']

# Generate headlines (we'll let Cortex score them!)
headline_records = []
current_date = pd.to_datetime(min_date)
end_date = pd.to_datetime(max_date)

while current_date <= end_date:
    if current_date.weekday() < 5:  # Weekdays only
        for symbol in symbols:
            n_articles = np.random.poisson(2)  # Average 2 articles per stock per day
            for _ in range(n_articles):
                headline = f"{symbol} {random.choice(headline_templates)}"
                headline_records.append({
                    'DATE': current_date.strftime('%Y-%m-%d'),
                    'SYMBOL': symbol,
                    'HEADLINE': headline,
                    'SOURCE': random.choice(sources)
                })
    current_date += timedelta(days=1)

headline_df = pd.DataFrame(headline_records)
print(f"üì∞ Generated {len(headline_df):,} news headlines")
print(f"   Now using Snowflake Cortex AI to analyze sentiment...")

# Write headlines to staging table
headline_snow_df = session.create_dataframe(headline_df)
headline_snow_df.write.mode("overwrite").save_as_table("NEWS_HEADLINES_RAW")

# Use SNOWFLAKE CORTEX SENTIMENT function to analyze headlines!
# This is the magic - LLM-powered sentiment analysis at scale!
cortex_sentiment_sql = """
SELECT 
    DATE,
    SYMBOL,
    HEADLINE,
    SOURCE,
    -- Snowflake Cortex SENTIMENT function returns score from -1 to 1
    SNOWFLAKE.CORTEX.SENTIMENT(HEADLINE) AS SENTIMENT_SCORE,
    -- Confidence derived from sentiment MAGNITUDE using Cortex output
    -- Strong sentiment (¬±0.9) = high confidence, Weak sentiment (¬±0.1) = low confidence
    -- Formula: 0.5 + (|sentiment| * 0.5) gives range 0.5 to 1.0
    0.5 + (ABS(SNOWFLAKE.CORTEX.SENTIMENT(HEADLINE)) * 0.5) AS CONFIDENCE
FROM NEWS_HEADLINES_RAW
"""

print("ü§ñ Running Snowflake Cortex SENTIMENT analysis...")
session.sql(cortex_sentiment_sql).write.mode("overwrite").save_as_table("NEWS_SENTIMENT")

print(f"‚úÖ Created NEWS_SENTIMENT table with Cortex AI sentiment scores!")
print(f"   üß† Powered by: SNOWFLAKE.CORTEX.SENTIMENT()")
print(f"   üìä Sentiment range: -1 (very negative) to +1 (very positive)")
print(f"   üéØ Confidence: Derived from sentiment magnitude (stronger = more confident)")

# Show sample with Cortex-generated sentiment and confidence
session.sql("""
    SELECT SYMBOL, 
           ROUND(SENTIMENT_SCORE, 3) AS SENTIMENT,
           ROUND(CONFIDENCE, 3) AS CONFIDENCE,
           CASE 
               WHEN SENTIMENT_SCORE > 0.3 THEN 'üü¢ Positive'
               WHEN SENTIMENT_SCORE < -0.3 THEN 'üî¥ Negative'
               ELSE 'üü° Neutral'
           END AS LABEL,
           HEADLINE
    FROM NEWS_SENTIMENT 
    ORDER BY ABS(SENTIMENT_SCORE) DESC 
    LIMIT 10
""").show()


---
## 3Ô∏è‚É£ Compute Alpha Factors with Snowflake

Now we compute various alpha factors **directly in Snowflake**. This scales to billions of rows without moving data.

### üìà Factor 1: Momentum Alpha
Classic momentum: stocks that have risen tend to continue rising.

We use **ASOF JOIN** (Snowflake's time-series function) to find prices from ~20 trading days ago. This is more robust than `LAG()` when data has gaps.


In [None]:
# Compute Momentum Factor using ASOF JOIN (Snowflake Time-Series)
# ASOF JOIN finds the closest historical price by time, more robust than LAG()

momentum_sql = """
WITH current_prices AS (
    -- Current day prices
    SELECT DATE, SYMBOL, CLOSE
    FROM MARKET_DATA
),
historical_prices AS (
    -- Historical prices: only include dates from 20+ days ago
    -- This ensures ASOF JOIN finds the closest match WITHIN our target range
    SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
    FROM MARKET_DATA
),
-- Create a cross join of current dates with target lookback dates
date_targets AS (
    SELECT 
        DATE,
        SYMBOL,
        CLOSE,
        DATEADD('day', -28, DATE) AS TARGET_DATE  -- Target ~20 trading days (28 calendar days)
    FROM current_prices
),
momentum_raw AS (
    -- Use ASOF JOIN to find the closest price ON OR BEFORE the target date
    SELECT 
        dt.DATE,
        dt.SYMBOL,
        dt.CLOSE,
        h.HIST_DATE,
        h.HIST_CLOSE AS CLOSE_20D_AGO,
        DATEDIFF('day', h.HIST_DATE, dt.DATE) AS DAYS_BACK,
        (dt.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS MOMENTUM_20D
    FROM date_targets dt
    ASOF JOIN historical_prices h
        MATCH_CONDITION (dt.TARGET_DATE >= h.HIST_DATE)
        ON dt.SYMBOL = h.SYMBOL
),
ranked AS (
    -- Rank stocks by momentum on each date (cross-sectional)
    SELECT 
        DATE,
        SYMBOL,
        CLOSE,
        CLOSE_20D_AGO,
        DAYS_BACK,
        MOMENTUM_20D,
        PERCENT_RANK() OVER (
            PARTITION BY DATE 
            ORDER BY MOMENTUM_20D
        ) AS MOMENTUM_RANK
    FROM momentum_raw
    WHERE MOMENTUM_20D IS NOT NULL
      AND DAYS_BACK >= 20  -- Ensure we have enough history
)
-- Generate trading signals
SELECT 
    DATE,
    SYMBOL,
    CLOSE,
    MOMENTUM_20D,
    MOMENTUM_RANK,
    CASE 
        WHEN MOMENTUM_RANK > 0.8 THEN 1   -- Top 20% = BUY
        WHEN MOMENTUM_RANK < 0.2 THEN -1  -- Bottom 20% = SELL
        ELSE 0                             -- Middle = HOLD
    END AS MOMENTUM_SIGNAL
FROM ranked
ORDER BY DATE, SYMBOL
"""

momentum_df = session.sql(momentum_sql)
momentum_df.write.mode("overwrite").save_as_table("MOMENTUM_FACTOR")

print("‚úÖ Computed Momentum Alpha using ASOF JOIN")
print("   (Time-series function finds closest price ~20 trading days ago)")
session.table("MOMENTUM_FACTOR").show(10)


### üìâ Factor 2: Volatility Alpha
Low volatility anomaly: less volatile stocks often outperform on risk-adjusted basis.


In [None]:
# Compute Volatility Factor using ASOF JOIN (Snowflake Time-Series)
# ASOF JOIN is more robust than LAG() for handling gaps in data (holidays, missing days)

volatility_sql = """
WITH current_prices AS (
    -- Today's prices
    SELECT DATE, SYMBOL, CLOSE
    FROM MARKET_DATA
),
historical_prices AS (
    -- Historical prices for ASOF JOIN
    SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS PREV_CLOSE
    FROM MARKET_DATA
),
-- Step 1: Use ASOF JOIN to find previous day's price (handles gaps!)
daily_returns AS (
    SELECT 
        c.DATE,
        c.SYMBOL,
        c.CLOSE,
        h.HIST_DATE AS PREV_DATE,
        h.PREV_CLOSE,
        DATEDIFF('day', h.HIST_DATE, c.DATE) AS DAYS_GAP,
        -- Daily return = (today - yesterday) / yesterday
        (c.CLOSE - h.PREV_CLOSE) / NULLIF(h.PREV_CLOSE, 0) AS DAILY_RETURN
    FROM current_prices c
    ASOF JOIN historical_prices h
        MATCH_CONDITION (c.DATE > h.HIST_DATE)  -- Find the most recent date BEFORE today
        ON c.SYMBOL = h.SYMBOL
    WHERE h.HIST_DATE >= DATEADD('day', -5, c.DATE)  -- Within 5 days (handles weekends)
),
-- Step 2: Calculate rolling 20-day volatility (annualized)
volatility AS (
    SELECT 
        DATE,
        SYMBOL,
        CLOSE,
        -- STDDEV over last 20 trading days, annualized by ‚àö252
        STDDEV(DAILY_RETURN) OVER (
            PARTITION BY SYMBOL 
            ORDER BY DATE 
            ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
        ) * SQRT(252) AS VOLATILITY_20D,
        -- Count how many days we have (data quality check)
        COUNT(*) OVER (
            PARTITION BY SYMBOL 
            ORDER BY DATE 
            ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
        ) AS DAYS_IN_WINDOW
    FROM daily_returns
    WHERE DAILY_RETURN IS NOT NULL
),
-- Step 3: Rank volatility cross-sectionally (low vol = high rank)
ranked AS (
    SELECT 
        DATE,
        SYMBOL,
        CLOSE,
        VOLATILITY_20D,
        PERCENT_RANK() OVER (
            PARTITION BY DATE 
            ORDER BY VOLATILITY_20D DESC  -- DESC so low vol = high rank
        ) AS VOLATILITY_RANK
    FROM volatility
    WHERE VOLATILITY_20D IS NOT NULL
      AND DAYS_IN_WINDOW >= 15  -- Need at least 15 days of data
)
-- Step 4: Generate trading signal
SELECT 
    DATE,
    SYMBOL,
    CLOSE,
    VOLATILITY_20D,
    VOLATILITY_RANK,
    CASE 
        WHEN VOLATILITY_RANK > 0.8 THEN 1   -- Lowest 20% vol = BUY
        WHEN VOLATILITY_RANK < 0.2 THEN -1  -- Highest 20% vol = SELL
        ELSE 0                               -- Middle = HOLD
    END AS VOLATILITY_SIGNAL
FROM ranked
ORDER BY DATE, SYMBOL
"""

volatility_df = session.sql(volatility_sql)
volatility_df.write.mode("overwrite").save_as_table("VOLATILITY_FACTOR")

print("‚úÖ Computed Volatility Alpha using ASOF JOIN")
print("   (Time-series function finds previous trading day, handles gaps)")
session.table("VOLATILITY_FACTOR").show(10)


### üí¨ Factor 3: Sentiment Alpha
Aggregate news sentiment as a trading signal. This data would come from **Snowflake Marketplace** providers like RavenPack.


In [None]:
# Compute Sentiment Factor
sentiment = session.table("NEWS_SENTIMENT")

# Aggregate daily sentiment
daily_sentiment = sentiment.group_by("DATE", "SYMBOL").agg(
    avg("SENTIMENT_SCORE").alias("AVG_SENTIMENT"),
    count("*").alias("ARTICLE_COUNT"),
    avg("CONFIDENCE").alias("AVG_CONFIDENCE")
)

# Rank sentiment cross-sectionally
sentiment_alpha = daily_sentiment.with_column(
    "SENTIMENT_RANK",
    percent_rank().over(Window.partition_by("DATE").order_by(col("AVG_SENTIMENT")))
).with_column(
    "SENTIMENT_SIGNAL",
    when(col("SENTIMENT_RANK") > 0.8, 1)
    .when(col("SENTIMENT_RANK") < 0.2, -1)
    .otherwise(0)
)

sentiment_alpha.write.mode("overwrite").save_as_table("SENTIMENT_FACTOR")

print("‚úÖ Computed Sentiment Alpha")
session.table("SENTIMENT_FACTOR").show(10)


---
## 4Ô∏è‚É£ Combine Alpha Factors

Join all factors and create a **composite alpha signal** for trading decisions.


In [None]:
# Combine all factors into composite alpha signal
# Using IC-OPTIMIZED weights that FLIP negative factors!

# IC-based weights: Negative IC = Flip the signal
# Based on typical market analysis:
#   - Momentum often reverses (mean reversion) ‚Üí negative weight
#   - Low-vol anomaly may not hold in risk-on markets ‚Üí negative weight
#   - Sentiment tends to be predictive ‚Üí positive weight
# These will be validated/updated by the IC analysis later in the notebook

MOMENTUM_WEIGHT = -0.20    # Flipped! (mean reversion works better)
VOLATILITY_WEIGHT = -0.30  # Flipped! (high-vol outperforming in current regime)
SENTIMENT_WEIGHT = 0.50    # Increased! (most predictive factor)

print("üìä Using IC-OPTIMIZED Weights:")
print(f"   Momentum:   {MOMENTUM_WEIGHT:+.0%} {'(FLIPPED - betting on mean reversion)' if MOMENTUM_WEIGHT < 0 else ''}")
print(f"   Volatility: {VOLATILITY_WEIGHT:+.0%} {'(FLIPPED - high-vol stocks outperforming)' if VOLATILITY_WEIGHT < 0 else ''}")
print(f"   Sentiment:  {SENTIMENT_WEIGHT:+.0%} (primary signal)")
print()

composite_alpha_sql = f"""
WITH combined AS (
    SELECT 
        m.DATE,
        m.SYMBOL,
        m.CLOSE,
        m.MOMENTUM_RANK,
        m.MOMENTUM_SIGNAL,
        v.VOLATILITY_RANK,
        v.VOLATILITY_SIGNAL,
        COALESCE(s.SENTIMENT_RANK, 0.5) AS SENTIMENT_RANK,
        COALESCE(s.SENTIMENT_SIGNAL, 0) AS SENTIMENT_SIGNAL,
        s.AVG_SENTIMENT,
        s.ARTICLE_COUNT
    FROM MOMENTUM_FACTOR m
    LEFT JOIN VOLATILITY_FACTOR v 
        ON m.DATE = v.DATE AND m.SYMBOL = v.SYMBOL
    LEFT JOIN SENTIMENT_FACTOR s 
        ON m.DATE = s.DATE AND m.SYMBOL = s.SYMBOL
)
SELECT 
    *,
    -- IC-OPTIMIZED composite alpha (negative weights FLIP the signal!)
    (MOMENTUM_RANK * {MOMENTUM_WEIGHT} + 
     VOLATILITY_RANK * {VOLATILITY_WEIGHT} + 
     SENTIMENT_RANK * {SENTIMENT_WEIGHT}) AS COMPOSITE_ALPHA,
    
    -- Weighted signal (negative weights flip BUY to SELL)
    (MOMENTUM_SIGNAL * {MOMENTUM_WEIGHT} + 
     VOLATILITY_SIGNAL * {VOLATILITY_WEIGHT} + 
     SENTIMENT_SIGNAL * {SENTIMENT_WEIGHT}) AS WEIGHTED_SIGNAL,
     
    -- Trading recommendation based on optimized signal
    CASE 
        WHEN (MOMENTUM_SIGNAL * {MOMENTUM_WEIGHT} + VOLATILITY_SIGNAL * {VOLATILITY_WEIGHT} + SENTIMENT_SIGNAL * {SENTIMENT_WEIGHT}) > 0.2 THEN 'STRONG_BUY'
        WHEN (MOMENTUM_SIGNAL * {MOMENTUM_WEIGHT} + VOLATILITY_SIGNAL * {VOLATILITY_WEIGHT} + SENTIMENT_SIGNAL * {SENTIMENT_WEIGHT}) > 0.05 THEN 'BUY'
        WHEN (MOMENTUM_SIGNAL * {MOMENTUM_WEIGHT} + VOLATILITY_SIGNAL * {VOLATILITY_WEIGHT} + SENTIMENT_SIGNAL * {SENTIMENT_WEIGHT}) < -0.2 THEN 'STRONG_SELL'
        WHEN (MOMENTUM_SIGNAL * {MOMENTUM_WEIGHT} + VOLATILITY_SIGNAL * {VOLATILITY_WEIGHT} + SENTIMENT_SIGNAL * {SENTIMENT_WEIGHT}) < -0.05 THEN 'SELL'
        ELSE 'HOLD'
    END AS TRADING_SIGNAL
FROM combined
WHERE DATE IS NOT NULL
ORDER BY DATE DESC, COMPOSITE_ALPHA DESC
"""

composite_alpha = session.sql(composite_alpha_sql)
composite_alpha.write.mode("overwrite").save_as_table("ALPHA_SIGNALS")

print("‚úÖ Created ALPHA_SIGNALS table with IC-OPTIMIZED composite alpha")
print("   (Negative weights flip momentum/volatility signals for mean reversion)")
session.table("ALPHA_SIGNALS").show(15)


---
## 5Ô∏è‚É£ Analyze Alpha Signals

Identify today's trading opportunities based on our composite alpha.


In [None]:
# Get latest signals - Today's trading recommendations
latest_signals_sql = """
SELECT 
    SYMBOL,
    ROUND(CLOSE, 2) AS PRICE,
    ROUND(MOMENTUM_RANK, 3) AS MOMENTUM,
    ROUND(VOLATILITY_RANK, 3) AS VOLATILITY,
    ROUND(SENTIMENT_RANK, 3) AS SENTIMENT,
    ROUND(COMPOSITE_ALPHA, 3) AS ALPHA_SCORE,
    TRADING_SIGNAL
FROM ALPHA_SIGNALS
WHERE DATE = (SELECT MAX(DATE) FROM ALPHA_SIGNALS)
ORDER BY COMPOSITE_ALPHA DESC
"""

print("üéØ TODAY'S ALPHA SIGNALS")
print("=" * 60)
session.sql(latest_signals_sql).show()


In [None]:
# Top Buy and Sell opportunities
top_signals_sql = """
WITH latest AS (
    SELECT * FROM ALPHA_SIGNALS
    WHERE DATE = (SELECT MAX(DATE) FROM ALPHA_SIGNALS)
),
top_buys AS (
    SELECT 'üü¢ TOP BUYS' AS CATEGORY, SYMBOL, ROUND(COMPOSITE_ALPHA, 3) AS ALPHA, TRADING_SIGNAL
    FROM latest 
    WHERE WEIGHTED_SIGNAL > 0
    ORDER BY COMPOSITE_ALPHA DESC 
    LIMIT 5
),
top_sells AS (
    SELECT 'üî¥ TOP SELLS' AS CATEGORY, SYMBOL, ROUND(COMPOSITE_ALPHA, 3) AS ALPHA, TRADING_SIGNAL
    FROM latest 
    WHERE WEIGHTED_SIGNAL < 0
    ORDER BY COMPOSITE_ALPHA ASC 
    LIMIT 5
)
SELECT * FROM top_buys
UNION ALL
SELECT * FROM top_sells
"""

print("üìä TOP TRADING OPPORTUNITIES")
print("=" * 60)
session.sql(top_signals_sql).show()


In [None]:
# Signal distribution summary
signal_summary_sql = """
SELECT 
    TRADING_SIGNAL,
    COUNT(*) AS COUNT,
    ROUND(AVG(COMPOSITE_ALPHA), 3) AS AVG_ALPHA,
    ROUND(AVG(MOMENTUM_RANK), 3) AS AVG_MOMENTUM,
    ROUND(AVG(SENTIMENT_RANK), 3) AS AVG_SENTIMENT
FROM ALPHA_SIGNALS
WHERE DATE = (SELECT MAX(DATE) FROM ALPHA_SIGNALS)
GROUP BY TRADING_SIGNAL
ORDER BY AVG_ALPHA DESC
"""

print("üìà SIGNAL DISTRIBUTION SUMMARY")
print("=" * 60)
session.sql(signal_summary_sql).show()


---
## 6Ô∏è‚É£ Visualize Results

Create charts to visualize alpha signals and factor performance.


In [None]:
# Load data for visualization
# Note: Snowflake Notebooks uses Streamlit for rendering
import streamlit as st
import plotly.express as px
import plotly.graph_objects as go

alpha_data = session.table("ALPHA_SIGNALS").to_pandas()
alpha_data['DATE'] = pd.to_datetime(alpha_data['DATE'])

print(f"Loaded {len(alpha_data):,} records for visualization")


In [None]:
# Alpha Score Distribution
fig = px.histogram(
    alpha_data, 
    x='COMPOSITE_ALPHA',
    nbins=50,
    title='üìä Distribution of Composite Alpha Scores',
    color_discrete_sequence=['#29B5E8']
)
fig.update_layout(
    xaxis_title='Alpha Score',
    yaxis_title='Frequency',
    template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)


In [None]:
# Alpha by Symbol (latest date)
latest_date = alpha_data['DATE'].max()
latest_alpha = alpha_data[alpha_data['DATE'] == latest_date].sort_values('COMPOSITE_ALPHA', ascending=False)

fig = px.bar(
    latest_alpha,
    x='SYMBOL',
    y='COMPOSITE_ALPHA',
    color='TRADING_SIGNAL',
    color_discrete_map={
        'STRONG_BUY': '#00C853',
        'BUY': '#29B5E8',
        'HOLD': '#888888',
        'SELL': '#FF9800',
        'STRONG_SELL': '#FF1744'
    },
    title=f'üéØ Alpha Scores by Symbol ({latest_date.strftime("%Y-%m-%d")})'
)
fig.update_layout(xaxis_title='Symbol', yaxis_title='Composite Alpha')
st.plotly_chart(fig, use_container_width=True)


In [None]:
# Factor Correlation Heatmap
factor_cols = ['MOMENTUM_RANK', 'VOLATILITY_RANK', 'SENTIMENT_RANK', 'COMPOSITE_ALPHA']
corr_matrix = alpha_data[factor_cols].corr()

fig = px.imshow(
    corr_matrix,
    labels=dict(color="Correlation"),
    x=['Momentum', 'Volatility', 'Sentiment', 'Composite'],
    y=['Momentum', 'Volatility', 'Sentiment', 'Composite'],
    color_continuous_scale='RdBu',
    title='üîó Factor Correlation Matrix'
)
st.plotly_chart(fig, use_container_width=True)


---
## 7Ô∏è‚É£ Backtest Alpha Strategy

Calculate the historical performance of our alpha signals.


In [None]:
# Calculate forward returns and strategy performance
# Using WEIGHTED_SIGNAL (works correctly with optimized weights)

backtest_sql = """
WITH signals_with_returns AS (
    SELECT 
        a.*,
        LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) AS CLOSE_5D_FORWARD,
        (LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) - CLOSE) / CLOSE AS FORWARD_RETURN_5D
    FROM ALPHA_SIGNALS a
),
strategy_returns AS (
    SELECT 
        DATE,
        SYMBOL,
        WEIGHTED_SIGNAL,
        TRADING_SIGNAL,
        FORWARD_RETURN_5D,
        -- Use WEIGHTED_SIGNAL instead of COMPOSITE_ALPHA thresholds
        -- Positive signal = LONG, Negative signal = SHORT
        CASE 
            WHEN WEIGHTED_SIGNAL > 0 THEN FORWARD_RETURN_5D      -- LONG position
            WHEN WEIGHTED_SIGNAL < 0 THEN -FORWARD_RETURN_5D     -- SHORT position
            ELSE 0                                                -- No trade
        END AS STRATEGY_RETURN
    FROM signals_with_returns
    WHERE FORWARD_RETURN_5D IS NOT NULL
)
SELECT 
    'IC-Optimized Alpha Strategy' AS STRATEGY,
    COUNT(*) AS N_TRADES,
    ROUND(AVG(STRATEGY_RETURN) * 100, 3) AS AVG_RETURN_PCT,
    ROUND(STDDEV(STRATEGY_RETURN) * 100, 3) AS STD_DEV_PCT,
    ROUND(AVG(STRATEGY_RETURN) / NULLIF(STDDEV(STRATEGY_RETURN), 0) * SQRT(52), 2) AS SHARPE_RATIO,
    ROUND(SUM(CASE WHEN STRATEGY_RETURN > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS HIT_RATE_PCT
FROM strategy_returns
WHERE STRATEGY_RETURN != 0
"""

print("üìà BACKTEST RESULTS (IC-Optimized Strategy)")
print("=" * 60)
print("Strategy: LONG when WEIGHTED_SIGNAL > 0, SHORT when < 0")
print("Weights: Momentum -20%, Volatility -30%, Sentiment +50%")
print("=" * 60)
session.sql(backtest_sql).show()


### üìä Long-Short Equity Analysis

First, let's examine the **current portfolio composition** - which stocks are we long vs short, and what's our sector exposure?

This shows the **real-time trading signals** from our alpha model.


In [None]:
# Long-Short Equity Analysis
# Shows current portfolio positions and sector exposure

# Define sector mappings
sector_map_sql = """
SELECT 'AAPL' AS SYMBOL, 'Tech' AS SECTOR UNION ALL
SELECT 'GOOGL', 'Tech' UNION ALL SELECT 'MSFT', 'Tech' UNION ALL
SELECT 'AMZN', 'Tech' UNION ALL SELECT 'META', 'Tech' UNION ALL
SELECT 'NVDA', 'Tech' UNION ALL SELECT 'TSLA', 'Tech' UNION ALL
SELECT 'JPM', 'Financials' UNION ALL SELECT 'GS', 'Financials' UNION ALL
SELECT 'MS', 'Financials' UNION ALL SELECT 'BAC', 'Financials' UNION ALL
SELECT 'WFC', 'Financials' UNION ALL SELECT 'C', 'Financials' UNION ALL
SELECT 'V', 'Payments' UNION ALL SELECT 'MA', 'Payments' UNION ALL
SELECT 'PYPL', 'Payments' UNION ALL SELECT 'SQ', 'Payments' UNION ALL
SELECT 'JNJ', 'Healthcare' UNION ALL SELECT 'UNH', 'Healthcare' UNION ALL
SELECT 'PFE', 'Healthcare' UNION ALL SELECT 'MRK', 'Healthcare' UNION ALL
SELECT 'ABBV', 'Healthcare' UNION ALL
SELECT 'WMT', 'Consumer' UNION ALL SELECT 'COST', 'Consumer' UNION ALL
SELECT 'HD', 'Consumer' UNION ALL SELECT 'NKE', 'Consumer' UNION ALL
SELECT 'SBUX', 'Consumer' UNION ALL
SELECT 'XOM', 'Energy' UNION ALL SELECT 'CVX', 'Energy' UNION ALL
SELECT 'COP', 'Energy' UNION ALL
SELECT 'CAT', 'Industrials' UNION ALL SELECT 'BA', 'Industrials' UNION ALL
SELECT 'UPS', 'Industrials' UNION ALL SELECT 'HON', 'Industrials'
"""

# Current Long-Short Portfolio Holdings
holdings_sql = f"""
WITH sectors AS ({sector_map_sql}),
latest_signals AS (
    SELECT 
        a.SYMBOL,
        a.CLOSE AS PRICE,
        a.WEIGHTED_SIGNAL,
        a.TRADING_SIGNAL,
        a.COMPOSITE_ALPHA,
        a.MOMENTUM_RANK,
        a.VOLATILITY_RANK,
        a.SENTIMENT_RANK,
        s.SECTOR,
        CASE 
            WHEN a.WEIGHTED_SIGNAL > 0 THEN 'LONG'
            WHEN a.WEIGHTED_SIGNAL < 0 THEN 'SHORT'
            ELSE 'NO_POSITION'
        END AS POSITION
    FROM ALPHA_SIGNALS a
    LEFT JOIN sectors s ON a.SYMBOL = s.SYMBOL
    WHERE a.DATE = (SELECT MAX(DATE) FROM ALPHA_SIGNALS)
)
SELECT * FROM latest_signals
WHERE POSITION != 'NO_POSITION'
ORDER BY POSITION, COMPOSITE_ALPHA DESC
"""

holdings_df = session.sql(holdings_sql).to_pandas()

# Separate long and short positions
longs = holdings_df[holdings_df['POSITION'] == 'LONG']
shorts = holdings_df[holdings_df['POSITION'] == 'SHORT']

print("=" * 80)
print("üìä LONG-SHORT EQUITY PORTFOLIO ANALYSIS")
print("=" * 80)

print(f"\nüü¢ LONG POSITIONS ({len(longs)} stocks)")
print("-" * 60)
if len(longs) > 0:
    print(f"{'Symbol':<8} {'Sector':<12} {'Price':>10} {'Alpha':>8} {'Signal':>12}")
    print("-" * 60)
    for _, row in longs.iterrows():
        print(f"{row['SYMBOL']:<8} {row['SECTOR']:<12} ${row['PRICE']:>8.2f} {row['COMPOSITE_ALPHA']:>+8.3f} {row['TRADING_SIGNAL']:>12}")

print(f"\nüî¥ SHORT POSITIONS ({len(shorts)} stocks)")
print("-" * 60)
if len(shorts) > 0:
    print(f"{'Symbol':<8} {'Sector':<12} {'Price':>10} {'Alpha':>8} {'Signal':>12}")
    print("-" * 60)
    for _, row in shorts.iterrows():
        print(f"{row['SYMBOL']:<8} {row['SECTOR']:<12} ${row['PRICE']:>8.2f} {row['COMPOSITE_ALPHA']:>+8.3f} {row['TRADING_SIGNAL']:>12}")


In [None]:
# Sector Exposure Analysis
# This is critical for hedge funds to manage concentration risk

sector_exposure_sql = f"""
WITH sectors AS ({sector_map_sql}),
latest_signals AS (
    SELECT 
        a.SYMBOL,
        a.WEIGHTED_SIGNAL,
        s.SECTOR,
        CASE 
            WHEN a.WEIGHTED_SIGNAL > 0 THEN 'LONG'
            WHEN a.WEIGHTED_SIGNAL < 0 THEN 'SHORT'
            ELSE 'NEUTRAL'
        END AS POSITION
    FROM ALPHA_SIGNALS a
    LEFT JOIN sectors s ON a.SYMBOL = s.SYMBOL
    WHERE a.DATE = (SELECT MAX(DATE) FROM ALPHA_SIGNALS)
)
SELECT 
    SECTOR,
    SUM(CASE WHEN POSITION = 'LONG' THEN 1 ELSE 0 END) AS LONG_COUNT,
    SUM(CASE WHEN POSITION = 'SHORT' THEN 1 ELSE 0 END) AS SHORT_COUNT,
    SUM(CASE WHEN POSITION = 'LONG' THEN 1 ELSE 0 END) - 
    SUM(CASE WHEN POSITION = 'SHORT' THEN 1 ELSE 0 END) AS NET_EXPOSURE
FROM latest_signals
GROUP BY SECTOR
ORDER BY NET_EXPOSURE DESC
"""

sector_df = session.sql(sector_exposure_sql).to_pandas()

print("\n" + "=" * 80)
print("üè≠ SECTOR EXPOSURE ANALYSIS")
print("=" * 80)
print(f"\n{'Sector':<15} {'Longs':>8} {'Shorts':>8} {'Net':>8} {'Exposure Bar':<20}")
print("-" * 70)

for _, row in sector_df.iterrows():
    net = row['NET_EXPOSURE']
    bar_len = min(abs(net), 10)
    if net > 0:
        bar = "üü¢" * bar_len
        exposure = "NET LONG"
    elif net < 0:
        bar = "üî¥" * bar_len
        exposure = "NET SHORT"
    else:
        bar = "‚ö™"
        exposure = "NEUTRAL"
    print(f"{row['SECTOR']:<15} {row['LONG_COUNT']:>8} {row['SHORT_COUNT']:>8} {net:>+8} {bar}")

# Portfolio Summary
total_longs = len(longs)
total_shorts = len(shorts)
gross_exposure = total_longs + total_shorts
net_exposure = total_longs - total_shorts

print("\n" + "=" * 80)
print("üìà PORTFOLIO SUMMARY")
print("=" * 80)
print(f"   ‚Ä¢ Long Positions:   {total_longs} stocks")
print(f"   ‚Ä¢ Short Positions:  {total_shorts} stocks")
print(f"   ‚Ä¢ Gross Exposure:   {gross_exposure} positions")
print(f"   ‚Ä¢ Net Exposure:     {net_exposure:+d} ({'Long Bias' if net_exposure > 0 else 'Short Bias' if net_exposure < 0 else 'Market Neutral'})")
print(f"   ‚Ä¢ Long/Short Ratio: {total_longs/max(total_shorts,1):.2f}x")


In [None]:
# Visualize Long-Short Portfolio by Sector
from plotly.subplots import make_subplots

# Create side-by-side bar chart
fig = make_subplots(rows=1, cols=2, 
                    subplot_titles=('Portfolio Positions by Sector', 'Long vs Short Distribution'),
                    specs=[[{"type": "bar"}, {"type": "pie"}]])

# Bar chart: Sector exposure
fig.add_trace(
    go.Bar(name='Long', x=sector_df['SECTOR'], y=sector_df['LONG_COUNT'], 
           marker_color='#00C853'),
    row=1, col=1
)
fig.add_trace(
    go.Bar(name='Short', x=sector_df['SECTOR'], y=-sector_df['SHORT_COUNT'], 
           marker_color='#FF1744'),
    row=1, col=1
)

# Pie chart: Long vs Short split
fig.add_trace(
    go.Pie(labels=['Long Positions', 'Short Positions'], 
           values=[total_longs, total_shorts],
           marker_colors=['#00C853', '#FF1744'],
           hole=0.4),
    row=1, col=2
)

fig.update_layout(
    title_text='üìä Long-Short Equity Portfolio Composition',
    barmode='relative',
    height=400
)
fig.update_yaxes(title_text="# of Positions", row=1, col=1)

st.plotly_chart(fig, use_container_width=True)

# Show the actual holdings table
print("\nüìã DETAILED HOLDINGS:")
session.sql(holdings_sql).show()


### üìàüìâ Long-Short Portfolio Backtest

Now let's see how this strategy would have performed **historically**.

This is how hedge funds actually trade: **LONG** the winners, **SHORT** the losers, and capture the **spread**.


In [None]:
# Long-Short Portfolio Backtest
# LONG top 20% (signal = +1), SHORT bottom 20% (signal = -1)

long_short_sql = """
WITH signals_with_returns AS (
    -- Get forward 5-day returns for each signal
    SELECT 
        DATE,
        SYMBOL,
        TRADING_SIGNAL,
        COMPOSITE_ALPHA,
        WEIGHTED_SIGNAL,
        CLOSE,
        LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) AS CLOSE_5D_FORWARD,
        (LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) - CLOSE) / CLOSE AS FORWARD_RETURN_5D
    FROM ALPHA_SIGNALS
),
daily_returns AS (
    -- Calculate daily long and short portfolio returns
    SELECT 
        DATE,
        -- Long portfolio: average return of stocks with positive signal
        AVG(CASE WHEN WEIGHTED_SIGNAL > 0 THEN FORWARD_RETURN_5D END) AS LONG_RETURN,
        COUNT(CASE WHEN WEIGHTED_SIGNAL > 0 THEN 1 END) AS LONG_COUNT,
        -- Short portfolio: average return of stocks with negative signal  
        AVG(CASE WHEN WEIGHTED_SIGNAL < 0 THEN FORWARD_RETURN_5D END) AS SHORT_RETURN,
        COUNT(CASE WHEN WEIGHTED_SIGNAL < 0 THEN 1 END) AS SHORT_COUNT,
        -- Long-Short spread (this is the hedge fund return!)
        AVG(CASE WHEN WEIGHTED_SIGNAL > 0 THEN FORWARD_RETURN_5D END) - 
        AVG(CASE WHEN WEIGHTED_SIGNAL < 0 THEN FORWARD_RETURN_5D END) AS LONG_SHORT_RETURN
    FROM signals_with_returns
    WHERE FORWARD_RETURN_5D IS NOT NULL
    GROUP BY DATE
    HAVING LONG_COUNT > 0 AND SHORT_COUNT > 0
)
SELECT 
    'Long Portfolio (Buy Winners)' AS PORTFOLIO,
    ROUND(AVG(LONG_RETURN) * 100, 3) AS AVG_RETURN_PCT,
    ROUND(STDDEV(LONG_RETURN) * 100, 3) AS VOLATILITY_PCT,
    ROUND(AVG(LONG_RETURN) / NULLIF(STDDEV(LONG_RETURN), 0) * SQRT(52), 2) AS SHARPE,
    ROUND(SUM(CASE WHEN LONG_RETURN > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS WIN_RATE
FROM daily_returns

UNION ALL

SELECT 
    'Short Portfolio (Sell Losers)' AS PORTFOLIO,
    ROUND(AVG(-SHORT_RETURN) * 100, 3) AS AVG_RETURN_PCT,  -- Negative because we're short
    ROUND(STDDEV(SHORT_RETURN) * 100, 3) AS VOLATILITY_PCT,
    ROUND(AVG(-SHORT_RETURN) / NULLIF(STDDEV(SHORT_RETURN), 0) * SQRT(52), 2) AS SHARPE,
    ROUND(SUM(CASE WHEN SHORT_RETURN < 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS WIN_RATE
FROM daily_returns

UNION ALL

SELECT 
    'üìä LONG-SHORT SPREAD (Hedge Fund Return)' AS PORTFOLIO,
    ROUND(AVG(LONG_SHORT_RETURN) * 100, 3) AS AVG_RETURN_PCT,
    ROUND(STDDEV(LONG_SHORT_RETURN) * 100, 3) AS VOLATILITY_PCT,
    ROUND(AVG(LONG_SHORT_RETURN) / NULLIF(STDDEV(LONG_SHORT_RETURN), 0) * SQRT(52), 2) AS SHARPE,
    ROUND(SUM(CASE WHEN LONG_SHORT_RETURN > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS WIN_RATE
FROM daily_returns
"""

print("üìàüìâ LONG-SHORT PORTFOLIO BACKTEST")
print("=" * 70)
print("Strategy: LONG top 20% momentum, SHORT bottom 20% momentum")
print("Holding Period: 5 days")
print("=" * 70)
session.sql(long_short_sql).show()


In [None]:
# Visualize Cumulative Long-Short Returns Over Time
cumulative_sql = """
WITH signals_with_returns AS (
    SELECT 
        DATE,
        SYMBOL,
        WEIGHTED_SIGNAL,
        CLOSE,
        (LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) - CLOSE) / CLOSE AS FORWARD_RETURN_5D
    FROM ALPHA_SIGNALS
),
daily_returns AS (
    SELECT 
        DATE,
        AVG(CASE WHEN WEIGHTED_SIGNAL > 0 THEN FORWARD_RETURN_5D END) AS LONG_RETURN,
        AVG(CASE WHEN WEIGHTED_SIGNAL < 0 THEN FORWARD_RETURN_5D END) AS SHORT_RETURN,
        AVG(CASE WHEN WEIGHTED_SIGNAL > 0 THEN FORWARD_RETURN_5D END) - 
        AVG(CASE WHEN WEIGHTED_SIGNAL < 0 THEN FORWARD_RETURN_5D END) AS LONG_SHORT_RETURN
    FROM signals_with_returns
    WHERE FORWARD_RETURN_5D IS NOT NULL
    GROUP BY DATE
    HAVING LONG_RETURN IS NOT NULL AND SHORT_RETURN IS NOT NULL
)
SELECT 
    DATE,
    LONG_RETURN,
    SHORT_RETURN,
    LONG_SHORT_RETURN,
    SUM(LONG_SHORT_RETURN) OVER (ORDER BY DATE) AS CUMULATIVE_RETURN
FROM daily_returns
ORDER BY DATE
"""

# Get data for plotting
cumulative_data = session.sql(cumulative_sql).to_pandas()
cumulative_data['DATE'] = pd.to_datetime(cumulative_data['DATE'])
cumulative_data['CUMULATIVE_PCT'] = cumulative_data['CUMULATIVE_RETURN'] * 100

# Plot cumulative returns
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=cumulative_data['DATE'],
    y=cumulative_data['CUMULATIVE_PCT'],
    mode='lines',
    name='Long-Short Cumulative Return',
    line=dict(color='#00C853', width=2),
    fill='tozeroy',
    fillcolor='rgba(0, 200, 83, 0.2)'
))

# Add zero line
fig.add_hline(y=0, line_dash="dash", line_color="gray")

fig.update_layout(
    title='üìà Cumulative Long-Short Portfolio Returns',
    xaxis_title='Date',
    yaxis_title='Cumulative Return (%)',
    template='plotly_white',
    hovermode='x unified'
)

st.plotly_chart(fig, use_container_width=True)

# Summary stats
total_return = cumulative_data['CUMULATIVE_PCT'].iloc[-1] if len(cumulative_data) > 0 else 0
print(f"üìä Total Cumulative Return: {total_return:.2f}%")
print(f"üìÖ Period: {cumulative_data['DATE'].min().strftime('%Y-%m-%d')} to {cumulative_data['DATE'].max().strftime('%Y-%m-%d')}")


### üîó Pairs Trading Analysis

**Pairs Trading Strategy:**
1. Find two stocks that are **highly correlated** (move together)
2. Detect when they **diverge** (unusual gap opens)
3. **SHORT** the outperformer, **LONG** the underperformer
4. Profit when they **converge** back to normal


In [None]:
# Step 1: Calculate Correlation Matrix between all stocks
correlation_sql = """
WITH daily_returns AS (
    -- Calculate daily returns for each stock
    SELECT 
        DATE,
        SYMBOL,
        CLOSE,
        (CLOSE - LAG(CLOSE) OVER (PARTITION BY SYMBOL ORDER BY DATE)) 
            / NULLIF(LAG(CLOSE) OVER (PARTITION BY SYMBOL ORDER BY DATE), 0) AS DAILY_RETURN
    FROM MARKET_DATA
)
SELECT 
    a.SYMBOL AS SYMBOL_1,
    b.SYMBOL AS SYMBOL_2,
    ROUND(CORR(a.DAILY_RETURN, b.DAILY_RETURN), 3) AS CORRELATION,
    COUNT(*) AS TRADING_DAYS
FROM daily_returns a
JOIN daily_returns b 
    ON a.DATE = b.DATE 
    AND a.SYMBOL < b.SYMBOL  -- Avoid duplicates (A-B, not B-A)
WHERE a.DAILY_RETURN IS NOT NULL 
  AND b.DAILY_RETURN IS NOT NULL
GROUP BY a.SYMBOL, b.SYMBOL
HAVING COUNT(*) >= 50  -- Need enough data points
ORDER BY CORRELATION DESC
"""

print("üîó STOCK CORRELATION MATRIX")
print("=" * 60)
print("Highly correlated pairs are candidates for pairs trading")
print("=" * 60)
correlation_df = session.sql(correlation_sql).to_pandas()
correlation_df.head(15)


In [None]:
# Visualize Correlation Heatmap
# Pivot to matrix format
symbols = list(set(correlation_df['SYMBOL_1'].tolist() + correlation_df['SYMBOL_2'].tolist()))
symbols.sort()

# Create correlation matrix
corr_matrix = pd.DataFrame(index=symbols, columns=symbols, data=1.0)  # Diagonal = 1
for _, row in correlation_df.iterrows():
    corr_matrix.loc[row['SYMBOL_1'], row['SYMBOL_2']] = row['CORRELATION']
    corr_matrix.loc[row['SYMBOL_2'], row['SYMBOL_1']] = row['CORRELATION']

# Plot heatmap
fig = px.imshow(
    corr_matrix.astype(float),
    labels=dict(color="Correlation"),
    x=symbols,
    y=symbols,
    color_continuous_scale='RdYlGn',
    zmin=-1,
    zmax=1,
    title='üîó Stock Correlation Heatmap (Pairs Trading Candidates)'
)
fig.update_layout(width=700, height=600)
st.plotly_chart(fig, use_container_width=True)

# Show top correlated pairs
print("\nüéØ TOP CORRELATED PAIRS (Best for Pairs Trading):")
print("=" * 50)
top_pairs = correlation_df[correlation_df['CORRELATION'] > 0.5].head(10)
for _, row in top_pairs.iterrows():
    print(f"   {row['SYMBOL_1']} ‚Üî {row['SYMBOL_2']}: {row['CORRELATION']:.3f}")


In [None]:
# Step 2: Detect Divergences in HIGHLY CORRELATED Pairs Only
# Key insight: Only look for divergences in pairs that REALLY move together!
# Best pairs: Same sector, same business model, high correlation (>= 0.70)
# NOW USING ASOF JOIN for robust 20-day lookback!

# Define sector mappings for filtering
SECTOR_PAIRS = """
-- Define which stocks are in the same sector (best for pairs trading)
SELECT 'AAPL' AS SYMBOL, 'Tech' AS SECTOR UNION ALL
SELECT 'GOOGL', 'Tech' UNION ALL SELECT 'MSFT', 'Tech' UNION ALL
SELECT 'AMZN', 'Tech' UNION ALL SELECT 'META', 'Tech' UNION ALL
SELECT 'NVDA', 'Tech' UNION ALL SELECT 'TSLA', 'Tech' UNION ALL
SELECT 'JPM', 'Financials' UNION ALL SELECT 'GS', 'Financials' UNION ALL
SELECT 'MS', 'Financials' UNION ALL SELECT 'BAC', 'Financials' UNION ALL
SELECT 'WFC', 'Financials' UNION ALL SELECT 'C', 'Financials' UNION ALL
SELECT 'V', 'Payments' UNION ALL SELECT 'MA', 'Payments' UNION ALL
SELECT 'PYPL', 'Payments' UNION ALL SELECT 'SQ', 'Payments' UNION ALL
SELECT 'JNJ', 'Healthcare' UNION ALL SELECT 'UNH', 'Healthcare' UNION ALL
SELECT 'PFE', 'Healthcare' UNION ALL SELECT 'MRK', 'Healthcare' UNION ALL
SELECT 'ABBV', 'Healthcare' UNION ALL
SELECT 'WMT', 'Consumer' UNION ALL SELECT 'COST', 'Consumer' UNION ALL
SELECT 'HD', 'Consumer' UNION ALL SELECT 'NKE', 'Consumer' UNION ALL
SELECT 'SBUX', 'Consumer' UNION ALL
SELECT 'XOM', 'Energy' UNION ALL SELECT 'CVX', 'Energy' UNION ALL
SELECT 'COP', 'Energy' UNION ALL
SELECT 'CAT', 'Industrials' UNION ALL SELECT 'BA', 'Industrials' UNION ALL
SELECT 'UPS', 'Industrials' UNION ALL SELECT 'HON', 'Industrials'
"""

pairs_divergence_sql = f"""
WITH sectors AS (
    {SECTOR_PAIRS}
),
-- Step 1: Get current prices
current_prices AS (
    SELECT DATE, SYMBOL, CLOSE
    FROM MARKET_DATA
),
-- Step 2: Historical prices for ASOF JOIN
historical_prices AS (
    SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
    FROM MARKET_DATA
),
-- Step 3: Use ASOF JOIN to find price ~20 trading days ago (28 calendar days)
returns_with_asof AS (
    SELECT 
        c.DATE,
        c.SYMBOL,
        c.CLOSE,
        h.HIST_DATE,
        h.HIST_CLOSE AS CLOSE_20D_AGO,
        -- Daily return (still use LAG for previous day - it's fine for this)
        (c.CLOSE - LAG(c.CLOSE) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE)) 
            / NULLIF(LAG(c.CLOSE) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE), 0) AS DAILY_RETURN,
        -- 20-day return using ASOF JOIN (more robust!)
        (c.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS RETURN_20D
    FROM current_prices c
    ASOF JOIN historical_prices h
        MATCH_CONDITION (DATEADD('day', -28, c.DATE) >= h.HIST_DATE)
        ON c.SYMBOL = h.SYMBOL
),
daily_returns AS (
    SELECT * FROM returns_with_asof
    WHERE RETURN_20D IS NOT NULL
),
-- FIRST: Calculate correlations for SAME-SECTOR pairs with HIGH correlation
pair_correlations AS (
    SELECT 
        a.SYMBOL AS SYMBOL_1,
        b.SYMBOL AS SYMBOL_2,
        s1.SECTOR AS SECTOR,
        CORR(a.DAILY_RETURN, b.DAILY_RETURN) AS CORRELATION
    FROM daily_returns a
    JOIN daily_returns b 
        ON a.DATE = b.DATE 
        AND a.SYMBOL < b.SYMBOL
    -- Join sector info
    JOIN sectors s1 ON a.SYMBOL = s1.SYMBOL
    JOIN sectors s2 ON b.SYMBOL = s2.SYMBOL
    WHERE a.DAILY_RETURN IS NOT NULL 
      AND b.DAILY_RETURN IS NOT NULL
      AND s1.SECTOR = s2.SECTOR  -- ‚ö†Ô∏è SAME SECTOR ONLY!
    GROUP BY a.SYMBOL, b.SYMBOL, s1.SECTOR
    HAVING CORR(a.DAILY_RETURN, b.DAILY_RETURN) >= 0.65  -- ‚ö†Ô∏è Higher threshold!
),
-- THEN: Analyze only the same-sector correlated pairs
pair_analysis AS (
    SELECT 
        a.DATE,
        a.SYMBOL AS SYMBOL_1,
        b.SYMBOL AS SYMBOL_2,
        pc.SECTOR,  -- Include sector for reference
        pc.CORRELATION,  -- Include correlation for reference
        a.CLOSE AS CLOSE_1,
        b.CLOSE AS CLOSE_2,
        a.RETURN_20D AS RETURN_20D_1,
        b.RETURN_20D AS RETURN_20D_2,
        -- Spread = difference in 20-day returns
        a.RETURN_20D - b.RETURN_20D AS RETURN_SPREAD,
        -- Normalized price ratio
        a.CLOSE / NULLIF(b.CLOSE, 0) AS PRICE_RATIO
    FROM daily_returns a
    JOIN daily_returns b 
        ON a.DATE = b.DATE 
        AND a.SYMBOL < b.SYMBOL
    -- ‚ö†Ô∏è KEY FIX: Only join pairs that passed the sector + correlation filter!
    JOIN pair_correlations pc 
        ON a.SYMBOL = pc.SYMBOL_1 
        AND b.SYMBOL = pc.SYMBOL_2
    WHERE a.RETURN_20D IS NOT NULL 
      AND b.RETURN_20D IS NOT NULL
),
-- Calculate spread statistics
spread_stats AS (
    SELECT 
        SYMBOL_1,
        SYMBOL_2,
        SECTOR,
        CORRELATION,
        DATE,
        CLOSE_1,
        CLOSE_2,
        RETURN_20D_1,
        RETURN_20D_2,
        RETURN_SPREAD,
        PRICE_RATIO,
        AVG(RETURN_SPREAD) OVER (PARTITION BY SYMBOL_1, SYMBOL_2 ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_MEAN,
        STDDEV(RETURN_SPREAD) OVER (PARTITION BY SYMBOL_1, SYMBOL_2 ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_STD
    FROM pair_analysis
),
-- Calculate Z-score (how many std deviations from mean)
divergence_signals AS (
    SELECT 
        *,
        (RETURN_SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) AS ZSCORE,
        CASE 
            WHEN (RETURN_SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) > 2 THEN 'SHORT_1_LONG_2'
            WHEN (RETURN_SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) < -2 THEN 'LONG_1_SHORT_2'
            ELSE 'NO_SIGNAL'
        END AS PAIRS_SIGNAL
    FROM spread_stats
    WHERE SPREAD_STD IS NOT NULL AND SPREAD_STD > 0
)
SELECT 
    DATE,
    SECTOR,  -- Show which sector
    SYMBOL_1,
    SYMBOL_2,
    ROUND(CORRELATION, 2) AS CORR,  -- Show correlation!
    ROUND(RETURN_20D_1 * 100, 2) AS RETURN_1_PCT,
    ROUND(RETURN_20D_2 * 100, 2) AS RETURN_2_PCT,
    ROUND(RETURN_SPREAD * 100, 2) AS SPREAD_PCT,
    ROUND(ZSCORE, 2) AS ZSCORE,
    PAIRS_SIGNAL
FROM divergence_signals
WHERE DATE = (SELECT MAX(DATE) FROM divergence_signals)
ORDER BY CORRELATION DESC, ABS(ZSCORE) DESC  -- Show ALL pairs, sorted by correlation
"""

print("üìä ALL CORRELATED PAIRS STATUS (Same-Sector, Correlation >= 0.65)")
print("=" * 80)
print("‚ö†Ô∏è  FILTERS: Same sector only, Correlation >= 0.65")
print("-" * 80)
divergence_df = session.sql(pairs_divergence_sql).to_pandas()

# Separate into divergent and non-divergent
divergent = divergence_df[abs(divergence_df['ZSCORE']) >= 2]
approaching = divergence_df[(abs(divergence_df['ZSCORE']) >= 1.5) & (abs(divergence_df['ZSCORE']) < 2)]
stable = divergence_df[abs(divergence_df['ZSCORE']) < 1.5]

print(f"\nüî¥ DIVERGENT (|Z| >= 2) - TRADE NOW: {len(divergent)} pairs")
if len(divergent) > 0:
    for _, row in divergent.iterrows():
        action = "SHORT " + row['SYMBOL_1'] + ", LONG " + row['SYMBOL_2'] if row['ZSCORE'] > 0 else "LONG " + row['SYMBOL_1'] + ", SHORT " + row['SYMBOL_2']
        print(f"   {row['SECTOR']:12} ‚îÇ {row['SYMBOL_1']:5}‚Üî{row['SYMBOL_2']:5} ‚îÇ Corr: {row['CORR']:.2f} ‚îÇ Z: {row['ZSCORE']:+.2f} ‚îÇ {action}")

print(f"\nüü° APPROACHING (|Z| 1.5-2) - WATCH: {len(approaching)} pairs")
if len(approaching) > 0:
    for _, row in approaching.head(5).iterrows():
        print(f"   {row['SECTOR']:12} ‚îÇ {row['SYMBOL_1']:5}‚Üî{row['SYMBOL_2']:5} ‚îÇ Corr: {row['CORR']:.2f} ‚îÇ Z: {row['ZSCORE']:+.2f}")

print(f"\nüü¢ STABLE (|Z| < 1.5) - NO ACTION: {len(stable)} pairs")
if len(stable) > 0:
    # Show top 5 by correlation
    top_stable = stable.nlargest(5, 'CORR')
    for _, row in top_stable.iterrows():
        print(f"   {row['SECTOR']:12} ‚îÇ {row['SYMBOL_1']:5}‚Üî{row['SYMBOL_2']:5} ‚îÇ Corr: {row['CORR']:.2f} ‚îÇ Z: {row['ZSCORE']:+.2f} ‚îÇ Moving together ‚úÖ")

print("\n" + "=" * 80)
divergence_df


In [None]:
# Step 3: Visualize a Pair's Spread Over Time
# Pick the top correlated pair and show when divergences occur
# NOW USING ASOF JOIN for robust 20-day lookback!

# Get the top correlated pair
if len(correlation_df) > 0:
    top_pair = correlation_df.iloc[0]
    symbol_1 = top_pair['SYMBOL_1']
    symbol_2 = top_pair['SYMBOL_2']
    
    spread_history_sql = f"""
    WITH current_prices AS (
        SELECT DATE, SYMBOL, CLOSE
        FROM MARKET_DATA
        WHERE SYMBOL IN ('{symbol_1}', '{symbol_2}')
    ),
    historical_prices AS (
        SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
        FROM MARKET_DATA
        WHERE SYMBOL IN ('{symbol_1}', '{symbol_2}')
    ),
    -- Use ASOF JOIN for 20-day lookback
    prices AS (
        SELECT 
            c.DATE,
            c.SYMBOL,
            c.CLOSE,
            (c.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS RETURN_20D
        FROM current_prices c
        ASOF JOIN historical_prices h
            MATCH_CONDITION (DATEADD('day', -28, c.DATE) >= h.HIST_DATE)
            ON c.SYMBOL = h.SYMBOL
    ),
    paired AS (
        SELECT 
            a.DATE,
            a.CLOSE AS CLOSE_1,
            b.CLOSE AS CLOSE_2,
            a.RETURN_20D AS RETURN_1,
            b.RETURN_20D AS RETURN_2,
            a.RETURN_20D - b.RETURN_20D AS SPREAD
        FROM prices a
        JOIN prices b ON a.DATE = b.DATE
        WHERE a.SYMBOL = '{symbol_1}' AND b.SYMBOL = '{symbol_2}'
    )
    SELECT 
        DATE,
        CLOSE_1,
        CLOSE_2,
        SPREAD,
        AVG(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_MEAN,
        STDDEV(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_STD,
        (SPREAD - AVG(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW)) 
            / NULLIF(STDDEV(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW), 0) AS ZSCORE
    FROM paired
    WHERE SPREAD IS NOT NULL
    ORDER BY DATE
    """
    
    spread_data = session.sql(spread_history_sql).to_pandas()
    spread_data['DATE'] = pd.to_datetime(spread_data['DATE'])
    
    # Create subplot with prices and z-score
    from plotly.subplots import make_subplots
    
    fig = make_subplots(
        rows=2, cols=1,
        subplot_titles=(f'üìà Price Comparison: {symbol_1} vs {symbol_2}', 
                       f'üìä Spread Z-Score (Trading Signal)'),
        row_heights=[0.5, 0.5],
        vertical_spacing=0.12
    )
    
    # Normalize prices to 100 for comparison
    spread_data['PRICE_1_NORM'] = spread_data['CLOSE_1'] / spread_data['CLOSE_1'].iloc[0] * 100
    spread_data['PRICE_2_NORM'] = spread_data['CLOSE_2'] / spread_data['CLOSE_2'].iloc[0] * 100
    
    # Plot normalized prices
    fig.add_trace(go.Scatter(x=spread_data['DATE'], y=spread_data['PRICE_1_NORM'], 
                             name=symbol_1, line=dict(color='#2196F3')), row=1, col=1)
    fig.add_trace(go.Scatter(x=spread_data['DATE'], y=spread_data['PRICE_2_NORM'], 
                             name=symbol_2, line=dict(color='#FF9800')), row=1, col=1)
    
    # Plot Z-Score with trading bands
    fig.add_trace(go.Scatter(x=spread_data['DATE'], y=spread_data['ZSCORE'], 
                             name='Z-Score', line=dict(color='#4CAF50')), row=2, col=1)
    
    # Add trading bands at +/- 2 standard deviations
    fig.add_hline(y=2, line_dash="dash", line_color="red", row=2, col=1, 
                  annotation_text="SHORT Signal (+2œÉ)")
    fig.add_hline(y=-2, line_dash="dash", line_color="green", row=2, col=1,
                  annotation_text="LONG Signal (-2œÉ)")
    fig.add_hline(y=0, line_dash="dot", line_color="gray", row=2, col=1)
    
    fig.update_layout(
        height=600,
        title_text=f"üîó Pairs Trading: {symbol_1} ‚Üî {symbol_2} (Correlation: {top_pair['CORRELATION']:.2f})",
        showlegend=True
    )
    
    fig.update_yaxes(title_text="Normalized Price (Base 100)", row=1, col=1)
    fig.update_yaxes(title_text="Z-Score", row=2, col=1)
    
    st.plotly_chart(fig, use_container_width=True)
    
    print(f"\nüìä PAIRS TRADING INTERPRETATION:")
    print(f"   ‚Ä¢ When Z-Score > +2: {symbol_1} outperformed ‚Üí SHORT {symbol_1}, LONG {symbol_2}")
    print(f"   ‚Ä¢ When Z-Score < -2: {symbol_2} outperformed ‚Üí LONG {symbol_1}, SHORT {symbol_2}")
    print(f"   ‚Ä¢ Exit when Z-Score returns to 0 (convergence)")
else:
    print("No correlation data available")


In [None]:
# Step 4: Pairs Trading Backtest
# How profitable would pairs trading have been?
# NOW USING ASOF JOIN for robust 20-day lookback!

pairs_backtest_sql = """
WITH current_prices AS (
    SELECT DATE, SYMBOL, CLOSE
    FROM MARKET_DATA
),
historical_prices AS (
    SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
    FROM MARKET_DATA
),
-- Use ASOF JOIN for 20-day return, LEAD for forward return
daily_returns AS (
    SELECT 
        c.DATE,
        c.SYMBOL,
        c.CLOSE,
        LEAD(c.CLOSE, 5) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE) AS CLOSE_5D_FORWARD,
        (LEAD(c.CLOSE, 5) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE) - c.CLOSE) / c.CLOSE AS FORWARD_RETURN_5D,
        (c.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS RETURN_20D
    FROM current_prices c
    ASOF JOIN historical_prices h
        MATCH_CONDITION (DATEADD('day', -28, c.DATE) >= h.HIST_DATE)
        ON c.SYMBOL = h.SYMBOL
),
pair_signals AS (
    SELECT 
        a.DATE,
        a.SYMBOL AS SYMBOL_1,
        b.SYMBOL AS SYMBOL_2,
        a.RETURN_20D - b.RETURN_20D AS SPREAD,
        a.FORWARD_RETURN_5D AS FWD_RETURN_1,
        b.FORWARD_RETURN_5D AS FWD_RETURN_2,
        AVG(a.RETURN_20D - b.RETURN_20D) OVER (
            PARTITION BY a.SYMBOL, b.SYMBOL ORDER BY a.DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) AS SPREAD_MEAN,
        STDDEV(a.RETURN_20D - b.RETURN_20D) OVER (
            PARTITION BY a.SYMBOL, b.SYMBOL ORDER BY a.DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) AS SPREAD_STD
    FROM daily_returns a
    JOIN daily_returns b ON a.DATE = b.DATE AND a.SYMBOL < b.SYMBOL
    WHERE a.RETURN_20D IS NOT NULL AND b.RETURN_20D IS NOT NULL
      AND a.FORWARD_RETURN_5D IS NOT NULL AND b.FORWARD_RETURN_5D IS NOT NULL
),
signals_with_zscore AS (
    SELECT 
        *,
        (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) AS ZSCORE,
        CASE 
            WHEN (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) > 2 THEN 
                -FWD_RETURN_1 + FWD_RETURN_2  -- SHORT 1, LONG 2
            WHEN (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) < -2 THEN 
                FWD_RETURN_1 - FWD_RETURN_2   -- LONG 1, SHORT 2
            ELSE NULL
        END AS PAIRS_TRADE_RETURN
    FROM pair_signals
    WHERE SPREAD_STD > 0
)
SELECT 
    'Pairs Trading Strategy' AS STRATEGY,
    COUNT(PAIRS_TRADE_RETURN) AS NUM_TRADES,
    ROUND(AVG(PAIRS_TRADE_RETURN) * 100, 3) AS AVG_RETURN_PCT,
    ROUND(STDDEV(PAIRS_TRADE_RETURN) * 100, 3) AS VOLATILITY_PCT,
    ROUND(AVG(PAIRS_TRADE_RETURN) / NULLIF(STDDEV(PAIRS_TRADE_RETURN), 0) * SQRT(52), 2) AS SHARPE_RATIO,
    ROUND(SUM(CASE WHEN PAIRS_TRADE_RETURN > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS WIN_RATE_PCT
FROM signals_with_zscore
WHERE PAIRS_TRADE_RETURN IS NOT NULL
"""

print("üìä PAIRS TRADING BACKTEST RESULTS")
print("=" * 60)
print("Strategy: Enter when Z-Score > 2 or < -2, hold for 5 days")
print("=" * 60)
session.sql(pairs_backtest_sql).show()

print("\nüí° PAIRS TRADING vs MOMENTUM LONG-SHORT:")
print("   ‚Ä¢ Pairs Trading: Bets on CONVERGENCE (mean reversion)")
print("   ‚Ä¢ Momentum L/S:  Bets on CONTINUATION (trend following)")
print("   ‚Ä¢ Best practice: Combine both for diversification!")


### üî¨ Deep Dive: Sector-Based Pair Analysis

With our expanded stock universe (30+ stocks), we can now analyze pairs within sectors:
- **Intra-sector pairs** (GS‚ÜîJPM, XOM‚ÜîCVX) tend to have higher correlation
- **Cross-sector pairs** can provide diversification benefits
- **Sector rotation** signals when entire sectors diverge


In [None]:
# Deep Dive: Best Pairs by Sector

# Define sector mappings
sector_mapping = {
    # Tech Giants
    'AAPL': 'Tech', 'GOOGL': 'Tech', 'MSFT': 'Tech', 'AMZN': 'Tech', 
    'META': 'Tech', 'NVDA': 'Tech', 'TSLA': 'Tech',
    # Financials
    'JPM': 'Financials', 'GS': 'Financials', 'MS': 'Financials', 
    'BAC': 'Financials', 'WFC': 'Financials', 'C': 'Financials',
    # Payments
    'V': 'Payments', 'MA': 'Payments', 'PYPL': 'Payments', 'SQ': 'Payments',
    # Healthcare
    'JNJ': 'Healthcare', 'UNH': 'Healthcare', 'PFE': 'Healthcare', 
    'MRK': 'Healthcare', 'ABBV': 'Healthcare',
    # Consumer
    'WMT': 'Consumer', 'COST': 'Consumer', 'HD': 'Consumer', 
    'NKE': 'Consumer', 'SBUX': 'Consumer',
    # Energy
    'XOM': 'Energy', 'CVX': 'Energy', 'COP': 'Energy',
    # Industrials
    'CAT': 'Industrials', 'BA': 'Industrials', 'UPS': 'Industrials', 'HON': 'Industrials'
}

# Get correlations with sector labels
sector_corr_sql = """
WITH daily_returns AS (
    SELECT 
        DATE,
        SYMBOL,
        (CLOSE - LAG(CLOSE) OVER (PARTITION BY SYMBOL ORDER BY DATE)) 
            / NULLIF(LAG(CLOSE) OVER (PARTITION BY SYMBOL ORDER BY DATE), 0) AS DAILY_RETURN
    FROM MARKET_DATA
)
SELECT 
    a.SYMBOL AS SYMBOL_1,
    b.SYMBOL AS SYMBOL_2,
    ROUND(CORR(a.DAILY_RETURN, b.DAILY_RETURN), 3) AS CORRELATION,
    COUNT(*) AS TRADING_DAYS
FROM daily_returns a
JOIN daily_returns b 
    ON a.DATE = b.DATE 
    AND a.SYMBOL < b.SYMBOL
WHERE a.DAILY_RETURN IS NOT NULL 
  AND b.DAILY_RETURN IS NOT NULL
GROUP BY a.SYMBOL, b.SYMBOL
HAVING COUNT(*) >= 50
ORDER BY CORRELATION DESC
"""

sector_corr_df = session.sql(sector_corr_sql).to_pandas()

# Add sector labels
sector_corr_df['SECTOR_1'] = sector_corr_df['SYMBOL_1'].map(sector_mapping)
sector_corr_df['SECTOR_2'] = sector_corr_df['SYMBOL_2'].map(sector_mapping)
sector_corr_df['SAME_SECTOR'] = sector_corr_df['SECTOR_1'] == sector_corr_df['SECTOR_2']

# Show best pairs by category
print("üèÜ TOP INTRA-SECTOR PAIRS (Same Industry)")
print("=" * 70)
intra_sector = sector_corr_df[sector_corr_df['SAME_SECTOR']].head(10)
for _, row in intra_sector.iterrows():
    print(f"   {row['SECTOR_1']:12} ‚îÇ {row['SYMBOL_1']:5} ‚Üî {row['SYMBOL_2']:5} ‚îÇ Correlation: {row['CORRELATION']:.3f}")

print("\nüåê TOP CROSS-SECTOR PAIRS (Different Industries)")
print("=" * 70)
cross_sector = sector_corr_df[~sector_corr_df['SAME_SECTOR']].head(10)
for _, row in cross_sector.iterrows():
    print(f"   {row['SECTOR_1']:12} vs {row['SECTOR_2']:12} ‚îÇ {row['SYMBOL_1']:5} ‚Üî {row['SYMBOL_2']:5} ‚îÇ Correlation: {row['CORRELATION']:.3f}")

print("\nüí° INSIGHT:")
print("   ‚Ä¢ Intra-sector pairs: Higher correlation = more reliable mean reversion")
print("   ‚Ä¢ Cross-sector pairs: Unexpected correlations can reveal hidden connections")


In [None]:
# Interactive Deep Dive: Select Best Pair from Each Sector
# NOW USING ASOF JOIN for robust 20-day lookback!

import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Get the best pair from each sector
best_by_sector = intra_sector.groupby('SECTOR_1').first().reset_index()

print("üìä BEST PAIR FROM EACH SECTOR:")
print("=" * 70)

# Build subplot titles using iterrows (dictionary access)
subplot_titles = []
for _, row in best_by_sector.iterrows():
    subplot_titles.append(f"{row['SYMBOL_1']} vs {row['SYMBOL_2']} (Prices)")
    subplot_titles.append("Spread Z-Score")

fig = make_subplots(
    rows=len(best_by_sector), cols=2,
    subplot_titles=subplot_titles,
    horizontal_spacing=0.1,
    vertical_spacing=0.08
)

for idx, (_, pair) in enumerate(best_by_sector.iterrows()):
    symbol_1, symbol_2 = pair['SYMBOL_1'], pair['SYMBOL_2']
    
    # Get spread data for this pair using ASOF JOIN
    pair_sql = f"""
    WITH current_prices AS (
        SELECT DATE, SYMBOL, CLOSE
        FROM MARKET_DATA
        WHERE SYMBOL IN ('{symbol_1}', '{symbol_2}')
    ),
    historical_prices AS (
        SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
        FROM MARKET_DATA
        WHERE SYMBOL IN ('{symbol_1}', '{symbol_2}')
    ),
    daily_returns AS (
        SELECT 
            c.DATE, c.SYMBOL, c.CLOSE,
            (c.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS RETURN_20D
        FROM current_prices c
        ASOF JOIN historical_prices h
            MATCH_CONDITION (DATEADD('day', -28, c.DATE) >= h.HIST_DATE)
            ON c.SYMBOL = h.SYMBOL
    ),
    pivoted AS (
        SELECT a.DATE, 
               a.CLOSE AS CLOSE_1, a.RETURN_20D AS RETURN_1,
               b.CLOSE AS CLOSE_2, b.RETURN_20D AS RETURN_2,
               a.RETURN_20D - b.RETURN_20D AS SPREAD
        FROM daily_returns a
        JOIN daily_returns b ON a.DATE = b.DATE
        WHERE a.SYMBOL = '{symbol_1}' AND b.SYMBOL = '{symbol_2}'
    ),
    with_stats AS (
        SELECT *,
            AVG(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_MEAN,
            STDDEV(SPREAD) OVER (ORDER BY DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW) AS SPREAD_STD
        FROM pivoted
    )
    SELECT *, (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) AS ZSCORE
    FROM with_stats
    WHERE SPREAD_STD IS NOT NULL
    ORDER BY DATE
    """
    
    pair_data = session.sql(pair_sql).to_pandas()
    
    if len(pair_data) > 0:
        # Normalize prices
        pair_data['PRICE_1_NORM'] = pair_data['CLOSE_1'] / pair_data['CLOSE_1'].iloc[0] * 100
        pair_data['PRICE_2_NORM'] = pair_data['CLOSE_2'] / pair_data['CLOSE_2'].iloc[0] * 100
        
        row_num = idx + 1
        
        # Price chart
        fig.add_trace(go.Scatter(x=pair_data['DATE'], y=pair_data['PRICE_1_NORM'],
                                 name=symbol_1, line=dict(color='#2196F3', width=1.5),
                                 showlegend=(idx==0)), row=row_num, col=1)
        fig.add_trace(go.Scatter(x=pair_data['DATE'], y=pair_data['PRICE_2_NORM'],
                                 name=symbol_2, line=dict(color='#FF9800', width=1.5),
                                 showlegend=(idx==0)), row=row_num, col=1)
        
        # Z-Score chart with signal bands
        fig.add_trace(go.Scatter(x=pair_data['DATE'], y=pair_data['ZSCORE'],
                                 name='Z-Score', line=dict(color='#4CAF50', width=1),
                                 fill='tozeroy', fillcolor='rgba(76, 175, 80, 0.2)',
                                 showlegend=(idx==0)), row=row_num, col=2)
        
        # Add signal bands
        fig.add_hline(y=2, line_dash="dash", line_color="red", line_width=0.5, row=row_num, col=2)
        fig.add_hline(y=-2, line_dash="dash", line_color="green", line_width=0.5, row=row_num, col=2)
        
        # Count signals
        num_signals = len(pair_data[abs(pair_data['ZSCORE']) > 2])
        print(f"   {pair['SECTOR_1']:12} ‚îÇ {symbol_1} ‚Üî {symbol_2} ‚îÇ Corr: {pair['CORRELATION']:.2f} ‚îÇ Signals: {num_signals}")

fig.update_layout(
    height=250 * len(best_by_sector),
    title_text="üî¨ Sector-by-Sector Pair Analysis",
    showlegend=True,
    legend=dict(orientation="h", yanchor="bottom", y=1.02)
)

st.plotly_chart(fig, use_container_width=True)


In [None]:
# Current Trading Opportunities - SAME SECTOR, HIGH CORRELATION PAIRS ONLY
# This is the corrected version that only shows legitimate pairs trading opportunities
# NOW USING ASOF JOIN for robust 20-day lookback!

current_opportunities_sql = f"""
WITH sectors AS (
    {SECTOR_PAIRS}
),
current_prices AS (
    SELECT DATE, SYMBOL, CLOSE
    FROM MARKET_DATA
),
historical_prices AS (
    SELECT DATE AS HIST_DATE, SYMBOL, CLOSE AS HIST_CLOSE
    FROM MARKET_DATA
),
-- Use ASOF JOIN for 20-day return (more robust than LAG)
daily_returns AS (
    SELECT 
        c.DATE,
        c.SYMBOL,
        c.CLOSE,
        (c.CLOSE - LAG(c.CLOSE) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE)) 
            / NULLIF(LAG(c.CLOSE) OVER (PARTITION BY c.SYMBOL ORDER BY c.DATE), 0) AS DAILY_RETURN,
        (c.CLOSE - h.HIST_CLOSE) / NULLIF(h.HIST_CLOSE, 0) AS RETURN_20D
    FROM current_prices c
    ASOF JOIN historical_prices h
        MATCH_CONDITION (DATEADD('day', -28, c.DATE) >= h.HIST_DATE)
        ON c.SYMBOL = h.SYMBOL
),
-- First filter: Same sector + high correlation pairs only
pair_correlations AS (
    SELECT 
        a.SYMBOL AS SYMBOL_1,
        b.SYMBOL AS SYMBOL_2,
        s1.SECTOR AS SECTOR,
        CORR(a.DAILY_RETURN, b.DAILY_RETURN) AS CORRELATION
    FROM daily_returns a
    JOIN daily_returns b ON a.DATE = b.DATE AND a.SYMBOL < b.SYMBOL
    JOIN sectors s1 ON a.SYMBOL = s1.SYMBOL
    JOIN sectors s2 ON b.SYMBOL = s2.SYMBOL
    WHERE a.DAILY_RETURN IS NOT NULL AND b.DAILY_RETURN IS NOT NULL
      AND s1.SECTOR = s2.SECTOR  -- Same sector only!
    GROUP BY a.SYMBOL, b.SYMBOL, s1.SECTOR
    HAVING CORR(a.DAILY_RETURN, b.DAILY_RETURN) >= 0.65  -- High correlation only!
),
pair_analysis AS (
    SELECT 
        a.DATE,
        pc.SECTOR,
        a.SYMBOL AS SYMBOL_1,
        b.SYMBOL AS SYMBOL_2,
        pc.CORRELATION,
        a.RETURN_20D - b.RETURN_20D AS SPREAD,
        AVG(a.RETURN_20D - b.RETURN_20D) OVER (
            PARTITION BY a.SYMBOL, b.SYMBOL ORDER BY a.DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) AS SPREAD_MEAN,
        STDDEV(a.RETURN_20D - b.RETURN_20D) OVER (
            PARTITION BY a.SYMBOL, b.SYMBOL ORDER BY a.DATE ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
        ) AS SPREAD_STD
    FROM daily_returns a
    JOIN daily_returns b ON a.DATE = b.DATE AND a.SYMBOL < b.SYMBOL
    JOIN pair_correlations pc ON a.SYMBOL = pc.SYMBOL_1 AND b.SYMBOL = pc.SYMBOL_2
    WHERE a.RETURN_20D IS NOT NULL AND b.RETURN_20D IS NOT NULL
),
latest_signals AS (
    SELECT 
        SECTOR,
        SYMBOL_1,
        SYMBOL_2,
        CORRELATION,
        SPREAD,
        (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) AS ZSCORE,
        CASE 
            WHEN (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) > 2 THEN 
                'üî¥ SHORT ' || SYMBOL_1 || ', LONG ' || SYMBOL_2
            WHEN (SPREAD - SPREAD_MEAN) / NULLIF(SPREAD_STD, 0) < -2 THEN 
                'üü¢ LONG ' || SYMBOL_1 || ', SHORT ' || SYMBOL_2
            ELSE '‚ö™ No Signal'
        END AS TRADE_ACTION
    FROM pair_analysis
    WHERE DATE = (SELECT MAX(DATE) FROM pair_analysis)
      AND SPREAD_STD > 0
)
SELECT 
    SECTOR,
    SYMBOL_1,
    SYMBOL_2,
    ROUND(CORRELATION, 2) AS CORR,
    ROUND(SPREAD * 100, 2) AS SPREAD_PCT,
    ROUND(ZSCORE, 2) AS ZSCORE,
    TRADE_ACTION
FROM latest_signals
WHERE ABS(ZSCORE) > 1.5
ORDER BY ABS(ZSCORE) DESC
LIMIT 15
"""

print("üéØ CURRENT PAIRS TRADING OPPORTUNITIES (Same-Sector, High-Corr Only)")
print("=" * 85)
print("‚ö†Ô∏è  FILTERS: Same sector + Correlation >= 0.65")
print("=" * 85)

opportunities_df = session.sql(current_opportunities_sql).to_pandas()

if len(opportunities_df) > 0:
    for _, row in opportunities_df.iterrows():
        zscore_emoji = "üî•" if abs(row['ZSCORE']) > 2.5 else "üìä"
        signal_color = "üî¥" if row['ZSCORE'] > 0 else "üü¢"
        print(f"{zscore_emoji} {row['SECTOR']:12} ‚îÇ {row['SYMBOL_1']:5}‚Üî{row['SYMBOL_2']:5} ‚îÇ Corr: {row['CORR']:.2f} ‚îÇ Z: {row['ZSCORE']:+6.2f} ‚îÇ {row['TRADE_ACTION']}")
    
    print(f"\nüìà SUMMARY:")
    strong_signals = len(opportunities_df[abs(opportunities_df['ZSCORE']) > 2])
    print(f"   ‚Ä¢ Strong signals (|Z| > 2): {strong_signals}")
    print(f"   ‚Ä¢ Moderate signals (|Z| > 1.5): {len(opportunities_df)}")
    print(f"   ‚Ä¢ All pairs are same-sector with correlation >= 0.65 ‚úÖ")
else:
    print("   ‚úÖ No divergent pairs found among same-sector correlated stocks")
    print("   (All sector peers are moving together as expected)")


In [None]:
# Factor-level Information Coefficients
factor_ic_sql = """
WITH signals_with_returns AS (
    SELECT 
        a.*,
        (LEAD(CLOSE, 5) OVER (PARTITION BY SYMBOL ORDER BY DATE) - CLOSE) / CLOSE AS FORWARD_RETURN_5D
    FROM ALPHA_SIGNALS a
)
SELECT 
    'Momentum' AS FACTOR, ROUND(CORR(MOMENTUM_RANK, FORWARD_RETURN_5D), 4) AS IC
FROM signals_with_returns WHERE FORWARD_RETURN_5D IS NOT NULL
UNION ALL
SELECT 'Volatility', ROUND(CORR(VOLATILITY_RANK, FORWARD_RETURN_5D), 4)
FROM signals_with_returns WHERE FORWARD_RETURN_5D IS NOT NULL
UNION ALL
SELECT 'Sentiment', ROUND(CORR(SENTIMENT_RANK, FORWARD_RETURN_5D), 4)
FROM signals_with_returns WHERE FORWARD_RETURN_5D IS NOT NULL
UNION ALL
SELECT 'Composite', ROUND(CORR(COMPOSITE_ALPHA, FORWARD_RETURN_5D), 4)
FROM signals_with_returns WHERE FORWARD_RETURN_5D IS NOT NULL
"""

print("üìä FACTOR INFORMATION COEFFICIENTS")
print("(Higher IC = better predictive power)")
print("=" * 60)
session.sql(factor_ic_sql).show()


---
## 8Ô∏è‚É£ Production Deployment

Deploy this as a scheduled production pipeline using Snowflake Tasks and Stored Procedures.


In [None]:
# Create a UDF for composite alpha calculation (using IC-optimized weights)
from snowflake.snowpark.functions import udf

@udf(name="COMPOSITE_ALPHA_UDF", is_permanent=False, replace=True)
def composite_alpha_udf(
    momentum_rank: float, 
    volatility_rank: float, 
    sentiment_rank: float
) -> float:
    """Calculate IC-optimized composite alpha score.
    
    Weights are based on IC analysis:
    - Momentum: -0.20 (flipped - mean reversion)
    - Volatility: -0.30 (flipped - high vol outperforming)
    - Sentiment: +0.50 (most predictive factor)
    """
    m = momentum_rank if momentum_rank is not None else 0.5
    v = volatility_rank if volatility_rank is not None else 0.5
    s = sentiment_rank if sentiment_rank is not None else 0.5
    # IC-Optimized weights (negative = flipped signal)
    return float(m * (-0.20) + v * (-0.30) + s * 0.50)

print("‚úÖ Created COMPOSITE_ALPHA_UDF with IC-optimized weights")
print("   Momentum:   -20% (flipped)")
print("   Volatility: -30% (flipped)")
print("   Sentiment:  +50%")


In [None]:
# Example: Create a stored procedure for daily alpha refresh
# NOTE: In production, consider using ASOF JOIN instead of LAG(20) for robustness
# See momentum factor cell for the ASOF JOIN pattern that handles data gaps
stored_proc_sql = """
-- Stored Procedure for Daily Alpha Calculation
-- Run this in your Snowflake console to create the procedure
-- TIP: For production, use ASOF JOIN instead of LAG(20) for robust lookbacks

CREATE OR REPLACE PROCEDURE CALCULATE_DAILY_ALPHA()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- Step 1: Refresh Momentum Factor
    CREATE OR REPLACE TABLE MOMENTUM_FACTOR AS
    SELECT DATE, SYMBOL, CLOSE, MOMENTUM_20D, MOMENTUM_RANK, MOMENTUM_SIGNAL
    FROM (
        SELECT 
            DATE, SYMBOL, CLOSE,
            (CLOSE - LAG(CLOSE, 20) OVER (PARTITION BY SYMBOL ORDER BY DATE)) 
                / NULLIF(LAG(CLOSE, 20) OVER (PARTITION BY SYMBOL ORDER BY DATE), 0) AS MOMENTUM_20D,
            PERCENT_RANK() OVER (PARTITION BY DATE ORDER BY MOMENTUM_20D) AS MOMENTUM_RANK,
            CASE WHEN MOMENTUM_RANK > 0.8 THEN 1 WHEN MOMENTUM_RANK < 0.2 THEN -1 ELSE 0 END AS MOMENTUM_SIGNAL
        FROM MARKET_DATA
    );
    
    -- Step 2: Combine factors and create signals
    -- ... (similar logic to our composite query)
    
    RETURN 'Alpha calculation completed at ' || CURRENT_TIMESTAMP();
END;
$$;

-- Schedule as a task (runs daily at 6 AM ET)
CREATE OR REPLACE TASK DAILY_ALPHA_TASK
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
    CALL CALCULATE_DAILY_ALPHA();

-- Resume the task (run once to enable)
-- ALTER TASK DAILY_ALPHA_TASK RESUME;
"""

print("üìã PRODUCTION DEPLOYMENT SQL")
print("=" * 60)
print("Copy the following SQL to create scheduled alpha calculations:")
print(stored_proc_sql)


---
## üéì Summary

This notebook demonstrated how Snowflake enables hedge funds to **find alpha**:

| Capability | Snowflake Feature | Benefit |
|------------|-------------------|---------|
| **Unified Data** | Single platform | No data silos, faster insights |
| **Alternative Data** | Marketplace | Instant access to sentiment, news, web traffic |
| **Alpha Computation** | Snowpark Python | ML/analytics without data movement |
| **Scalability** | Elastic compute | Process billions of rows in minutes |
| **Production ML** | UDFs + Tasks | Deploy factors as scalable pipelines |
| **Governance** | Row-level security | Secure data sharing with LPs |

### üöÄ Next Steps
1. Connect real market data via **Snowpipe**
2. Subscribe to **Marketplace** sentiment data (RavenPack, FactSet)
3. Tune factor weights with **ML optimization**
4. Deploy as **scheduled production pipeline**
5. Set up **LP reporting shares**


In [None]:
# Show all created tables
print("üì¶ TABLES CREATED IN THIS DEMO")
print("=" * 60)
session.sql("SHOW TABLES IN SCHEMA HEDGE_FUND_DEMO.ANALYTICS").show()

print("\nüéâ Demo complete! Upload this notebook to Snowflake Notebooks to run it.")
