# Objective: 
- To gather raw financial data from various sources, perform initial cleaning, handle missing values, and transform it into a consistent, usable format ready for feature engineering. This notebook establishes the foundation of your data pipeline.

# Environment Setup & Data Paths

In [None]:
import os
import yaml
import sys
import subprocess

def install_libraries():
    """
    Pip-install every third-party package this notebook relies on.

    Each package is installed with a separate ``python -m pip install`` call
    run through the current interpreter; a failing install raises
    ``subprocess.CalledProcessError``.
    """
    pip_install = [sys.executable, "-m", "pip", "install"]
    for package_name in ("pandas", "numpy", "yfinance", "pyyaml"):
        subprocess.check_call(pip_install + [package_name])

def setup_data_paths():
    """
    Ensure the raw and processed data folders exist.

    Returns:
        A ``(raw_path, processed_path)`` tuple of relative directory paths.
    """
    data_dirs = ("data/raw", "data/processed")
    for directory in data_dirs:
        # exist_ok makes repeated runs harmless.
        os.makedirs(directory, exist_ok=True)
    return data_dirs


def load_config(config_path="config/config.yaml"):
    """
    Load configuration from a YAML file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file contents
        (typically a dict for a mapping-style config).

    Raises:
        OSError: If the file cannot be opened.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    return config

# Entry point for the setup cell: runs only when executed as the main module.
if __name__ == "__main__":
    # Install the pip packages listed in install_libraries().
    install_libraries()
    # Create the data directories and keep their paths for the printout below.
    raw_path, processed_path = setup_data_paths()
    # Read the default config file (config/config.yaml).
    config = load_config()
    print(f"Config loaded: {config}")
    print(f"Raw data path: {raw_path}, Processed data path: {processed_path}")


# Raw Data Acquisition (Simulated/Proxy)

In [None]:
import os
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta

def fetch_yfinance_data(tickers, start, end, save_dir):
    """
    Fetch OHLCV and adjusted prices for given tickers using yfinance.
    Save each ticker's data as a CSV in save_dir.

    Args:
        tickers: Iterable of ticker symbols; each one is downloaded separately.
        start: Start of the date range, passed through to ``yf.download``.
        end: End of the date range, passed through to ``yf.download``.
        save_dir: Directory for the ``<ticker>_ohlcv.csv`` files (created if
            missing).

    Tickers for which nothing is returned are reported and skipped.
    """
    os.makedirs(save_dir, exist_ok=True)
    for ticker in tickers:
        df = yf.download(ticker, start=start, end=end, auto_adjust=False)
        if df is None or df.empty:
            print(f"No data found for {ticker} in the given date range.")
            continue

        # Some yfinance versions return two-level (field, ticker) columns even
        # for a single ticker, which would be written as a multi-row CSV
        # header. Keep only the field level so the file has one header row.
        if isinstance(df.columns, pd.MultiIndex):
            df = df.copy()
            df.columns = df.columns.get_level_values(0)

        df.to_csv(os.path.join(save_dir, f"{ticker}_ohlcv.csv"))
        print(f"Saved {ticker} data to {save_dir}")

def generate_mock_index_constituents(index_name, tickers, save_dir):
    """
    Write a synthetic constituents table for one index.

    Every ticker gets a random weight drawn from a flat Dirichlet
    distribution and rounded to four decimals. The table is saved as
    ``<index_name>_constituents.csv`` inside save_dir (created if missing).
    """
    os.makedirs(save_dir, exist_ok=True)

    member_count = len(tickers)
    index_column = [index_name] * member_count
    random_weights = np.random.dirichlet(np.ones(member_count))

    constituents = pd.DataFrame({
        "index": index_column,
        "ticker": tickers,
        "weight": np.round(random_weights, 4),
    })

    target_file = os.path.join(save_dir, f"{index_name}_constituents.csv")
    constituents.to_csv(target_file, index=False)
    print(f"Saved {index_name} constituents to {save_dir}")

def generate_mock_etf_flows(etf_names, dates, save_dir):
    """
    Generate mock ETF flow data.

    For each ETF, one normally distributed flow value (mean 0, standard
    deviation 10,000,000) is drawn per date. All ETFs are stacked into a
    single long table and written to ``etf_flows.csv`` in save_dir.

    Args:
        etf_names: Iterable of ETF identifiers. May be empty, in which case a
            header-only CSV is written instead of raising.
        dates: Sequence of dates shared by every ETF.
        save_dir: Output directory (created if missing).
    """
    os.makedirs(save_dir, exist_ok=True)
    columns = ["date", "etf", "flow_usd"]

    frames = []
    for etf in etf_names:
        flows = np.random.normal(0, 10_000_000, size=len(dates))
        frames.append(pd.DataFrame({
            "date": dates,
            "etf": etf,
            "flow_usd": flows,
        }))

    # pd.concat raises on an empty list, so fall back to an empty table that
    # still carries the expected columns.
    if frames:
        df = pd.concat(frames, ignore_index=True)
    else:
        df = pd.DataFrame(columns=columns)

    df.to_csv(os.path.join(save_dir, "etf_flows.csv"), index=False)
    print(f"Saved ETF flows to {save_dir}")

def generate_mock_corporate_actions(tickers, dates, save_dir):
    """
    Generate mock corporate actions (splits, dividends).

    Each ticker receives up to two actions on distinct, randomly chosen
    dates. The action type is a coin flip between "split" and "dividend" and
    the value is uniform in [0.1, 2.0), rounded to two decimals. The result
    is written to ``corporate_actions.csv`` in save_dir.

    Args:
        tickers: Iterable of ticker symbols. May be empty.
        dates: Sequence of candidate dates. With fewer than two dates, only
            as many actions as there are dates are generated per ticker.
        save_dir: Output directory (created if missing).
    """
    os.makedirs(save_dir, exist_ok=True)
    columns = ["ticker", "date", "action", "value"]

    # Sampling without replacement cannot draw more dates than exist.
    actions_per_ticker = min(2, len(dates))

    actions = []
    for ticker in tickers:
        if actions_per_ticker == 0:
            continue
        chosen_dates = np.random.choice(dates, size=actions_per_ticker, replace=False)
        for date in chosen_dates:
            actions.append({
                "ticker": ticker,
                "date": date,
                "action": "split" if np.random.rand() > 0.5 else "dividend",
                "value": np.round(np.random.uniform(0.1, 2.0), 2)
            })

    # Passing the columns explicitly keeps the header row even when no
    # actions were generated, so the file can always be read back.
    df = pd.DataFrame(actions, columns=columns)
    df.to_csv(os.path.join(save_dir, "corporate_actions.csv"), index=False)
    print(f"Saved corporate actions to {save_dir}")

# Entry point for the acquisition cell: runs only when executed as the main module.
if __name__ == "__main__":
    # Example tickers and dates
    tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "META"]
    etfs = ["SPY", "IVV", "EFA"]
    # Every mock index reuses the same ticker list.
    indices = [("MSCI", tickers), ("S&P", tickers), ("FTSE", tickers)]
    # Trailing 365-day window ending today, formatted as YYYY-MM-DD strings.
    start_date = (datetime.today() - timedelta(days=365)).strftime("%Y-%m-%d")
    end_date = datetime.today().strftime("%Y-%m-%d")
    # Business-day dates in the window, as strings, shared by the mock generators.
    dates = pd.date_range(start=start_date, end=end_date, freq="B").strftime("%Y-%m-%d")

    raw_dir = "data/raw"

    # Fetch historical market data
    fetch_yfinance_data(tickers, start_date, end_date, raw_dir)

    # Generate mock index constituents
    for index_name, index_tickers in indices:
        generate_mock_index_constituents(index_name, index_tickers, raw_dir)

    # Generate mock ETF flows
    generate_mock_etf_flows(etfs, dates, raw_dir)

    # Generate mock corporate actions
    generate_mock_corporate_actions(tickers, dates, raw_dir)

# Initial Data Cleaning & Standardization

In [None]:
import os
import pandas as pd
from src.data_ingestion.data_loader import load_csv_file

def clean_dataframe(df, date_cols=['date'], price_cols=['open', 'high', 'low', 'close', 'adj_close'], tz='UTC'):
    """
    Return a cleaned copy of a DataFrame.

    Steps, in order:
    - Standardize column names (stripped, lower-cased, spaces -> underscores)
    - Parse date columns as UTC datetimes and convert them to ``tz``
    - Convert price columns to numeric (unparseable values become NaN)
    - Sort by the first date column, if present
    - Forward-fill, then back-fill, remaining missing values

    Args:
        df: Input DataFrame. It is not modified.
        date_cols: Standardized names of the date columns to parse.
        price_cols: Standardized names of the columns to coerce to numeric.
        tz: Target timezone for the date columns.

    Returns:
        The cleaned DataFrame (original index labels are kept, reordered by
        the sort).
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()

    # Standardize column names; str() guards against non-string labels.
    df.columns = [str(col).strip().lower().replace(' ', '_') for col in df.columns]

    # utc=True always yields timezone-aware values, so a single tz_convert
    # covers every case.
    for col in date_cols:
        if col in df.columns:
            parsed = pd.to_datetime(df[col], errors='coerce', utc=True)
            df[col] = parsed.dt.tz_convert(tz)

    # Convert price columns to numeric
    for col in price_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # A stable sort keeps the original relative order of rows that share a
    # date, which makes the fill below reproducible.
    if date_cols and date_cols[0] in df.columns:
        df = df.sort_values(by=date_cols[0], kind='mergesort')

    # NOTE(review): the fill runs over all rows and columns, so in files that
    # mix several tickers values can carry across tickers, and the back-fill
    # copies later values into earlier rows - confirm this is intended.
    df = df.ffill().bfill()

    return df

