# Exploratory Data Analysis for Statistical Arbitrage System

**Purpose**: Comprehensive exploratory data analysis (EDA) for financial market data to understand data characteristics, identify patterns, and generate hypotheses for statistical arbitrage strategies.

**Created**: June 25, 2025  
**Author**: Statistical Arbitrage Research Team  
**Version**: 1.0

---

## Overview

This notebook performs initial data exploration and quality assessment for the statistical arbitrage system. We'll examine:

1. **Data Loading**: Import various raw and processed datasets
2. **Statistical Summaries**: Descriptive statistics and data quality checks
3. **Data Visualization**: Charts and plots to understand distributions
4. **Correlation Analysis**: Relationships between variables
5. **Hypothesis Testing**: Initial statistical tests and model exploration
6. **Key Findings**: Documentation of insights and next steps

---

**Note**: This is an exploratory notebook focused on discovery and understanding. For production models, refer to the structured notebooks in the `model_experimentation` directory.

## 1. Setup and Library Imports

Import all necessary libraries for data analysis, visualization, and statistical testing.

In [None]:
# Core data manipulation and analysis libraries
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
from plotly.subplots import make_subplots

# Statistical libraries
import scipy.stats as stats
from scipy import signal
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.stats.diagnostic import acorr_ljungbox
import statsmodels.api as sm

# Financial data libraries
import yfinance as yf
import pandas_datareader as pdr

# Machine learning libraries
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# Time series analysis
from datetime import datetime, timedelta
import pandas_ta as ta

# System libraries
import os
import sys
import pickle
from pathlib import Path

# Set up paths
project_root = Path.cwd().parent.parent
data_dir = project_root / 'data'
sys.path.append(str(project_root / 'src'))

# Configure plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

print("✅ All libraries imported successfully!")
print(f"📁 Project root: {project_root}")
print(f"📊 Data directory: {data_dir}")

## 2. Load Raw and Processed Datasets

Load various financial datasets from different sources to understand data availability and quality.

In [None]:
def load_sample_data():
    """Load sample financial data for analysis"""
    # Define a list of sample stocks for pairs trading
    stocks = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'NVDA', 'META', 'NFLX']
    
    # Date range for analysis
    start_date = '2020-01-01'
    end_date = '2024-12-31'
    
    print(f"📥 Loading data for {len(stocks)} stocks from {start_date} to {end_date}")
    
    try:
        # Download stock data
        data = yf.download(stocks, start=start_date, end=end_date)
        print(f"✅ Successfully loaded data: {data.shape}")
        return data
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        return None

def generate_synthetic_data():
    """Generate synthetic financial data for testing"""
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-12-31', freq='D')
    n_days = len(dates)
    
    # Generate correlated stock returns for pairs trading
    returns = np.random.multivariate_normal(
        mean=[0.0005, 0.0005],  # Small positive drift
        cov=[[0.0004, 0.0002],  # Correlation between assets
             [0.0002, 0.0004]], 
        size=n_days
    )
    
    # Convert to prices
    prices = np.cumprod(1 + returns, axis=0) * 100
    
    synthetic_data = pd.DataFrame(
        prices, 
        index=dates, 
        columns=['Stock_A', 'Stock_B']
    )
    
    print(f"🔧 Generated synthetic data: {synthetic_data.shape}")
    return synthetic_data

# Load real market data
market_data = load_sample_data()

# Generate synthetic data as fallback
synthetic_data = generate_synthetic_data()

# Check data availability
if market_data is not None:
    print(f"\n📊 Market Data Overview:")
    print(f"Shape: {market_data.shape}")
    print(f"Date range: {market_data.index[0]} to {market_data.index[-1]}")
    print(f"Columns: {market_data.columns.get_level_values(0).unique().tolist()}")
else:
    print("⚠️ Using synthetic data for analysis")
    market_data = synthetic_data

