In [24]:
# =============================================================================
# PROJECT 1: Investment Portfolio Performance & Risk Analysis
# =============================================================================
# Scenario: Ingesting daily CSV exports to build a BI data model and dashboard.
# Skills: ETL from file sources, Data Modeling (Star Schema), SQL, Pandas,
#         Advanced Metrics (Beta, Sharpe Ratio), Data Visualization (Plotly).
# =============================================================================

# Step 0: Setup and Data Simulation
# -----------------------------------------------------------------------------
# In a real-world scenario, these CSV files would be provided daily/quarterly.
# Here, we simulate their content as strings to make the notebook self-contained.

import pandas as pd
import numpy as np
import io
import sqlite3
import plotly.express as px
import plotly.graph_objects as go

def generate_market_data_csv(
    symbols=('AAPL', 'MSFT', 'GOOGL', 'AMZN', 'SPY'),
    start="2023-01-01",
    end="2023-12-31",
    seed=42,
):
    """Generate a CSV string of simulated daily market data.

    Each symbol starts at a random price between 100 and 200 and then follows
    a random walk with normally distributed daily returns (mean 0.05%,
    standard deviation 2%). Only Monday-Friday dates are emitted.

    Args:
        symbols: Ticker symbols to simulate, in output order.
        start: First calendar date of the simulation (inclusive).
        end: Last calendar date of the simulation (inclusive).
        seed: Seed for the random generator; the same seed always yields
            the same CSV text.

    Returns:
        CSV text with the columns ``Date, Symbol, AdjClose, Volume``.
    """
    # A private RandomState produces the same stream as np.random.seed(seed)
    # followed by the module-level functions, but leaves the process-wide
    # NumPy RNG untouched for any other code that relies on it.
    rng = np.random.RandomState(seed)

    calendar = pd.date_range(start=start, end=end)
    # Weekdays stand in for trading days (market holidays are not modelled).
    trading_days = [day.strftime('%Y-%m-%d') for day in calendar if day.dayofweek < 5]

    rows = []
    for symbol in symbols:
        price = 100 + rng.rand() * 100
        for day in trading_days:
            # Draw order (return first, then volume) is part of the
            # reproducible output, so keep it stable.
            daily_return = rng.normal(0.0005, 0.02)
            price *= (1 + daily_return)
            volume = rng.randint(1_000_000, 100_000_000)
            rows.append([day, symbol, round(price, 2), volume])

    df = pd.DataFrame(rows, columns=['Date', 'Symbol', 'AdjClose', 'Volume'])
    return df.to_csv(index=False)

def generate_fundamentals_csv():
    """Return the simulated company fundamentals table serialized as CSV text.

    The table has one row per company with the columns
    ``Symbol, PERatio, EPS, MarketCapB``; no index column is written.
    """
    columns = ['Symbol', 'PERatio', 'EPS', 'MarketCapB']
    # One tuple per company, in the order the rows appear in the CSV.
    records = [
        ('AAPL', 28.5, 5.9, 2800),
        ('GOOGL', 25.2, 5.1, 1700),
        ('MSFT', 35.8, 9.7, 2500),
        ('AMZN', 58.1, 2.3, 1300),
    ]
    fundamentals = pd.DataFrame(records, columns=columns)
    return fundamentals.to_csv(index=False)

def generate_sentiment_csv():
    """Return the simulated news-sentiment feed as CSV text.

    The text consists of a header line followed by one line per headline,
    each terminated by a newline. Headlines are double-quoted.
    """
    header = 'Date,Symbol,SentimentScore,Headline'
    records = (
        '2023-10-25,AAPL,-0.5,"Apple faces antitrust probe in Europe"',
        '2023-10-26,MSFT,0.8,"Microsoft smashes earnings expectations on cloud strength"',
        '2023-10-27,GOOGL,0.6,"Google announces breakthroughs in generative AI"',
        '2023-10-28,AMZN,-0.3,"Amazon reports sluggish growth in e-commerce division"',
        '2023-11-15,AAPL,0.9,"New iPhone receives stellar reviews"',
        '2023-11-16,MSFT,0.7,"Success of GitHub Copilot integration boosts stock"',
    )
    # Every line, including the last one, ends with a newline.
    return "\n".join((header, *records)) + "\n"

# --- Materialise the three simulated CSV sources as in-memory strings ---
market_data_csv = generate_market_data_csv()
fundamentals_csv = generate_fundamentals_csv()
sentiment_csv = generate_sentiment_csv()