def clean_all_raw_data(raw_data_dir="data/raw", cleaned_data_dir="data/processed"):
    """
    Clean every CSV in raw_data_dir and write the result to cleaned_data_dir.

    Each file is read with load_csv_file, passed through clean_dataframe with
    its default settings, and saved under the same file name (without the
    index). Entries that do not end in ".csv" are ignored.
    """
    os.makedirs(cleaned_data_dir, exist_ok=True)
    for entry in os.listdir(raw_data_dir):
        if not entry.endswith(".csv"):
            continue
        source_file = os.path.join(raw_data_dir, entry)
        cleaned_path = os.path.join(cleaned_data_dir, entry)
        raw_frame = load_csv_file(source_file)
        clean_dataframe(raw_frame).to_csv(cleaned_path, index=False)
        print(f"Cleaned and saved: {cleaned_path}")

# Entry point for the cleaning cell: cleans data/raw into data/processed
# using the function's default directories.
if __name__ == "__main__":
    clean_all_raw_data()

# Data Integrity Checks

In [None]:
import os
import pandas as pd
import numpy as np
from src.data_ingestion.data_loader import load_csv_file

def check_for_outliers(df, price_cols=['open', 'high', 'low', 'close', 'adj_close'], volume_col='volume', z_thresh=6):
    """
    Identify extreme outliers in price and volume columns using z-score.

    Args:
        df: DataFrame to inspect.
        price_cols: Price column names to check; missing ones are skipped.
        volume_col: Volume column name to check; skipped if missing.
        z_thresh: Absolute z-score above which a value counts as an outlier.

    Returns:
        Dict keyed by column name. Each value holds ``num_outliers`` and
        ``outlier_indices`` (index labels of the offending rows). Columns
        that are absent or contain only missing values are left out.
    """
    outlier_report = {}
    for col in price_cols + [volume_col]:
        if col not in df.columns:
            continue
        series = df[col]
        vals = series.dropna()
        if len(vals) == 0:
            continue

        # Mean and std come from the non-missing values, but the z-scores are
        # computed over the full column so the mask lines up with df row by
        # row. The small epsilon avoids division by zero for constant columns.
        z_scores = ((series - vals.mean()) / (vals.std() + 1e-8)).abs()

        # Missing values compare as "not an outlier". A positional boolean
        # array sidesteps label alignment (and duplicate index labels).
        is_outlier = (z_scores > z_thresh).fillna(False).to_numpy(dtype=bool)
        outlier_indices = df.index[is_outlier].tolist()

        outlier_report[col] = {
            "num_outliers": len(outlier_indices),
            "outlier_indices": outlier_indices
        }
    return outlier_report

