# Phase 3: Incremental Loading (Daily Update Simulation)

### Step 1: Fetch & Prepare Incremental Data

In [1]:
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from pyspark.sql.functions import col, year, month, date_format
from pyspark.sql.window import Window

# Purpose: download the most recent daily prices for a fixed ticker list,
# reshape them into one row per (date, ticker), and convert the result to a
# Spark DataFrame `new_data_df` carrying the columns the later MERGE expects.

# 1. Define parameters for the incremental load (last 5 days)
incremental_start = (datetime.now() - timedelta(days=5)).strftime('%Y-%m-%d')
tickers = ["MSFT", "AAPL", "GOOGL", "AMZN"]
print(f"Fetching incremental data from {incremental_start}...")

# 2. Download data
raw_inc_pd = yf.download(tickers, start=incremental_start, progress=False, auto_adjust=True)

# Stop here with a readable message instead of letting an empty frame fail
# later inside spark.createDataFrame with an unrelated-looking error.
if raw_inc_pd.empty:
    raise ValueError(
        f"No rows were downloaded for {tickers} since {incremental_start}; "
        "nothing to load incrementally."
    )

# 3. Robust Pandas Restructuring (MATCHING PHASE 1 LOGIC)
if isinstance(raw_inc_pd.columns, pd.MultiIndex):
    # Two-level columns: move level 1 into the rows. This cell treats that
    # level as the ticker symbol (it is renamed to 'ticker' just below).
    stacked_inc = raw_inc_pd.stack(level=1).reset_index()
elif len(tickers) == 1:
    # Flat columns carry no ticker information, so the only safe case is a
    # single requested symbol, which is attached explicitly. Stacking level 0
    # here would put the price-field names into the 'ticker' column.
    stacked_inc = raw_inc_pd.reset_index()
    stacked_inc.insert(1, "ticker", tickers[0])
else:
    raise ValueError(
        "Downloaded frame has single-level columns, so rows cannot be "
        f"attributed to the {len(tickers)} requested tickers."
    )

# 4. Standardize Column Names
# The first two columns are the former index (date) and the ticker symbol.
stacked_inc = stacked_inc.rename(
    columns={stacked_inc.columns[0]: 'date', stacked_inc.columns[1]: 'ticker'}
)

rename_map = {}
for c in stacked_inc.columns:
    c_lower = str(c).lower()  # str() keeps non-string labels from raising
    if 'date' in c_lower:
        rename_map[c] = 'date'
    elif 'ticker' in c_lower:
        rename_map[c] = 'ticker'
    elif 'volume' in c_lower:
        rename_map[c] = 'volume'
    elif 'close' in c_lower:
        # Covers both "Close" and "Adj Close" style labels.
        rename_map[c] = 'close_price'
stacked_inc = stacked_inc.rename(columns=rename_map)

# Verify the columns selected below exist before handing the frame to Spark.
missing_cols = {'date', 'ticker', 'close_price', 'volume'} - set(stacked_inc.columns)
if missing_cols:
    raise ValueError(f"Downloaded data is missing expected columns: {sorted(missing_cols)}")

# 5. Convert to Spark & Finalize Schema
new_data_df = spark.createDataFrame(stacked_inc).select(
    col("date"),
    col("ticker"),
    col("close_price").cast("double"),
    col("volume").cast("long"),
)

# Add time attributes required by Silver table schema
new_data_df = (
    new_data_df
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("day_of_week", date_format(col("date"), "E"))
)

print("Incremental data fetched and prepared.")
display(new_data_df.limit(5))

StatementMeta(, 4b2fa67f-0cfd-4675-a9be-838e0ef16f5d, 5, Finished, Available, Finished)

Fetching incremental data from 2025-11-01...
Incremental data fetched and prepared.


SynapseWidget(Synapse.DataFrame, bb215158-7141-44c9-899e-426c03f3e5a7)

### Step 2: Enrich Data (Recalculate SMA)

In [ ]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col, lit, min as spark_min, row_number

# Purpose: add `sma_7_day` (rolling mean of close_price over the current row
# and the 6 preceding rows per ticker) to the incoming rows in `new_data_df`,
# producing `new_data_final` for the MERGE step.

print("Recalculating SMA for new incoming data...")

# Number of rows before the current one that take part in the average.
SMA_LOOKBACK_ROWS = 6
SILVER_TABLE = "StockData_Silver"