# Preview each source. The market data is long, so only its first
# 200 characters are shown; the other two sources are printed in full.
print("--- Simulated CSV Data Sources ---")
for heading, payload in (
    ("Market Data Snippet:\n", market_data_csv[:200]),
    ("\nFundamentals Data:\n", fundamentals_csv),
    ("\nSentiment Data:\n", sentiment_csv),
):
    print(heading, payload)


# Step 1: Ingestion and ETL from CSV Sources
# -----------------------------------------------------------------------------
print("\n--- Step 1: Ingesting and processing CSV data ---")

# Parse each CSV payload straight from memory; the Date columns of the
# market and sentiment feeds are converted to datetimes while loading.
df_market = pd.read_csv(io.StringIO(market_data_csv), parse_dates=['Date'])
df_fundamentals = pd.read_csv(io.StringIO(fundamentals_csv))
df_sentiment = pd.read_csv(io.StringIO(sentiment_csv), parse_dates=['Date'])

# Order rows chronologically within each symbol so that the percentage change
# below always compares a close with the previous close of the same symbol.
df_market = df_market.sort_values(by=['Symbol', 'Date'])
pct_by_symbol = df_market.groupby('Symbol')['AdjClose'].pct_change()
# The first row of every symbol has no previous close; its return is set to 0.
df_market['DailyReturn'] = pct_by_symbol.fillna(0)

print("Market data loaded and daily returns calculated.")
print(df_market.head())


# Step 2: Data Modeling (Star Schema)
# -----------------------------------------------------------------------------
# We create a clean, relational model for business intelligence:
# two dimensions (D_Stock, D_Date) and one fact table (F_DailyPerformance)
# whose grain is one row per (trading date, stock).
print("\n--- Step 2: Building the Star Schema Data Model ---")

# --- Dimension Table 1: D_Stock ---
# Contains static information about each stock
stock_info = {
    'Symbol': ['AAPL', 'GOOGL', 'AMZN', 'MSFT', 'SPY'],
    'CompanyName': ['Apple Inc.', 'Alphabet Inc.', 'Amazon.com, Inc.', 'Microsoft Corporation', 'SPDR S&P 500 ETF'],
    'Sector': ['Technology', 'Technology', 'Consumer Cyclical', 'Technology', 'Index']
}
D_Stock = pd.DataFrame(stock_info)
# Left join: SPY has no fundamentals row, so its fundamental columns stay NaN.
D_Stock = D_Stock.merge(df_fundamentals, on='Symbol', how='left')
D_Stock['StockKey'] = D_Stock.index + 1  # Surrogate key

# --- Dimension Table 2: D_Date ---
# Created from the distinct dates present in the market data
D_Date = pd.DataFrame(df_market['Date'].unique(), columns=['FullDate'])
D_Date = D_Date.sort_values('FullDate').reset_index(drop=True)
D_Date['DateKey'] = D_Date.index + 1  # Surrogate key, ascending with the date
D_Date['Year'] = D_Date['FullDate'].dt.year
D_Date['Quarter'] = D_Date['FullDate'].dt.quarter
D_Date['Month'] = D_Date['FullDate'].dt.month
D_Date['DayOfWeek'] = D_Date['FullDate'].dt.day_name()

# --- Fact Table: F_DailyPerformance ---
F_DailyPerformance = df_market.copy()

# Join with dimension tables to get the foreign keys
F_DailyPerformance = pd.merge(F_DailyPerformance, D_Stock[['Symbol', 'StockKey']], on='Symbol')
F_DailyPerformance = pd.merge(F_DailyPerformance, D_Date[['FullDate', 'DateKey']], left_on='Date', right_on='FullDate')

# Collapse sentiment to one score per (Date, Symbol) before joining. Without
# this, a symbol with several headlines on the same day would duplicate its
# fact row, breaking the fact grain and the later pivot on (date, symbol).
daily_sentiment = df_sentiment.groupby(['Date', 'Symbol'], as_index=False)['SentimentScore'].mean()

# Left join keeps every performance day; days without news get NaN sentiment.
# Sentiment rows dated on a day with no market row are not carried over.
F_DailyPerformance = pd.merge(F_DailyPerformance, daily_sentiment, on=['Date', 'Symbol'], how='left')

# Final selection and renaming for the fact table
fact_columns = ['DateKey', 'StockKey', 'AdjClose', 'Volume', 'DailyReturn', 'SentimentScore']
F_DailyPerformance = F_DailyPerformance[fact_columns].rename(
    columns={'AdjClose': 'ClosePrice', 'DailyReturn': 'Return'}
)

