# Financial ML System - Data Pipeline

Version: 1.0.0

This notebook handles data ingestion, validation, cleaning, exploratory analysis, and export of the processed dataset and a reusable `src/data/ingestion.py` module.

## 1. Setup

In [None]:
import os
import sys
import warnings
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')

# Project checkout location. Defaults to the Colab path; set the
# FINANCIAL_ML_ROOT environment variable to run from another checkout.
PROJECT_ROOT = Path(os.environ.get('FINANCIAL_ML_ROOT', '/content/financial-ml-system'))

# Make the `src.*` package importable. The membership check keeps repeated
# executions of this cell from stacking duplicate sys.path entries.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.utils.constants import DATA_DIR, TRADING_DAYS_PER_YEAR
from src.utils.config_loader import config
from src.utils.helpers import calculate_returns, calculate_sharpe_ratio, calculate_max_drawdown

print("Setup complete")

## 2. Data Acquisition Module

In [None]:
class DataFetcher:
    """Fetch daily OHLCV market data from Yahoo Finance with a CSV cache.

    Each ``(ticker, start_date, end_date)`` request is stored as one CSV file
    in ``cache_dir``. A later request with the same arguments reads that file
    instead of downloading again.
    """

    def __init__(self, cache_dir: Path = None):
        """Create the fetcher and make sure the cache directory exists.

        Args:
            cache_dir: Directory for cached CSV files, as ``str`` or ``Path``.
                Falls back to ``DATA_DIR / 'raw'`` when not given.
        """
        # Wrap in Path() so a plain string path also gets .mkdir() and '/'.
        self.cache_dir = Path(cache_dir) if cache_dir else DATA_DIR / 'raw'
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def fetch(self, ticker: str, start_date: str, end_date: str = None) -> pd.DataFrame:
        """Fetch OHLCV data for a single ticker.

        Args:
            ticker: Yahoo Finance symbol, e.g. ``'SPY'``.
            start_date: Start date passed to ``yf.download`` (``'YYYY-MM-DD'``).
            end_date: End date passed to ``yf.download`` (``'YYYY-MM-DD'``).
                Defaults to today's date, so the cache key changes daily.

        Returns:
            Date-indexed DataFrame with single-level columns.

        Raises:
            ValueError: If the download returns no rows.
        """
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        cache_file = self.cache_dir / f"{ticker}_{start_date}_{end_date}.csv"

        if cache_file.exists():
            print(f"Loading cached data for {ticker}")
            return pd.read_csv(cache_file, index_col=0, parse_dates=True)

        print(f"Fetching data for {ticker} from {start_date} to {end_date}")
        data = yf.download(ticker, start=start_date, end=end_date, progress=False)

        if data.empty:
            raise ValueError(f"No data returned for {ticker}")

        # Depending on the yfinance version, download() may return
        # (field, ticker) MultiIndex columns even for one ticker. A MultiIndex
        # header is written to CSV as several header rows, which the
        # read_csv(index_col=0) cache path above cannot round-trip. Flatten it.
        data = self._flatten_columns(data)

        data.to_csv(cache_file)
        print(f"Cached data to {cache_file}")

        return data

    def fetch_multiple(self, tickers: list, start_date: str, end_date: str = None) -> dict:
        """Fetch data for several tickers; returns ``{ticker: DataFrame}``."""
        return {ticker: self.fetch(ticker, start_date, end_date) for ticker in tickers}

    @staticmethod
    def _flatten_columns(data: pd.DataFrame) -> pd.DataFrame:
        """Return *data* with MultiIndex columns reduced to their first level."""
        if isinstance(data.columns, pd.MultiIndex):
            data = data.copy()
            data.columns = data.columns.get_level_values(0)
        return data

print("DataFetcher class defined")

## 3. Data Validation Module

