# Financial Data Ingestion to Delta Lake

This notebook downloads financial data from Yahoo Finance and ingests it into our Delta Lake bronze layer. 
The process includes:
1. Downloading historical price data for selected stock symbols
2. Transforming and validating the data
3. Writing to Delta Lake with proper optimization

## Environment Setup

In [None]:
# Import required libraries
import logging
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Any

import yfinance as yf
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, LongType

# Send INFO-and-above log records to stdout using a timestamped format.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
    handlers=[_stdout_handler],
)
logger = logging.getLogger(__name__)

# Build (or reuse) a Spark session configured with the Delta Lake SQL
# extension and the Delta catalog implementation.
_session_builder = SparkSession.builder.appName("FinanceDataIngestion")
_session_builder = _session_builder.config(
    "spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension",
)
_session_builder = _session_builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
spark = _session_builder.getOrCreate()

## Configure Schema and Error Handling

Define the schema for our financial data, a custom exception for ingestion errors, and a dictionary for tracking ingestion statistics.

In [None]:
# Schema for the financial price data: ticker, date and ingestion_timestamp
# are required; the price and volume fields are nullable.
# NOTE(review): ingestion_timestamp is declared as DateType (day granularity)
# despite its name -- confirm that is intended.
# NOTE(review): no other cell visible here passes this schema to Spark; the
# write step casts columns individually -- confirm whether it should be used.
schema = StructType(
    [
        StructField("ticker", StringType(), nullable=False),
        StructField("date", DateType(), nullable=False),
        *[
            StructField(price_field, DoubleType(), nullable=True)
            for price_field in ("open", "high", "low", "close", "adj_close")
        ],
        StructField("volume", LongType(), nullable=True),
        StructField("ingestion_timestamp", DateType(), nullable=False),
    ]
)

# Exception type raised by the download and write steps of this notebook
class DataIngestionError(Exception):
    """Signal that downloading the data or writing it to Delta Lake failed."""

# Run-level bookkeeping: which tickers succeeded or failed, how many rows
# were collected, and when the download started and finished.
ingestion_stats = dict(
    successful_tickers=[],
    failed_tickers=[],
    total_rows=0,
    start_time=None,
    end_time=None,
)

## Define Target Symbols and Date Range

Specify the list of stock symbols to download and the time period for historical data.

In [None]:
# Stock symbols to ingest
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META']

# Look-back window: the 365 days ending at the moment this cell runs
end_date = datetime.now()
start_date = end_date - timedelta(days=365)

# Echo the configuration so the run is self-describing
print(
    f"Configured to download data for {len(tickers)} symbols:",
    f"Symbols: {', '.join(tickers)}",
    f"Date range: {start_date.date()} to {end_date.date()}",
    sep="\n",
)

## Download and Transform Financial Data

Download historical price data for all symbols using the yfinance API and transform it into the required format.

In [None]:
# Download and process data
#
# For every symbol in `tickers`, fetch the price history between `start_date`
# and `end_date`, normalise it, and collect it into `combined_data`. A failure
# for a single ticker is logged and recorded; the cell only raises
# DataIngestionError when nothing could be downloaded or combining fails.

# Start from a clean slate so re-running this cell does not keep tickers or
# row counts recorded by an earlier run (the stats dict is created elsewhere).
ingestion_stats["successful_tickers"].clear()
ingestion_stats["failed_tickers"].clear()
ingestion_stats["total_rows"] = 0
ingestion_stats["start_time"] = datetime.now()
all_data = []

try:
    for ticker in tickers:
        # Each ticker is isolated so one bad symbol does not abort the run.
        try:
            logger.info("Downloading data for %s...", ticker)
            yf_ticker = yf.Ticker(ticker)
            hist = yf_ticker.history(start=start_date, end=end_date)

            if hist.empty:
                raise DataIngestionError(f"No data available for {ticker}")

            # Strip timezone information from the index (local wall-clock
            # values are kept) before turning the index into a column.
            if hasattr(hist.index, 'tz_localize'):
                hist.index = hist.index.tz_localize(None)
            hist = hist.reset_index()

            # Convert date to proper datetime format
            hist['Date'] = pd.to_datetime(hist['Date'])
            hist['ticker'] = ticker

            # Normalise column names to lower snake_case right away so every
            # per-ticker frame lines up when concatenated (a column named
            # 'Adj Close' would become 'adj_close').
            hist.columns = hist.columns.str.lower().str.replace(' ', '_')

            all_data.append(hist)
            logger.info("Successfully downloaded %d rows for %s", len(hist), ticker)
            ingestion_stats["successful_tickers"].append(ticker)

        except Exception as exc:
            # Best-effort per ticker: record the failure and move on.
            logger.error("Failed to download data for %s: %s", ticker, exc)
            ingestion_stats["failed_tickers"].append(ticker)

    if not all_data:
        raise DataIngestionError("No data downloaded for any ticker")

    # Combine all data and add ingestion timestamp (today's date at midnight)
    combined_data = pd.concat(all_data, ignore_index=True)
    combined_data['ingestion_timestamp'] = pd.to_datetime(datetime.now().date())

    # Verify column names after combining
    logger.info("Available columns: %s", list(combined_data.columns))

    ingestion_stats["total_rows"] = len(combined_data)
    logger.info("Successfully combined data: %d total rows", len(combined_data))