print("Data model created successfully.")
print("D_Stock Dimension:\n", D_Stock.head())
print("\nF_DailyPerformance Fact Table:\n", F_DailyPerformance.head())


# Step 3: Loading to Data Warehouse and Advanced Analysis
# -----------------------------------------------------------------------------
print("\n--- Step 3: Loading to Warehouse and calculating advanced metrics ---")

# --- Fetch analysis data using SQL ---
# Resolves the surrogate keys of the fact table back to date and symbol.
query = """
SELECT
    d.FullDate,
    s.Symbol,
    f.Return
FROM F_DailyPerformance f
JOIN D_Stock s ON f.StockKey = s.StockKey
JOIN D_Date d ON f.DateKey = d.DateKey;
"""

# --- Load data model into a SQLite database ---
# Existing tables are replaced, so re-running this step rebuilds the warehouse.
conn = sqlite3.connect('finance_warehouse.db')
try:
    D_Stock.to_sql('D_Stock', conn, if_exists='replace', index=False)
    D_Date.to_sql('D_Date', conn, if_exists='replace', index=False)
    F_DailyPerformance.to_sql('F_DailyPerformance', conn, if_exists='replace', index=False)

    df_returns = pd.read_sql_query(query, conn, parse_dates=['FullDate'])
finally:
    # Close the connection even when a load or the query fails, so the
    # database file handle is never left open.
    conn.close()

# --- Calculate advanced metrics using Pandas ---
# Pivot to get returns for each stock in columns for easier calculation
df_pivot = df_returns.pivot(index='FullDate', columns='Symbol', values='Return')

# Calculate metrics for each stock against the market benchmark (SPY)
final_metrics = []
risk_free_rate = 0.02 / 252  # Annual 2% risk-free rate, divided by ~252 trading days

# The benchmark variance is the same for every stock, so compute it once.
market_returns = df_pivot['SPY']
market_variance = market_returns.var()

for symbol in df_pivot.columns.drop('SPY'):
    stock_returns = df_pivot[symbol]

    # Beta = covariance(stock_return, market_return) / variance(market_return)
    covariance = stock_returns.cov(market_returns)
    beta = covariance / market_variance

    # Sharpe Ratio = (avg_return - risk_free_rate) / std_dev_return
    avg_return = stock_returns.mean()
    std_return = stock_returns.std()
    sharpe_ratio = (avg_return - risk_free_rate) / std_return

    # Annualize the daily Sharpe Ratio for better interpretation
    annualized_sharpe = sharpe_ratio * np.sqrt(252)

    final_metrics.append({'Symbol': symbol, 'Beta': beta, 'AnnualizedSharpeRatio': annualized_sharpe})

# Explicit columns keep the frame's schema stable even when no stock besides
# the benchmark is present, so the later merge on 'Symbol' still works.
df_advanced_metrics = pd.DataFrame(final_metrics, columns=['Symbol', 'Beta', 'AnnualizedSharpeRatio'])
print("Advanced Risk/Return Metrics:\n", df_advanced_metrics)


# Step 4: Data Visualization (Dashboard Prototype)
# -----------------------------------------------------------------------------
print("\n--- Step 4: Creating dashboard visualizations ---")

# Merge metrics back with stock info for plotting
df_plot_data = D_Stock.merge(df_advanced_metrics, on='Symbol', how='left')
# Drop only rows missing a value the chart actually uses (axes and bubble
# size). This removes SPY, which has no metrics or market cap, without also
# discarding stocks that merely lack an unrelated fundamental such as PERatio.
df_plot_data = df_plot_data.dropna(subset=['Beta', 'AnnualizedSharpeRatio', 'MarketCapB'])

# Create an interactive bubble chart for Risk vs. Return
fig = px.scatter(
    df_plot_data,
    x='Beta',
    y='AnnualizedSharpeRatio',
    size='MarketCapB',
    color='Sector',
    hover_name='CompanyName',
    text='Symbol',
    title='Portfolio Risk-Return Analysis (Bubble Size = Market Cap)',
    labels={'Beta': 'Market Risk (Beta)', 'AnnualizedSharpeRatio': 'Risk-Adjusted Return (Sharpe Ratio)'},
    template='plotly_white'
)

# Axis titles set here take precedence over the shorter `labels` above.
fig.update_layout(
    xaxis_title='Market Risk (Beta) → Higher is more volatile than market',
    yaxis_title='Annualized Sharpe Ratio → Higher is better return for the risk',
    legend_title='Sector',
    title_x=0.5
)