In [None]:
# Data preprocessing and initial inspection
def preprocess_market_data(data):
    """Clean and preprocess market data"""
    if data.columns.nlevels > 1:
        # Extract adjusted close prices for multi-level columns
        if 'Adj Close' in data.columns.get_level_values(0):
            prices = data['Adj Close'].copy()
        else:
            prices = data['Close'].copy()
    else:
        prices = data.copy()
    
    # Forward fill missing values
    prices = prices.fillna(method='ffill')
    
    # Calculate returns
    returns = prices.pct_change().dropna()
    
    # Calculate log returns
    log_returns = np.log(prices / prices.shift(1)).dropna()
    
    return {
        'prices': prices,
        'returns': returns,
        'log_returns': log_returns
    }

# Preprocess the data
processed_data = preprocess_market_data(market_data)
prices = processed_data['prices']
returns = processed_data['returns']
log_returns = processed_data['log_returns']

print("📈 Data Preprocessing Complete:")
print(f"Prices shape: {prices.shape}")
print(f"Returns shape: {returns.shape}")
print(f"Log returns shape: {log_returns.shape}")

# Display first few rows
print(f"\n📋 First 5 rows of prices:")
print(prices.head())

print(f"\n📋 Last 5 rows of returns:")
print(returns.tail())

## 3. Statistical Summaries of Data

Generate comprehensive descriptive statistics to understand data characteristics, distributions, and quality.

In [None]:
def generate_comprehensive_stats(data, data_type="returns"):
    """Generate comprehensive statistical summary"""
    stats_dict = {}
    
    for column in data.columns:
        series = data[column].dropna()
        
        stats_dict[column] = {
            'count': len(series),
            'mean': series.mean(),
            'std': series.std(),
            'min': series.min(),
            '25%': series.quantile(0.25),
            '50%': series.median(),
            '75%': series.quantile(0.75),
            'max': series.max(),
            'skewness': series.skew(),
            'kurtosis': series.kurtosis(),
            'jarque_bera': stats.jarque_bera(series)[1],  # p-value
            'missing_values': data[column].isna().sum(),
            'missing_pct': (data[column].isna().sum() / len(data)) * 100
        }
        
        # Additional financial metrics for returns
        if data_type == "returns":
            stats_dict[column].update({
                'sharpe_ratio': series.mean() / series.std() * np.sqrt(252),
                'var_95': series.quantile(0.05),
                'var_99': series.quantile(0.01),
                'max_drawdown': (series.cumsum() - series.cumsum().expanding().max()).min()
            })
    
    return pd.DataFrame(stats_dict).round(6)

# Generate statistics for prices
print("📊 PRICE STATISTICS")
print("=" * 50)
price_stats = generate_comprehensive_stats(prices, "prices")
print(price_stats.T.head(10))

print("\n📊 RETURNS STATISTICS")
print("=" * 50)
return_stats = generate_comprehensive_stats(returns, "returns")
print(return_stats.T)

print("\n📊 LOG RETURNS STATISTICS")
print("=" * 50)
log_return_stats = generate_comprehensive_stats(log_returns, "returns")
print(log_return_stats.T)