# Define the same 7-day window
window_spec = Window.partitionBy("ticker").orderBy("date").rowsBetween(-SMA_LOOKBACK_ROWS, 0)

incoming_cols = new_data_df.columns
# Marker column so the historical seed rows can be removed again afterwards.
sma_input = new_data_df.withColumn("_is_incoming", lit(True))

# The incoming slice only spans a few days, so a window evaluated on it alone
# averages fewer than 7 rows, and the MERGE would overwrite existing Silver
# values with those truncated averages. Seed the window with the most recent
# Silver rows that precede the incoming data for each ticker.
if spark.catalog.tableExists(SILVER_TABLE):
    first_incoming = new_data_df.groupBy("ticker").agg(
        spark_min("date").alias("_first_incoming_date")
    )
    newest_first = Window.partitionBy("ticker").orderBy(col("date").desc())
    history_tail = (
        spark.table(SILVER_TABLE)
        # Assumes Silver carries every column of new_data_df, which the
        # UpdateAll/InsertAll merge in the next step also relies on.
        .select(*incoming_cols)
        .join(first_incoming, on="ticker", how="inner")
        # Strictly older rows only: overlapping dates come from the new data.
        .where(col("date") < col("_first_incoming_date"))
        .withColumn("_recency_rank", row_number().over(newest_first))
        .where(col("_recency_rank") <= SMA_LOOKBACK_ROWS)
        .select(*incoming_cols)
        .withColumn("_is_incoming", lit(False))
    )
    sma_input = sma_input.unionByName(history_tail)

# Apply the window function, then keep only the new data chunk
new_data_final = (
    sma_input
    .withColumn("sma_7_day", avg(col("close_price")).over(window_spec))
    .where(col("_is_incoming"))
    .drop("_is_incoming")
)

print("Data enrichment complete. Ready for MERGE.")

StatementMeta(, 2d2da600-be74-4ae3-b4a9-41a766519f61, -1, Cancelled, , Cancelled)

### Step 3: Execute Delta Merge (Upsert)

In [ ]:
from delta.tables import DeltaTable

# Upsert the enriched incremental rows into the Silver Delta table.
print("Starting Incremental Merge into StockData_Silver...")

if not spark.catalog.tableExists("StockData_Silver"):
    # Nothing to merge into; report it and leave the cell without merging.
    print("ERROR: Target table 'StockData_Silver' not found. Please re-run Phase 2.")
else:
    delta_table = DeltaTable.forName(spark, "StockData_Silver")

    # A row is identified by its (date, ticker) pair.
    merge_builder = delta_table.alias("target").merge(
        new_data_final.alias("source"),
        "target.date = source.date AND target.ticker = source.ticker",
    )
    # Matching rows are overwritten with the source values; all others are inserted.
    merge_builder = merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll()
    merge_builder.execute()

    print("Merge SUCCESS! StockData_Silver table is now fully up-to-date.")

StatementMeta(, 2d2da600-be74-4ae3-b4a9-41a766519f61, -1, Cancelled, , Cancelled)

### Step 4: Create/Update Gold Layer

In [ ]:
# Rebuild the Gold table as a full copy of the current Silver table.
silver_df = spark.table("StockData_Silver")

# Overwrite mode replaces the existing table contents; overwriteSchema also
# lets the stored schema be replaced. Data is partitioned by year and ticker.
gold_writer = (
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "ticker")
)
gold_writer.saveAsTable("StockMarket_Gold")

print("StockMarket_Gold successfully refreshed and partitioned.")

StatementMeta(, 2d2da600-be74-4ae3-b4a9-41a766519f61, -1, Cancelled, , Cancelled)

# Phase 4: Orchestration and Visualization

### Step 1: Verify Gold Data (Sanity Check)

In [ ]:
%%sql
-- Sanity check: one summary row per (year, ticker) of the Gold table, with
-- row count, covered date range, and average price / 7-day SMA.
SELECT year,
       ticker,
       COUNT(*)         AS daily_records,
       MIN(date)        AS start_date,
       MAX(date)        AS end_date,
       AVG(close_price) AS avg_price,
       AVG(sma_7_day)   AS avg_7day_sma
FROM StockMarket_Gold
GROUP BY year, ticker
ORDER BY year DESC, ticker


StatementMeta(, 2d2da600-be74-4ae3-b4a9-41a766519f61, -1, Cancelled, , Cancelled)