In [None]:
class DataValidator:
    """Validate market data quality."""

    @staticmethod
    def validate(data: pd.DataFrame) -> dict:
        """Run validation checks on OHLCV market data.

        Args:
            data: Frame with 'Open', 'High', 'Low', 'Close' and 'Volume' columns.

        Returns:
            dict with keys:
              missing_values  -- per-column NaN counts
              total_rows      -- number of rows
              date_range      -- (min index value, max index value)
              negative_prices -- per-price-column count of values < 0
              zero_volume     -- number of rows with Volume == 0
              high_low_check  -- number of rows with High < Low
        """
        price_cols = ['Open', 'High', 'Low', 'Close']
        results = {
            'missing_values': data.isnull().sum().to_dict(),
            'total_rows': len(data),
            'date_range': (data.index.min(), data.index.max()),
            'negative_prices': (data[price_cols] < 0).sum().to_dict(),
            'zero_volume': (data['Volume'] == 0).sum(),
            'high_low_check': (data['High'] < data['Low']).sum(),
        }
        return results

    @staticmethod
    def _print_nonzero_counts(counts: dict):
        """Print ``  col: count`` for each non-zero entry, or ``  None`` if all are zero."""
        flagged = {col: n for col, n in counts.items() if n > 0}
        if not flagged:
            print("  None")
            return
        for col, n in flagged.items():
            print(f"  {col}: {n}")

    @staticmethod
    def print_validation_report(results: dict):
        """Print a human-readable summary of the dict produced by ``validate``."""
        print("Data Validation Report")
        print("-" * 50)
        print(f"Total Rows: {results['total_rows']}")
        print(f"Date Range: {results['date_range'][0]} to {results['date_range'][1]}")

        print("\nMissing Values:")
        DataValidator._print_nonzero_counts(results['missing_values'])

        # Report every check validate() produces, including negative prices.
        # .get() keeps this working for result dicts built without that key.
        print("\nNegative Prices:")
        DataValidator._print_nonzero_counts(results.get('negative_prices', {}))

        print(f"\nZero Volume Days: {results['zero_volume']}")
        print(f"High < Low Issues: {results['high_low_check']}")
        print("-" * 50)

print("DataValidator class defined")

## 4. Data Cleaning Module