In [None]:
# Data Quality Assessment
def assess_data_quality(data, name="Dataset"):
    """Perform comprehensive data quality assessment"""
    print(f"\n🔍 DATA QUALITY ASSESSMENT: {name}")
    print("=" * 60)
    
    # Basic info
    print(f"Shape: {data.shape}")
    print(f"Memory usage: {data.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    print(f"Date range: {data.index[0]} to {data.index[-1]}")
    print(f"Total trading days: {len(data)}")
    
    # Missing values analysis
    missing_data = data.isnull().sum()
    missing_pct = (missing_data / len(data)) * 100
    
    print(f"\n📉 Missing Values:")
    for col in data.columns:
        if missing_data[col] > 0:
            print(f"  {col}: {missing_data[col]} ({missing_pct[col]:.2f}%)")
        else:
            print(f"  {col}: No missing values ✅")
    
    # Outlier detection (using IQR method)
    print(f"\n🎯 Outlier Detection (IQR method):")
    for col in data.columns:
        Q1 = data[col].quantile(0.25)
        Q3 = data[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = data[(data[col] < lower_bound) | (data[col] > upper_bound)][col]
        print(f"  {col}: {len(outliers)} outliers ({len(outliers)/len(data)*100:.2f}%)")
    
    # Data consistency checks
    print(f"\n🔧 Data Consistency:")
    print(f"  Duplicated timestamps: {data.index.duplicated().sum()}")
    print(f"  Sorted chronologically: {data.index.is_monotonic_increasing}")
    
    return {
        'missing_data': missing_data,
        'missing_pct': missing_pct,
        'outliers_detected': True
    }

# Assess quality for each dataset
price_quality = assess_data_quality(prices, "Stock Prices")
return_quality = assess_data_quality(returns, "Stock Returns")

## 4. Data Visualization

Create comprehensive visualizations to understand data distributions, trends, and potential anomalies.

In [None]:
# Price Trends Visualization
def plot_price_trends(data, title="Stock Price Trends"):
    """Plot price trends for all stocks"""
    fig = make_subplots(
        rows=2, cols=1,
        subplot_titles=['Normalized Prices (Base=100)', 'Absolute Prices'],
        vertical_spacing=0.1
    )
    
    # Normalize prices to base 100
    normalized_data = data.div(data.iloc[0]) * 100
    
    colors = px.colors.qualitative.Set1
    
    for i, col in enumerate(data.columns):
        color = colors[i % len(colors)]
        
        # Normalized prices
        fig.add_trace(
            go.Scatter(
                x=normalized_data.index,
                y=normalized_data[col],
                name=f"{col} (Norm)",
                line=dict(color=color),
                showlegend=True
            ),
            row=1, col=1
        )
        
        # Absolute prices
        fig.add_trace(
            go.Scatter(
                x=data.index,
                y=data[col],
                name=f"{col} (Abs)",
                line=dict(color=color, dash='dash'),
                showlegend=True
            ),
            row=2, col=1
        )
    
    fig.update_layout(
        title=title,
        height=800,
        hovermode='x unified'
    )
    
    fig.update_xaxes(title_text="Date", row=2, col=1)
    fig.update_yaxes(title_text="Normalized Price", row=1, col=1)
    fig.update_yaxes(title_text="Absolute Price ($)", row=2, col=1)
    
    return fig

# Plot price trends
price_fig = plot_price_trends(prices)
price_fig.show()

print("📈 Price trends plotted successfully!")

In [None]:
# Returns Distribution Analysis
def plot_returns_distribution(returns_data, title="Returns Distribution Analysis"):
    """Create comprehensive returns distribution plots"""
    n_stocks = len(returns_data.columns)
    n_cols = min(3, n_stocks)
    n_rows = (n_stocks + n_cols - 1) // n_cols
    
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5*n_rows))
    if n_rows == 1:
        axes = [axes] if n_stocks == 1 else axes
    else:
        axes = axes.flatten()
    
    for i, col in enumerate(returns_data.columns):
        data = returns_data[col].dropna()
        
        # Create histogram with normal distribution overlay
        axes[i].hist(data, bins=50, density=True, alpha=0.7, color='skyblue', edgecolor='black')
        
        # Fit normal distribution
        mu, sigma = stats.norm.fit(data)
        x = np.linspace(data.min(), data.max(), 100)
        axes[i].plot(x, stats.norm.pdf(x, mu, sigma), 'r-', linewidth=2, label=f'Normal (μ={mu:.4f}, σ={sigma:.4f})')
        
        # Add statistics to plot
        axes[i].axvline(data.mean(), color='red', linestyle='--', alpha=0.8, label=f'Mean: {data.mean():.4f}')
        axes[i].axvline(data.median(), color='green', linestyle='--', alpha=0.8, label=f'Median: {data.median():.4f}')
        
        axes[i].set_title(f'{col} - Returns Distribution')
        axes[i].set_xlabel('Daily Returns')
        axes[i].set_ylabel('Density')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)
    
    # Hide unused subplots
    for i in range(n_stocks, len(axes)):
        axes[i].set_visible(False)
    
    plt.tight_layout()
    plt.suptitle(title, y=1.02, fontsize=16)
    plt.show()