def check_for_negatives(df, price_cols=['open', 'high', 'low', 'close', 'adj_close'], volume_col='volume'):
    """
    Report rows whose price or volume is below zero.

    Returns a dict keyed by each checked column that exists in df; every
    entry lists how many negative values were found and their index labels.
    """
    report = {}
    columns_to_check = price_cols + [volume_col]
    for name in columns_to_check:
        if name not in df.columns:
            continue
        below_zero = df[name] < 0
        offending_labels = df.index[below_zero].tolist()
        report[name] = {
            "num_negative": len(offending_labels),
            "negative_indices": offending_labels,
        }
    return report

def check_for_gaps(df, date_col='date'):
    """
    Check for unexpected gaps in the date column (assuming business day frequency).

    The expected calendar is every Monday-Friday between the earliest and
    latest date present. It knows nothing about exchange holidays, so those
    are reported as gaps too.

    Args:
        df: DataFrame to inspect.
        date_col: Name of the date column.

    Returns:
        Dict with ``gap_count`` and ``gap_dates`` (sorted list of missing
        days). ``gap_count`` is None when the column is absent and 0 when it
        holds no valid dates.
    """
    if date_col not in df.columns:
        return {"gap_count": None, "gap_dates": []}

    dates = pd.to_datetime(df[date_col]).dropna()
    # Without any valid date there is no range to build a calendar from.
    if dates.empty:
        return {"gap_count": 0, "gap_dates": []}

    # Compare at day granularity so a time-of-day component cannot make an
    # observed day look missing.
    observed_days = dates.dt.normalize()
    expected = pd.date_range(observed_days.min(), observed_days.max(), freq='B')
    missing = set(expected) - set(observed_days)
    return {
        "gap_count": len(missing),
        "gap_dates": sorted(missing)
    }

