# ETL Pipeline for Multi-Asset Portfolio Analysis

This project develops a simple ETL pipeline to transform raw financial market data into actionable investment insights. Using Yahoo Finance as the primary data source, the system extracts price and volume data for a diversified portfolio spanning multiple asset classes including equities, bonds, commodities, and volatility instruments.

## YFinance Setup

The yfinance library is not officially affiliated with Yahoo Finance. It relies on web scraping techniques and may need to be tweaked or updated to keep working. Here we define a utility function that sets browser-like headers and retry logic to avoid detection. The methods are very similar to those discussed in class for BeautifulSoup or Selenium.

In [6]:
# General basic imports for the analysis
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [8]:
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure yfinance with custom session and headers
def setup_yfinance_session():
    """Set up a robust session for yfinance with headers and retry logic."""
    session = requests.Session()
    
    # Custom headers to avoid detection
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    session.headers.update(headers)
    
    # Retry strategy
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

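# Try to apply the session globally to yfinance.
# Note: `_get_data_session` is not a documented yfinance hook, so this
# assignment may have no effect. The documented route is the `session=`
# argument of `yf.Ticker` / `yf.download`, depending on the installed version.
custom_session = setup_yfinance_session()
yf._get_data_session = lambda: custom_session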

print("Custom yfinance session configured!")

Custom yfinance session configured!
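
The retry strategy configured above can be illustrated without any network access. The following is a minimal sketch of exponential backoff in plain Python. `call_with_backoff` and `flaky_fetch` are hypothetical helpers, not part of yfinance, requests, or urllib3, and the delay schedule (`backoff_factor * 2 ** attempt`) is an assumption chosen for illustration that may differ from what urllib3 does internally.

```python
import time


def call_with_backoff(func, retries=3, backoff_factor=1.0, sleep=time.sleep):
    """Call func(); on failure wait backoff_factor * 2**attempt, then retry.

    Hypothetical helper for illustration only.
    """
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(backoff_factor * 2 ** attempt)


# Simulate a data source that fails twice before succeeding.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated HTTP 429")
    return "ok"


waits = []  # record the delays instead of actually sleeping
result = call_with_backoff(flaky_fetch, sleep=waits.append)
print(result, waits)
```

Injecting `sleep` as a parameter keeps the sketch fast to run and makes the waiting schedule easy to inspect.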


Let's test this solution by looking up a stock. The [Lookup API](https://yfinance-python.org/reference/api/yfinance.Lookup.html#yfinance.Lookup) queries Yahoo Finance for tickers:

In [12]:
lookup_res = yf.Lookup("AAPL")
lookup_res.get_stock().head()

| symbol | exchange | industryLink | industryName | quoteType | rank | regularMarketChange | regularMarketPercentChange | regularMarketPrice | shortName |
|---|---|---|---|---|---|---|---|---|---|
| AAPL | NMS | https://finance.yahoo.com/sector/technology | Technology | equity | 32914.0 | 6.309998 | 2.746821 | 236.029999 | Apple Inc. |
| AAPL.NE | NEO | https://finance.yahoo.com/sector/technology | Technology | equity | 20014.0 | 0.82 | 2.478089 | 33.91 | APPLE CDR (CAD HEDGED) |
| AAPLUSTRAD.BO | BSE | https://finance.yahoo.com/sector/industrials | Industrials | equity | 20002.0 | 0.0 | 0.0 | 0.84 | AA Plus Tradelink Limited |
| AAPL34.SA | SAO | https://finance.yahoo.com/sector/technology | Technology | equity | 20002.0 | 0.619999 | 0.975762 | 64.129997 | APPLE DRN |
| AAPL.BA | BUE | https://finance.yahoo.com/sector/technology | Technology | equity | 20002.0 | -175.0 | -1.102362 | 15700.0 | APPLE INC CEDEAR(REPR 1/20 SHR) |


## Financial Instruments Perimeter

This section defines a diversified set of financial instruments to capture performance across asset classes and market segments. The instruments were selected using my own expertise and in consultation with Claude Sonnet 4. The aim is a small but representative set: equity indices for growth exposure, sector ETFs for tactical allocation, fixed income securities for stability, alternative assets for diversification, and volatility instruments for risk management. This selection should enable risk-return analysis and correlation studies across different market environments.

In [16]:
# Portfolio tickers for yfinance
tickers_list = [
    # Equity Indices
    "SPY",  # SPDR S&P 500 ETF Trust (US Large Cap)
    "QQQ",  # Invesco QQQ Trust (NASDAQ-100/Technology Heavy)
    "IWM",  # iShares Russell 2000 ETF (US Small Cap)
    "EFA",  # iShares MSCI EAFE ETF (International Developed Markets)
    "EEM",  # iShares MSCI Emerging Markets ETF (Emerging Markets)
    # Sector ETFs
    "XLF",  # Financial Select Sector SPDR Fund
    "XLK",  # Technology Select Sector SPDR Fund
    "XLE",  # Energy Select Sector SPDR Fund
    "XLV",  # Health Care Select Sector SPDR Fund
    "XLI",  # Industrial Select Sector SPDR Fund
    # Fixed Income
    "TLT",  # iShares 20+ Year Treasury Bond ETF (Long Duration)
    "SHY",  # iShares 1-3 Year Treasury Bond ETF (Short Duration)
    # Alternative Assets
    "GLD",  # SPDR Gold Trust (Precious Metals)
    "SLV",  # iShares Silver Trust (Industrial Precious Metals)
    "DBC",  # Invesco DB Commodity Index Tracking Fund (Broad Commodities)
    # Risk & Currency
    "VIX",  # CBOE Volatility Index (Market Fear Gauge); note that Yahoo Finance lists the index itself as "^VIX"
    "UUP",  # Invesco DB US Dollar Index Bullish Fund (US Dollar Strength)
]
tickers = [yf.Ticker(ticker_str) for ticker_str in tickers_list]

### Financial Instruments Table

We can create a **relational table** describing our instruments by using the `ticker.get_info` method. The symbol (ticker) is our primary key.

In [39]:
# Create a DataFrame with ticker information
ticker_infos = {}

print("Fetching ticker information...")
for i, ticker_obj in enumerate(tickers):
    try:
        info = ticker_obj.get_info()
        ticker_infos[tickers_list[i]] = info
        # Small delay to avoid rate limiting
        time.sleep(0.3) 
    except Exception as e:
        print(f"Error fetching {tickers_list[i]}: {str(e)}")
        # Continue with next ticker even if one fails

# Convert to DataFrame
portfolio_info_df = pd.DataFrame.from_dict(ticker_infos, orient='index')

print(f"Successfully fetched information for {len(ticker_infos)} tickers")

Fetching ticker information...
Successfully fetched information for 17 tickers


Writing and displaying this table:

In [40]:
portfolio_info_df["symbol"] = portfolio_info_df.index
portfolio_info_df.reset_index(drop=True, inplace=True)

# Write portfolio to file
portfolio_info_df.to_csv("../data/portfolio_info.csv", index=False)  # the RangeIndex carries no information

# Display basic information about our portfolio
key_columns = [
    "symbol",
    "shortName",
    "longName",
    "exchange",
    "quoteType",
    "currency",
    "marketCap",
]
available_columns = [col for col in key_columns if col in portfolio_info_df.columns]

portfolio_info_df[available_columns].head(10)

| | symbol | shortName | longName | exchange | quoteType | currency | marketCap |
|---|---|---|---|---|---|---|---|
| 0 | SPY | SPDR S&P 500 | SPDR S&P 500 ETF | PCX | ETF | USD | 589867700000.0 |
| 1 | QQQ | Invesco QQQ Trust, Series 1 | Invesco QQQ Trust | NGM | ETF | USD | 223984400000.0 |
| 2 | IWM | iShares Russell 2000 ETF | iShares Russell 2000 ETF | PCX | ETF | USD | 65841580000.0 |
| 3 | EFA | iShares MSCI EAFE ETF | iShares MSCI EAFE ETF | PCX | ETF | USD | 85094110000.0 |
| 4 | EEM | iShares MSCI Emerging Index Fun | iShares MSCI Emerging Markets ETF | PCX | ETF | USD | 37657690000.0 |
| 5 | XLF | SPDR Select Sector Fund - Finan | The Financial Select Sector SPDR Fund | PCX | ETF | USD | 47189210000.0 |
| 6 | XLK | SPDR Select Sector Fund - Techn | The Technology Select Sector SPDR Fund | PCX | ETF | USD | 71017500000.0 |
| 7 | XLE | SPDR Select Sector Fund - Energ | The Energy Select Sector SPDR Fund | PCX | ETF | USD | 16559110000.0 |
| 8 | XLV | SPDR Select Sector Fund - Healt | The Health Care Select Sector SPDR Fund | PCX | ETF | USD | 27023150000.0 |
| 9 | XLI | SPDR Select Sector Fund - Indus | The Industrial Select Sector SPDR Fund | PCX | ETF | USD | 20488440000.0 |
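
Since the symbol is treated as the primary key of the instruments table, it may be worth checking that it is unique and non-null before writing the file. The following is a minimal sketch on a tiny made-up table (not real output). `check_primary_key` is a hypothetical helper introduced only for illustration.

```python
import pandas as pd


def check_primary_key(df, key="symbol"):
    """Return a list of problems found with the key column (empty if none)."""
    if key not in df.columns:
        return [f"missing column: {key}"]
    problems = []
    if df[key].isna().any():
        problems.append("null keys present")
    # keep=False flags every occurrence of a repeated key, not just the later ones
    duplicated = df.loc[df[key].duplicated(keep=False), key].dropna().unique()
    if len(duplicated) > 0:
        problems.append(f"duplicate keys: {sorted(duplicated)}")
    return problems


# Tiny illustrative tables, not real data.
good = pd.DataFrame({"symbol": ["SPY", "QQQ"], "quoteType": ["ETF", "ETF"]})
bad = pd.DataFrame({"symbol": ["SPY", "SPY", None], "quoteType": ["ETF"] * 3})

print(check_primary_key(good))
print(check_primary_key(bad))
```

In the notebook this check could be applied to `portfolio_info_df` right after the `symbol` column is created.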


## Downloading Historical Financial Data

We can now download financial data. We initially select a very wide time window to **extract and store all possible raw data**. Potential issues will be identified and handled at a later stage.

In [43]:
# Download historical data for all portfolio tickers (1990-2024)
start_date = "1990-01-01"
end_date = "2024-12-31"

In [None]:
print(f"Downloading historical data from {start_date} to {end_date}...")

try:
    # Use space-separated string of tickers for bulk download
    tickers_string = " ".join(tickers_list)
    
    # Download with multi-level columns
    portfolio_data = yf.download(
        tickers_string,
        start=start_date,
        end=end_date,      # End date is exclusive
        auto_adjust=True,  # Adjust prices for splits and dividends
        prepost=False,     # Only regular trading hours
        threads=True       # Use threading for faster downloads
    )
    
    print(f"Successfully downloaded data for {len(tickers_list)} tickers")
    print(f"Date range: {portfolio_data.index.min()} to {portfolio_data.index.max()}")
    
except Exception as e:
    print(f"Error downloading bulk data: {str(e)}")


# Display basic information about the downloaded data
print(f"\nPortfolio data overview:")
print(f"Shape: {portfolio_data.shape}")
print(f"Columns (first level): {portfolio_data.columns.get_level_values(0).unique().tolist()}")
print(f"Tickers (second level): {portfolio_data.columns.get_level_values(1).unique().tolist()}")

# Show sample of the data
portfolio_data.head()

Downloading historical data from 1990-01-01 to 2024-12-31...


[*********************100%***********************]  17 of 17 completed

Successfully downloaded data for 17 tickers
Date range: 1993-01-29 00:00:00 to 2024-12-30 00:00:00

Portfolio data overview:
Shape: (8037, 85)
Columns (first level): ['Close', 'High', 'Low', 'Open', 'Volume']
Tickers (second level): ['DBC', 'EEM', 'EFA', 'GLD', 'IWM', 'QQQ', 'SHY', 'SLV', 'SPY', 'TLT', 'UUP', 'VIX', 'XLE', 'XLF', 'XLI', 'XLK', 'XLV']





| Date | Close DBC | Close EEM | Close EFA | Close GLD | Close IWM | Close QQQ | Close SHY | Close SLV | Close SPY | Close TLT | ... | Volume SLV | Volume SPY | Volume TLT | Volume UUP | Volume VIX | Volume XLE | Volume XLF | Volume XLI | Volume XLK | Volume XLV |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1993-01-29 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 24.380438 | NaN | ... | NaN | 1003200 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 1993-02-01 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 24.553837 | NaN | ... | NaN | 480500 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 1993-02-02 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 24.605865 | NaN | ... | NaN | 201300 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 1993-02-03 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 24.865965 | NaN | ... | NaN | 529400 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 1993-02-04 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 24.970005 | NaN | ... | NaN | 531500 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
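
The downloaded frame has two column levels, `Price` and `Ticker`. As a minimal sketch of how such a layout can be sliced, the example below builds a small synthetic frame with the same level names (the values are made up) and selects by field, by ticker, and by both.

```python
import numpy as np
import pandas as pd

# Small synthetic frame mimicking the (Price, Ticker) column layout.
columns = pd.MultiIndex.from_product(
    [["Close", "Volume"], ["SPY", "TLT"]], names=["Price", "Ticker"]
)
sample = pd.DataFrame(
    [[470.0, np.nan, 1000.0, np.nan], [472.0, 98.0, 1200.0, 300.0]],
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"], name="Date"),
    columns=columns,
)

close = sample["Close"]                         # one field, all tickers
spy = sample.xs("SPY", axis=1, level="Ticker")  # one ticker, all fields
spy_close = sample[("Close", "SPY")]            # a single series

print(close.columns.tolist())
print(spy.columns.tolist())
print(spy_close.tolist())
```

The same selections should apply to `portfolio_data`, assuming it keeps the level names shown in the output above.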


### Historical Data Table

As with the financial instruments table, we can store the historical series in a simple table.
Note that this DataFrame has multi-level columns: the price field first, then the ticker.

A wide-column store such as Cassandra would be a better fit for historical series, but for conciseness we use another CSV file:

In [None]:
portfolio_data.to_csv("../data/portfolio_data.csv")
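
A CSV written from a frame with multi-level columns contains several header rows, so reading it back needs the `header` argument. The following is a minimal round-trip sketch on a synthetic frame, using an in-memory buffer instead of the `../data/` path. The values are made up, and the level names are assumed to match the download output.

```python
import io

import numpy as np
import pandas as pd

columns = pd.MultiIndex.from_product(
    [["Close", "Volume"], ["SPY", "TLT"]], names=["Price", "Ticker"]
)
frame = pd.DataFrame(
    [[470.5, np.nan, 1000.0, np.nan], [472.25, 98.5, 1200.0, 300.0]],
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"], name="Date"),
    columns=columns,
)

# Write to an in-memory buffer, then read it back.
buffer = io.StringIO()
frame.to_csv(buffer)
buffer.seek(0)

# header=[0, 1] rebuilds the two column levels; index_col=0 restores the dates.
restored = pd.read_csv(buffer, header=[0, 1], index_col=0, parse_dates=True)
print(restored.columns.nlevels, restored.shape)
```

Note that integer columns containing missing values come back as floats, which is one reason a typed format may be preferable later in the pipeline.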

## Data Exploration

In this section we perform a preliminary data exploration covering the following areas:

### Data Quality Assessment
- **Missing Data Analysis**: Identify patterns of missing values across tickers and time periods
- **Data Completeness**: Evaluate coverage for each instrument (some ETFs may have shorter histories)
- **Outlier Detection**: Spot anomalous price movements, volume spikes, or data errors
- **Temporal Consistency**: Verify trading day alignment and handle market holidays
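
The missing-data and completeness checks listed above can be sketched as follows. This is a minimal illustration on synthetic close prices (made-up values), not the notebook's actual analysis. The names `first_valid`, `missing_ratio`, and `internal_gaps` are introduced here for illustration only.

```python
import numpy as np
import pandas as pd

# Synthetic close prices: TLT starts one day later than SPY and has one gap.
close = pd.DataFrame(
    {"SPY": [100.0, 101.0, 102.0, 103.0], "TLT": [np.nan, 50.0, np.nan, 51.0]},
    index=pd.bdate_range("2024-01-02", periods=4, name="Date"),
)

# First date with data per ticker (a late start usually means a later listing).
first_valid = pd.Series({c: close[c].first_valid_index() for c in close.columns})

# Share of missing rows over the whole window.
missing_ratio = close.isna().mean()

# Gaps after the first observation are more suspicious than a late start.
internal_gaps = pd.Series(
    {c: int(close.loc[first_valid[c]:, c].isna().sum()) for c in close.columns}
)

print(first_valid)
print(missing_ratio)
print(internal_gaps)
```

Separating leading gaps from internal ones helps distinguish instruments with shorter histories from genuine data errors.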

### Data Profiling
- **Statistical Summaries**: Generate descriptive statistics for OHLCV data across all tickers
- **Data Types & Formats**: Validate numeric precision and date formatting
- **Cross-Asset Validation**: Compare data ranges and patterns across asset classes
- **Volume Analysis**: Assess liquidity patterns and trading activity
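
As a rough sketch of the profiling steps above, the example below computes summary statistics per ticker and runs a few basic format checks on a synthetic price table. The values are made up, and the particular set of checks is an assumption about what "validation" might include, not a fixed list.

```python
import pandas as pd

# Synthetic close prices, for illustration only.
close = pd.DataFrame(
    {"SPY": [100.0, 102.0, 101.0], "TLT": [50.0, 49.0, 51.0]},
    index=pd.bdate_range("2024-01-02", periods=3, name="Date"),
)

# One row per ticker with a few descriptive statistics.
summary = close.describe().T[["count", "mean", "min", "max"]]

# Basic type and format checks on the index and the values.
checks = {
    "index_is_datetime": isinstance(close.index, pd.DatetimeIndex),
    "index_sorted": bool(close.index.is_monotonic_increasing),
    "index_unique": bool(close.index.is_unique),
    "all_numeric": all(pd.api.types.is_numeric_dtype(t) for t in close.dtypes),
    "no_negative_prices": not bool((close < 0).any().any()),
}

print(summary)
print(checks)
```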

### Preliminary Financial Analysis
- **Price Evolution**: Visualize historical performance across the 1990-2024 period
- **Volatility Patterns**: Identify periods of high market stress (2008, 2020, etc.)
- **Correlation Structure**: Examine relationships between different asset classes
- **Market Regime Analysis**: Detect structural breaks and regime changes
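
The volatility and correlation items above can be sketched on synthetic data as follows. The random-walk prices, the 21-day window, and the 252-day annualization factor are assumptions for illustration, not choices made in this notebook.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
dates = pd.bdate_range("2024-01-02", periods=250, name="Date")

# Synthetic random-walk prices, for illustration only.
shocks = rng.normal(0.0, 0.01, size=(250, 2))
close = pd.DataFrame(
    100 * np.exp(shocks.cumsum(axis=0)), index=dates, columns=["A", "B"]
)
close["A_copy"] = close["A"] * 2  # perfectly correlated with A by construction

# Simple daily returns; the first row has no previous price and is dropped.
returns = close.pct_change(fill_method=None).dropna()

# Rolling annualized volatility over roughly one trading month.
rolling_vol = returns.rolling(21).std() * np.sqrt(252)

# Pairwise correlation of daily returns.
correlation = returns.corr()
print(correlation.round(2))
```

Correlations are computed on returns rather than price levels, since trending price series tend to look correlated even when their day-to-day moves are unrelated.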

### ETL Pipeline Readiness
- **Data Standardization Needs**: Identify required transformations and normalization
- **Performance Optimization**: Assess data loading and processing efficiency
- **Star Schema Design**: Plan dimensional modeling for the data warehouse
- **Business Logic Validation**: Ensure data integrity for downstream analytics
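
One standardization step that a star schema would likely need is reshaping the wide, multi-level frame into a long fact table with one row per date and symbol. The following is a minimal sketch under that assumption. `to_fact_table` is a hypothetical helper, the sample values are made up, and the final schema may well differ.

```python
import numpy as np
import pandas as pd

columns = pd.MultiIndex.from_product(
    [["Close", "Volume"], ["SPY", "TLT"]], names=["Price", "Ticker"]
)
wide = pd.DataFrame(
    [[470.0, np.nan, 1000.0, np.nan], [472.0, 98.0, 1200.0, 300.0]],
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-03"], name="Date"),
    columns=columns,
)


def to_fact_table(wide):
    """Reshape (Price, Ticker) columns into one row per (Date, symbol)."""
    tickers = wide.columns.get_level_values("Ticker").unique()
    per_ticker = {t: wide.xs(t, axis=1, level="Ticker") for t in tickers}
    long = pd.concat(per_ticker, names=["symbol", "Date"])
    # Drop rows where a ticker has no data at all for that date.
    long = long.dropna(how="all").reset_index()
    long.columns.name = None
    fields = [c for c in long.columns if c not in ("Date", "symbol")]
    ordered = long[["Date", "symbol", *fields]]
    return ordered.sort_values(["Date", "symbol"]).reset_index(drop=True)


fact_prices = to_fact_table(wide)
print(fact_prices)
```

In such a layout, `symbol` would act as the foreign key into the instruments table, and `(Date, symbol)` as the natural key of the fact table.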

This exploration will inform our transformation logic and help design robust data quality checks for the production ETL pipeline.