# Plot returns distributions
plot_returns_distribution(returns, "Daily Returns Distribution")

# Summary statistics in a nice format
print("\n📊 RETURNS DISTRIBUTION SUMMARY")
print("=" * 50)
for col in returns.columns:
    data = returns[col].dropna()
    jb_stat, jb_pvalue = stats.jarque_bera(data)
    
    print(f"\n{col}:")
    print(f"  Mean: {data.mean():.6f}")
    print(f"  Std Dev: {data.std():.6f}")
    print(f"  Skewness: {data.skew():.4f}")
    print(f"  Kurtosis: {data.kurtosis():.4f}")
    print(f"  Jarque-Bera p-value: {jb_pvalue:.6f}")
    print(f"  Normal distribution: {'No ❌' if jb_pvalue < 0.05 else 'Yes ✅'}")

In [None]:
# Volatility Analysis and Rolling Statistics
def plot_volatility_analysis(returns_data, window=30):
    """Plot rolling volatility and other time-varying statistics"""
    
    # Calculate rolling statistics
    rolling_mean = returns_data.rolling(window=window).mean()
    rolling_std = returns_data.rolling(window=window).std()
    rolling_vol = rolling_std * np.sqrt(252)  # Annualized volatility
    
    # Create subplots
    fig = make_subplots(
        rows=3, cols=1,
        subplot_titles=[
            f'Rolling {window}-Day Mean Returns',
            f'Rolling {window}-Day Volatility (Annualized)',
            'Cumulative Returns'
        ],
        vertical_spacing=0.08
    )
    
    colors = px.colors.qualitative.Set1
    
    for i, col in enumerate(returns_data.columns):
        color = colors[i % len(colors)]
        
        # Rolling mean
        fig.add_trace(
            go.Scatter(
                x=rolling_mean.index,
                y=rolling_mean[col],
                name=f"{col} Mean",
                line=dict(color=color),
                showlegend=True
            ),
            row=1, col=1
        )
        
        # Rolling volatility
        fig.add_trace(
            go.Scatter(
                x=rolling_vol.index,
                y=rolling_vol[col],
                name=f"{col} Vol",
                line=dict(color=color),
                showlegend=False
            ),
            row=2, col=1
        )
        
        # Cumulative returns
        cum_returns = (1 + returns_data[col]).cumprod() - 1
        fig.add_trace(
            go.Scatter(
                x=cum_returns.index,
                y=cum_returns,
                name=f"{col} Cum",
                line=dict(color=color),
                showlegend=False
            ),
            row=3, col=1
        )
    
    fig.update_layout(
        title="Volatility and Performance Analysis",
        height=900,
        hovermode='x unified'
    )
    
    fig.update_yaxes(title_text="Rolling Mean", row=1, col=1)
    fig.update_yaxes(title_text="Annualized Volatility", row=2, col=1)
    fig.update_yaxes(title_text="Cumulative Returns", row=3, col=1)
    fig.update_xaxes(title_text="Date", row=3, col=1)
    
    return fig

# Plot volatility analysis
vol_fig = plot_volatility_analysis(returns, window=30)
vol_fig.show()

# Calculate and display volatility statistics
print("\n📊 VOLATILITY STATISTICS")
print("=" * 50)
annual_vol = returns.std() * np.sqrt(252)
for col in returns.columns:
    vol = annual_vol[col]
    print(f"{col}: {vol:.2%} annual volatility")

print(f"\nAverage volatility: {annual_vol.mean():.2%}")
print(f"Volatility range: {annual_vol.min():.2%} - {annual_vol.max():.2%}")

## 5. Correlation Analysis

Analyze relationships between different assets to identify potential pairs trading opportunities and portfolio diversification benefits.