def summarize_coverage(df, ticker_col='ticker', date_col='date'):
    """
    Describe how much data a DataFrame covers.

    When the date column exists, the earliest and latest dates are reported
    as strings. When the ticker column exists, the number of distinct tickers
    and the list of them are reported. Absent columns contribute nothing.
    """
    coverage = {}

    if date_col in df.columns:
        parsed_dates = pd.to_datetime(df[date_col])
        coverage.update(
            start_date=str(parsed_dates.min()),
            end_date=str(parsed_dates.max()),
        )

    if ticker_col in df.columns:
        ticker_values = df[ticker_col]
        coverage.update(
            num_unique_tickers=ticker_values.nunique(),
            unique_tickers=ticker_values.unique().tolist(),
        )

    return coverage

def run_integrity_checks_on_dir(data_dir="data/processed"):
    """
    Print an integrity report for each CSV file found in data_dir.

    Every file is loaded with load_csv_file and run through the outlier,
    negative-value, gap and coverage checks using their default settings.
    """
    csv_names = [entry for entry in os.listdir(data_dir) if entry.endswith(".csv")]
    for csv_name in csv_names:
        frame = load_csv_file(os.path.join(data_dir, csv_name))
        print(f"\n=== {csv_name} ===")
        # All checks are evaluated before any of their results are printed.
        findings = (
            ("Outliers:", check_for_outliers(frame)),
            ("Negatives:", check_for_negatives(frame)),
            ("Gaps:", check_for_gaps(frame)),
            ("Coverage:", summarize_coverage(frame)),
        )
        for label, result in findings:
            print(label, result)

# Entry point for the integrity-check cell: reports on data/processed,
# the function's default directory.
if __name__ == "__main__":
    run_integrity_checks_on_dir()

# Save Pre-processed Data

In [None]:
import os
from src.data_ingestion.data_loader import load_csv_file

def save_all_csvs_as_parquet(processed_csv_dir="data/processed", parquet_dir="data/processed"):
    """
    Load all CSV files from processed_csv_dir and save as Parquet files in parquet_dir.

    Args:
        processed_csv_dir: Directory scanned for files ending in ".csv".
        parquet_dir: Directory the Parquet files are written to (created if
            missing). Each output keeps the CSV's base name with a
            ".parquet" extension.
    """
    csv_suffix = ".csv"
    os.makedirs(parquet_dir, exist_ok=True)
    for fname in os.listdir(processed_csv_dir):
        if not fname.endswith(csv_suffix):
            continue
        fpath = os.path.join(processed_csv_dir, fname)
        df = load_csv_file(fpath)
        # Swap only the trailing extension; str.replace would also rewrite a
        # ".csv" that happens to appear earlier in the file name.
        parquet_fname = fname[:-len(csv_suffix)] + ".parquet"
        parquet_path = os.path.join(parquet_dir, parquet_fname)
        df.to_parquet(parquet_path, index=False)
        print(f"Saved {parquet_path}")
# Entry point for the export cell: converts the CSVs in data/processed to
# Parquet using the function's default directories.
if __name__ == "__main__":
    save_all_csvs_as_parquet()