# Add reference lines for context
fig.add_hline(y=1, line_dash="dash", line_color="green", annotation_text="Good Sharpe Ratio (>1)")
fig.add_vline(x=1, line_dash="dash", line_color="grey", annotation_text="Market Beta = 1")
fig.show()


# Step 5: Airflow DAG Simulation
# -----------------------------------------------------------------------------
# This code defines the structure of an Airflow DAG to automate the pipeline.
# It is not executed in Colab but is part of the project's documentation.
#
# The DAG source is held in a plain string and only printed below; nothing in
# this notebook imports Airflow or runs the DAG. The string describes three
# tasks chained in order: a FileSensor waiting for the day's market data CSV,
# then two PythonOperator placeholders (processing, warehouse load) whose
# callables only print a message.
# NOTE(review): the DAG text uses `schedule_interval`; newer Airflow releases
# prefer the `schedule` argument — confirm against the target Airflow version
# before deploying this text as a real DAG file.
print("\n--- Step 5: Airflow DAG Definition (for orchestration) ---")

airflow_dag_code = """
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

# In a real project, these functions would be in separate scripts.
# def process_financial_csvs(): ...
# def load_model_to_warehouse(): ...

default_args = {
    'owner': 'BI_Team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'financial_csv_ingestion_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for ingesting financial CSV exports.',
    schedule_interval='@daily',
    catchup=False,
    tags=['finance', 'etl', 'csv'],
) as dag:

    # Task 1: Wait for the daily market data CSV to arrive.
    # The path would point to a shared network drive or S3 bucket.
    wait_for_market_data_file = FileSensor(
        task_id='wait_for_market_data_file',
        filepath='/path/to/source/files/market_data_{{ ds_nodash }}.csv',
        poke_interval=300, # Check every 5 minutes
        timeout=60*60*4 # Timeout after 4 hours
    )

    # Task 2: Process all available CSVs, build the model.
    process_files_task = PythonOperator(
        task_id='process_csv_files_and_model',
        python_callable=lambda: print("Processing CSVs and building star schema..."),
        # python_callable=process_financial_csvs
    )

    # Task 3: Load the final model into the data warehouse.
    load_warehouse_task = PythonOperator(
        task_id='load_model_to_warehouse',
        python_callable=lambda: print("Loading data into SQLite warehouse..."),
        # python_callable=load_model_to_warehouse
    )

    wait_for_market_data_file >> process_files_task >> load_warehouse_task
"""
# Emit the DAG definition verbatim so it appears in the notebook output.
print(airflow_dag_code)

--- Simulated CSV Data Sources ---
Market Data Snippet:
 Date,Symbol,AdjClose,Volume
2023-01-02,AAPL,134.47,27735830
2023-01-03,AAPL,135.39,94410762
2023-01-04,AAPL,136.21,89358551
2023-01-05,AAPL,139.04,89409749
2023-01-06,AAPL,139.33,5521373
2023-01-09,AA

Fundamentals Data:
 Symbol,PERatio,EPS,MarketCapB
AAPL,28.5,5.9,2800
GOOGL,25.2,5.1,1700
MSFT,35.8,9.7,2500
AMZN,58.1,2.3,1300


Sentiment Data:
 Date,Symbol,SentimentScore,Headline
2023-10-25,AAPL,-0.5,"Apple faces antitrust probe in Europe"
2023-10-26,MSFT,0.8,"Microsoft smashes earnings expectations on cloud strength"
2023-10-27,GOOGL,0.6,"Google announces breakthroughs in generative AI"
2023-10-28,AMZN,-0.3,"Amazon reports sluggish growth in e-commerce division"
2023-11-15,AAPL,0.9,"New iPhone receives stellar reviews"
2023-11-16,MSFT,0.7,"Success of GitHub Copilot integration boosts stock"


--- Step 1: Ingesting and processing CSV data ---
Market data loaded and daily returns calculated.
        Date Symbol  AdjClose    Volum


--- Step 5: Airflow DAG Definition (for orchestration) ---

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

# In a real project, these functions would be in separate scripts.
# def process_financial_csvs(): ...
# def load_model_to_warehouse(): ...

default_args = {
    'owner': 'BI_Team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'financial_csv_ingestion_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for ingesting financial CSV exports.',
    schedule_interval='@daily',
    catchup=False,
    tags=['finance', 'etl', 'csv'],
) as dag:

    # Task 1: Wait for the daily market data CSV to arrive.
    # The path would point to a shared network drive or S3 bucket.
    wait_for_market_data_file = FileSensor(
 