In [None]:
# Correlation Analysis
def analyze_correlations(data, title="Correlation Analysis"):
    """Comprehensive correlation analysis"""
    
    # Calculate correlation matrices
    price_corr = prices.corr()
    return_corr = returns.corr()
    
    # Create correlation heatmaps
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    
    # Price correlations
    sns.heatmap(price_corr, annot=True, cmap='RdBu_r', center=0, 
                square=True, ax=axes[0], cbar_kws={'label': 'Correlation'})
    axes[0].set_title('Price Correlations')
    
    # Return correlations
    sns.heatmap(return_corr, annot=True, cmap='RdBu_r', center=0, 
                square=True, ax=axes[1], cbar_kws={'label': 'Correlation'})
    axes[1].set_title('Return Correlations')
    
    plt.tight_layout()
    plt.show()
    
    return price_corr, return_corr

# Perform correlation analysis
price_corr, return_corr = analyze_correlations(returns)

# Find highly correlated pairs for potential pairs trading
def find_correlation_pairs(corr_matrix, threshold=0.7):
    """Find highly correlated asset pairs"""
    pairs = []
    for i in range(len(corr_matrix.columns)):
        for j in range(i+1, len(corr_matrix.columns)):
            corr = corr_matrix.iloc[i, j]
            if abs(corr) >= threshold:
                pairs.append({
                    'asset1': corr_matrix.columns[i],
                    'asset2': corr_matrix.columns[j],
                    'correlation': corr
                })
    return pd.DataFrame(pairs).sort_values('correlation', key=abs, ascending=False)

print("🔗 HIGHLY CORRELATED PAIRS (|correlation| >= 0.7)")
print("=" * 60)
corr_pairs = find_correlation_pairs(return_corr, threshold=0.7)
if len(corr_pairs) > 0:
    print(corr_pairs)
else:
    print("No pairs found with correlation >= 0.7")

# Rolling correlation analysis
def plot_rolling_correlations(data1, data2, window=60, title="Rolling Correlation"):
    """Plot rolling correlation between two assets"""
    if len(data1.columns) >= 2:
        asset1 = data1.columns[0]
        asset2 = data1.columns[1] if len(data1.columns) > 1 else data1.columns[0]
        
        rolling_corr = data1[asset1].rolling(window=window).corr(data1[asset2])
        
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=rolling_corr.index,
            y=rolling_corr,
            mode='lines',
            name=f'{asset1} vs {asset2}',
            line=dict(color='blue', width=2)
        ))
        
        fig.add_hline(y=0.7, line_dash="dash", line_color="red", 
                     annotation_text="High Correlation (0.7)")
        fig.add_hline(y=-0.7, line_dash="dash", line_color="red")
        fig.add_hline(y=0, line_dash="dot", line_color="gray")
        
        fig.update_layout(
            title=f"{title} ({window}-day window)",
            xaxis_title="Date",
            yaxis_title="Rolling Correlation",
            hovermode='x'
        )
        
        return fig
    return None

# Plot rolling correlations for first two assets
if len(returns.columns) >= 2:
    rolling_corr_fig = plot_rolling_correlations(returns, returns, window=60)
    if rolling_corr_fig:
        rolling_corr_fig.show()

print(f"\n📊 CORRELATION SUMMARY")
print("=" * 30)
print(f"Average return correlation: {return_corr.values[np.triu_indices_from(return_corr.values, k=1)].mean():.4f}")
print(f"Max return correlation: {return_corr.values[np.triu_indices_from(return_corr.values, k=1)].max():.4f}")
print(f"Min return correlation: {return_corr.values[np.triu_indices_from(return_corr.values, k=1)].min():.4f}")

## 6. Test Simple Hypotheses

Formulate and test basic statistical hypotheses about the data to validate assumptions for trading strategies.