except Exception as exc:
    logger.error("Data download failed: %s", exc)
    # Chain the original exception so the root cause stays in the traceback.
    raise DataIngestionError(f"Data download failed: {exc}") from exc
finally:
    # Always stamp the end time, whether the download succeeded or not.
    ingestion_stats["end_time"] = datetime.now()

# Display ingestion summary
duration = ingestion_stats["end_time"] - ingestion_stats["start_time"]
print("\n=== Ingestion Summary ===")
print(f"Duration: {duration.total_seconds():.2f} seconds")
print(f"Total rows: {ingestion_stats['total_rows']}")
print(f"Successful tickers: {', '.join(ingestion_stats['successful_tickers'])}")
if ingestion_stats['failed_tickers']:
    print(f"Failed tickers: {', '.join(ingestion_stats['failed_tickers'])}")
print("=====================")

## Write to Delta Lake Bronze Layer

Append the combined data to the `finance_catalog.bronze.prices` Delta table, requesting schema merging and Delta auto-optimize (optimized writes and auto-compaction).

In [None]:
# Create or get the catalog, then the bronze database, and make both current
spark.sql("CREATE CATALOG IF NOT EXISTS finance_catalog")
spark.sql("USE CATALOG finance_catalog")

spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("USE bronze")

# Append the combined pandas data to finance_catalog.bronze.prices.
# Any failure is logged with its traceback and re-raised as DataIngestionError.
try:
    # 'adj_close' may be missing from the downloaded data; fall back to
    # 'close' so the selected column set below is always available.
    if 'adj_close' not in combined_data.columns:
        combined_data['adj_close'] = combined_data['close']

    # Keep only the columns that belong in the bronze table, in a fixed order.
    selected_columns = ['ticker', 'date', 'open', 'high', 'low', 'close',
                       'adj_close', 'volume', 'ingestion_timestamp']
    combined_data_filtered = combined_data[selected_columns]

    spark_df = spark.createDataFrame(combined_data_filtered)

    # Cast every column explicitly so the written types do not depend on
    # what Spark inferred from the pandas dtypes.
    spark_df = spark_df.select(
        spark_df.ticker.cast(StringType()),
        spark_df.date.cast(DateType()),
        spark_df.open.cast(DoubleType()),
        spark_df.high.cast(DoubleType()),
        spark_df.low.cast(DoubleType()),
        spark_df.close.cast(DoubleType()),
        spark_df.adj_close.cast(DoubleType()),
        spark_df.volume.cast(LongType()),
        spark_df.ingestion_timestamp.cast(DateType())
    )

    table_name = "prices"
    # NOTE(review): delta.autoOptimize.* are documented as Delta table
    # properties; confirm they take effect when passed as writer options.
    (spark_df.write
     .format("delta")
     .mode("append")
     .option("mergeSchema", "true")
     .option("delta.autoOptimize.optimizeWrite", "true")
     .option("delta.autoOptimize.autoCompact", "true")
     .saveAsTable(f"finance_catalog.bronze.{table_name}"))

    # The Spark frame has one row per pandas row and the select above does not
    # filter, so the pandas length gives the row count without another Spark job.
    row_count = len(combined_data_filtered)
    print(f"\nSuccessfully wrote {row_count} rows to finance_catalog.bronze.{table_name}")

except Exception as exc:
    logger.error("Failed to write to Delta table", exc_info=True)
    # Chain the original exception so the root cause stays in the traceback.
    raise DataIngestionError(f"Delta table write failed: {exc}") from exc

## Initial Data Validation

Perform basic validation of the ingested data: total record count, table schema, record counts per ticker, and a sample of the most recent rows.

In [None]:
# Load the bronze prices table and register it as a temp view for SQL queries
prices_df = spark.table("finance_catalog.bronze.prices")
prices_df.createOrReplaceTempView("prices_view")

# Headline numbers: total row count, then the table schema
print("\nBasic Statistics:")
total_records = prices_df.count()
print(f"Total records: {total_records}")
print("\nSchema:")
prices_df.printSchema()

# Rows per ticker, largest first
symbol_counts = spark.sql("""
    SELECT 
        ticker,
        COUNT(*) as record_count
    FROM prices_view
    GROUP BY ticker
    ORDER BY record_count DESC
""")

print("\nRecord counts by symbol:")
display(symbol_counts)

# The five rows with the latest dates
print("\nSample of recent data:")
recent_rows = spark.sql("""
    SELECT *
    FROM prices_view
    ORDER BY date DESC
    LIMIT 5
""")
display(recent_rows)