In [None]:
class DataCleaner:
    """Clean and prepare market data."""

    @staticmethod
    def clean(data: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned, date-sorted copy of *data*.

        Steps, in order:
          1. sort rows by index;
          2. forward-fill missing values;
          3. drop rows that still contain NaN (leading gaps with no prior value);
          4. drop rows where any present price column is not strictly positive.

        The input frame is not modified.
        """
        # Sort first: forward-fill must carry values from the previous *date*,
        # which is only true once rows are in chronological order.
        df = data.sort_index()

        # DataFrame.ffill() replaces fillna(method='ffill'), deprecated in pandas 2.1.
        df = df.ffill().dropna()

        # One combined mask instead of re-filtering the frame once per column.
        price_cols = [c for c in ('Open', 'High', 'Low', 'Close', 'Adj Close') if c in df.columns]
        if price_cols:
            df = df[(df[price_cols] > 0).all(axis=1)]

        return df

    @staticmethod
    def add_basic_features(data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *data* with basic derived columns added.

        Added columns:
          Returns      -- simple daily return of Close
          Log_Returns  -- log(Close / previous Close)
          Range        -- High - Low
          Range_Pct    -- Range / Close
          Gap          -- Open - previous Close
          Gap_Pct      -- Gap / previous Close

        The first row has NaN in every column that depends on the previous Close.
        """
        df = data.copy()
        prev_close = df['Close'].shift(1)

        df['Returns'] = df['Close'].pct_change()
        df['Log_Returns'] = np.log(df['Close'] / prev_close)

        df['Range'] = df['High'] - df['Low']
        df['Range_Pct'] = df['Range'] / df['Close']

        df['Gap'] = df['Open'] - prev_close
        df['Gap_Pct'] = df['Gap'] / prev_close

        return df

print("DataCleaner class defined")

## 5. Fetch Data

In [None]:
# Instrument and history window come from config, with SPY / 2018-01-01 as fallbacks.
ticker, start_date = (
    config.get('data.default_ticker', 'SPY'),
    config.get('data.start_date', '2018-01-01'),
)

# Download the history, or reuse the CSV cache if this exact request was made before.
fetcher = DataFetcher()
raw_data = fetcher.fetch(ticker, start_date)

print(f"\nFetched {len(raw_data)} rows", f"Columns: {list(raw_data.columns)}", sep="\n")
raw_data.head()

## 6. Validate Data

In [None]:
# Run the quality checks on the raw download and print the summary report.
validator = DataValidator()
validation_results = DataValidator.validate(raw_data)
DataValidator.print_validation_report(validation_results)

## 7. Clean Data

In [None]:
# Clean the raw frame, then derive the return / range / gap features from it.
cleaner = DataCleaner()
cleaned_data = cleaner.add_basic_features(cleaner.clean(raw_data))

print(f"Cleaned data: {len(cleaned_data)} rows", f"Columns: {list(cleaned_data.columns)}", sep="\n")
cleaned_data.head()

## 8. Exploratory Data Analysis

In [None]:
# Descriptive statistics for the OHLCV columns of the cleaned data.
print("Summary Statistics", "=" * 50, sep="\n")
print(cleaned_data[['Open', 'High', 'Low', 'Close', 'Volume']].describe())

In [None]:
# Price and volume history, saved as a PNG under DATA_DIR.parent / 'results'.
results_dir = DATA_DIR.parent / 'results'
# plt.savefig() does not create missing directories; on a fresh checkout
# the results directory may not exist yet.
results_dir.mkdir(parents=True, exist_ok=True)
chart_path = results_dir / 'price_volume_chart.png'

fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Top panel: closing price
axes[0].plot(cleaned_data.index, cleaned_data['Close'], label='Close Price', linewidth=1)
axes[0].set_title(f'{ticker} Price History', fontsize=14, fontweight='bold')
axes[0].set_ylabel('Price ($)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Bottom panel: daily traded volume
axes[1].bar(cleaned_data.index, cleaned_data['Volume'], alpha=0.5, label='Volume')
axes[1].set_title('Trading Volume', fontsize=14, fontweight='bold')
axes[1].set_ylabel('Volume')
axes[1].set_xlabel('Date')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(chart_path, dpi=300, bbox_inches='tight')
plt.show()

print(f"Chart saved to {chart_path}")

In [None]:
# Distribution of daily returns: histogram plus a Q-Q plot against the normal
# distribution, saved as a PNG under DATA_DIR.parent / 'results'.
from scipy import stats

results_dir = DATA_DIR.parent / 'results'
# plt.savefig() does not create missing directories.
results_dir.mkdir(parents=True, exist_ok=True)
chart_path = results_dir / 'returns_distribution.png'

# Drop NaN returns once and reuse the series for both panels.
daily_returns = cleaned_data['Returns'].dropna()

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: histogram of daily returns, with a reference line at zero
axes[0].hist(daily_returns, bins=100, alpha=0.7, edgecolor='black')
axes[0].set_title('Daily Returns Distribution', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Returns')
axes[0].set_ylabel('Frequency')
axes[0].axvline(x=0, color='red', linestyle='--', linewidth=1)
axes[0].grid(True, alpha=0.3)

# Right panel: Q-Q plot of returns against a normal distribution
stats.probplot(daily_returns, dist="norm", plot=axes[1])
axes[1].set_title('Q-Q Plot (Returns vs Normal)', fontsize=14, fontweight='bold')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(chart_path, dpi=300, bbox_inches='tight')
plt.show()

print(f"Chart saved to {chart_path}")

## 9. Performance Metrics

In [None]:
# Headline statistics for the cleaned series; annualization uses TRADING_DAYS_PER_YEAR.
returns = cleaned_data['Returns'].dropna()

metrics = {}
metrics['Total Return'] = (cleaned_data['Close'].iloc[-1] / cleaned_data['Close'].iloc[0] - 1) * 100
metrics['Annualized Return'] = returns.mean() * TRADING_DAYS_PER_YEAR * 100
metrics['Annualized Volatility'] = returns.std() * np.sqrt(TRADING_DAYS_PER_YEAR) * 100
metrics['Sharpe Ratio'] = calculate_sharpe_ratio(returns)
metrics['Max Drawdown'] = calculate_max_drawdown(cleaned_data['Close']) * 100
metrics['Positive Days'] = (returns > 0).sum() / len(returns) * 100
metrics['Negative Days'] = (returns < 0).sum() / len(returns) * 100

print("\nPerformance Metrics", "=" * 50, sep="\n")
for metric, value in metrics.items():
    # Metrics whose name contains one of these words are expressed in percent.
    is_percent = any(word in metric for word in ('Return', 'Days', 'Drawdown', 'Volatility'))
    suffix = '%' if is_percent else ''
    print(f"{metric:<25} {value:>10.2f}{suffix}")
print("=" * 50)

## 10. Save Processed Data

In [None]:
# Persist the cleaned, feature-enriched frame as DATA_DIR/processed/<ticker>_processed.csv.
processed_dir = DATA_DIR / 'processed'
processed_dir.mkdir(parents=True, exist_ok=True)

output_file = processed_dir / f"{ticker}_processed.csv"
cleaned_data.to_csv(output_file)

print(
    f"Processed data saved to: {output_file}",
    f"Shape: {cleaned_data.shape}",
    f"Date range: {cleaned_data.index.min()} to {cleaned_data.index.max()}",
    sep="\n",
)

## 11. Create Data Module

In [None]:
# Export the fetch/clean logic as an importable module at src/data/ingestion.py
# so later notebooks can reuse it. Keep this source in sync with the
# DataFetcher and DataCleaner cells above (same cleaning order, same features).
data_module_code = '''"""Data ingestion and processing module."""

from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import yfinance as yf


class DataFetcher:
    """Fetch market data from Yahoo Finance, optionally caching to CSV."""

    def __init__(self, cache_dir: Path = None):
        # Accept str or Path; a falsy cache_dir disables caching.
        self.cache_dir = Path(cache_dir) if cache_dir else None
        if self.cache_dir:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

    def fetch(self, ticker: str, start_date: str, end_date: str = None) -> pd.DataFrame:
        """Fetch OHLCV data for ticker; raise ValueError if nothing is returned."""
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        cache_file = None
        if self.cache_dir:
            cache_file = self.cache_dir / f"{ticker}_{start_date}_{end_date}.csv"
            if cache_file.exists():
                return pd.read_csv(cache_file, index_col=0, parse_dates=True)

        data = yf.download(ticker, start=start_date, end=end_date, progress=False)

        if data.empty:
            raise ValueError(f"No data returned for {ticker}")

        # Some yfinance versions return (field, ticker) MultiIndex columns even
        # for a single ticker; flatten so the CSV cache round-trips cleanly.
        if isinstance(data.columns, pd.MultiIndex):
            data.columns = data.columns.get_level_values(0)

        if cache_file is not None:
            data.to_csv(cache_file)

        return data


class DataCleaner:
    """Clean and prepare market data."""

    @staticmethod
    def clean(data: pd.DataFrame) -> pd.DataFrame:
        """Sort by date, forward-fill gaps, drop NaN rows and non-positive prices."""
        # Sort before filling so values propagate from the previous date.
        df = data.sort_index().ffill().dropna()

        price_cols = [c for c in ('Open', 'High', 'Low', 'Close', 'Adj Close') if c in df.columns]
        if price_cols:
            df = df[(df[price_cols] > 0).all(axis=1)]

        return df

    @staticmethod
    def add_basic_features(data: pd.DataFrame) -> pd.DataFrame:
        """Add return, log-return, range and overnight-gap columns."""
        df = data.copy()
        prev_close = df['Close'].shift(1)
        df['Returns'] = df['Close'].pct_change()
        df['Log_Returns'] = np.log(df['Close'] / prev_close)
        df['Range'] = df['High'] - df['Low']
        df['Range_Pct'] = df['Range'] / df['Close']
        df['Gap'] = df['Open'] - prev_close
        df['Gap_Pct'] = df['Gap'] / prev_close
        return df
'''

module_path = PROJECT_ROOT / 'src' / 'data' / 'ingestion.py'
# open()/write_text() do not create parent directories.
module_path.parent.mkdir(parents=True, exist_ok=True)
module_path.write_text(data_module_code, encoding='utf-8')

print("Created: src/data/ingestion.py")

## Data Pipeline Complete

Next: Open `02_feature_engineering.ipynb`