In [None]:
# Hypothesis Testing Framework
def test_statistical_hypotheses(data, alpha=0.05):
    """Test various statistical hypotheses relevant to trading"""
    
    results = {}
    
    for column in data.columns:
        series = data[column].dropna()
        column_results = {}
        
        # H0: Returns are normally distributed
        jb_stat, jb_pvalue = stats.jarque_bera(series)
        column_results['normality'] = {
            'test': 'Jarque-Bera',
            'statistic': jb_stat,
            'p_value': jb_pvalue,
            'reject_h0': jb_pvalue < alpha,
            'interpretation': 'Returns are NOT normally distributed' if jb_pvalue < alpha else 'Returns are normally distributed'
        }
        
        # H0: Mean return = 0 (no drift)
        t_stat, t_pvalue = stats.ttest_1samp(series, 0)
        column_results['zero_mean'] = {
            'test': 'One-sample t-test',
            'statistic': t_stat,
            'p_value': t_pvalue,
            'reject_h0': t_pvalue < alpha,
            'interpretation': 'Mean return significantly different from 0' if t_pvalue < alpha else 'Mean return not significantly different from 0'
        }
        
        # H0: No autocorrelation (returns are independent)
        try:
            ljung_stat, ljung_pvalue = acorr_ljungbox(series, lags=10, return_df=False)
            column_results['autocorrelation'] = {
                'test': 'Ljung-Box',
                'statistic': ljung_stat[9],  # 10th lag
                'p_value': ljung_pvalue[9],
                'reject_h0': ljung_pvalue[9] < alpha,
                'interpretation': 'Significant autocorrelation detected' if ljung_pvalue[9] < alpha else 'No significant autocorrelation'
            }
        except:
            column_results['autocorrelation'] = {'test': 'Ljung-Box', 'error': 'Could not compute'}
        
        # H0: Data is stationary
        try:
            adf_stat, adf_pvalue, _, _, adf_critical, _ = adfuller(series)
            column_results['stationarity'] = {
                'test': 'Augmented Dickey-Fuller',
                'statistic': adf_stat,
                'p_value': adf_pvalue,
                'critical_values': adf_critical,
                'reject_h0': adf_pvalue < alpha,
                'interpretation': 'Series is stationary' if adf_pvalue < alpha else 'Series has unit root (non-stationary)'
            }
        except:
            column_results['stationarity'] = {'test': 'ADF', 'error': 'Could not compute'}
        
        results[column] = column_results
    
    return results

# Run hypothesis tests
print("🧪 STATISTICAL HYPOTHESIS TESTING")
print("=" * 50)

hypothesis_results = test_statistical_hypotheses(returns)

for asset, tests in hypothesis_results.items():
    print(f"\n📊 {asset}:")
    print("-" * 30)
    
    for test_name, result in tests.items():
        if 'error' not in result:
            print(f"{test_name.title()} Test ({result['test']}):")
            print(f"  Statistic: {result['statistic']:.4f}")
            print(f"  P-value: {result['p_value']:.6f}")
            print(f"  Result: {result['interpretation']}")
            print()

# Cointegration test for pairs trading
def test_cointegration(data, alpha=0.05):
    """Test for cointegration between asset pairs"""
    print(f"\n🔗 COINTEGRATION TESTING")
    print("=" * 40)
    
    cointegration_results = []
    
    for i in range(len(data.columns)):
        for j in range(i+1, len(data.columns)):
            asset1, asset2 = data.columns[i], data.columns[j]
            
            try:
                # Cointegration test
                score, pvalue, crit_values = coint(data[asset1], data[asset2])
                
                result = {
                    'asset1': asset1,
                    'asset2': asset2,
                    'score': score,
                    'p_value': pvalue,
                    'critical_values': crit_values,
                    'cointegrated': pvalue < alpha
                }
                
                cointegration_results.append(result)
                
                print(f"{asset1} - {asset2}:")
                print(f"  Score: {score:.4f}")
                print(f"  P-value: {pvalue:.4f}")
                print(f"  Cointegrated: {'Yes ✅' if pvalue < alpha else 'No ❌'}")
                print()
                
            except Exception as e:
                print(f"{asset1} - {asset2}: Error - {e}")
    
    return cointegration_results

# Test cointegration if we have enough assets
if len(prices.columns) >= 2:
    coint_results = test_cointegration(prices)
else:
    print("Need at least 2 assets for cointegration testing")