In [0]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime

# Configuration - list of stocks to track
import os

stocks = ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]  # US stocks

# The Alpha Vantage key is read from the environment instead of being committed
# with the notebook (populate the variable from your secrets manager). The key
# that was previously hardcoded here should be treated as leaked and rotated.
api_key = os.environ.get("ALPHA_VANTAGE_API_KEY", "")
if not api_key:
    print("WARNING: ALPHA_VANTAGE_API_KEY is not set; API requests are expected to fail.")

# Function to fetch stock data
def fetch_stock_data(symbol, outputsize="compact", timeout=30):
    """Fetch daily OHLCV bars for one ticker from Alpha Vantage.

    Calls the TIME_SERIES_DAILY endpoint and flattens its
    "Time Series (Daily)" mapping into one dict per trading day.

    Args:
        symbol: Ticker symbol, e.g. "AAPL".
        outputsize: Forwarded as the API's ``outputsize`` parameter
            ("compact" or "full"). "compact" is the API's documented default,
            so the default call matches omitting the parameter.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A list of dicts with keys date, symbol, open, high, low, close,
        volume and ingestion_timestamp. All rows from one call share the
        same ingestion timestamp. Returns [] (after printing the reason) when
        the HTTP request fails, the body is not JSON, or the response has no
        "Time Series (Daily)" section (e.g. an API error or rate-limit note).
    """
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "outputsize": outputsize,
        "apikey": api_key,
    }
    try:
        # A timeout keeps a stalled connection from hanging the notebook forever;
        # params= lets requests URL-encode the values.
        response = requests.get(
            "https://www.alphavantage.co/query", params=params, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Same best-effort contract as the API-level error below: report and skip,
        # so one bad symbol does not abort the whole batch.
        print(f"Error fetching {symbol}: {exc}")
        return []

    if "Time Series (Daily)" not in data:
        print(f"Error fetching {symbol}: {data}")
        return []

    # One timestamp per fetch so every row from this call is tagged identically.
    ingested_at = datetime.now()
    return [
        {
            "date": date,
            "symbol": symbol,
            "open": float(prices["1. open"]),
            "high": float(prices["2. high"]),
            "low": float(prices["3. low"]),
            "close": float(prices["4. close"]),
            "volume": int(prices["5. volume"]),
            "ingestion_timestamp": ingested_at,
        }
        for date, prices in data["Time Series (Daily)"].items()
    ]

print("Fetching data for %d stocks..." % len(stocks))

In [0]:
import time

# Fetch data for all stocks, pausing between API calls to stay under the rate limit.
RATE_LIMIT_SECONDS = 12  # 5 calls per minute max -> one call every 12 seconds

all_rows = []

for i, symbol in enumerate(stocks, start=1):
    print(f"Fetching {symbol} ({i}/{len(stocks)})...")

    rows = fetch_stock_data(symbol)
    all_rows.extend(rows)

    # fetch_stock_data returns [] on failure; don't report that as a success.
    # \u2713 / \u2717 escapes keep the source ASCII so the glyphs can't be mis-encoded.
    if rows:
        print(f"  \u2713 Got {len(rows)} records for {symbol}")
    else:
        print(f"  \u2717 No records for {symbol}")

    if i < len(stocks):  # Don't wait after last stock
        print(f"  Waiting {RATE_LIMIT_SECONDS} seconds for rate limit...")
        time.sleep(RATE_LIMIT_SECONDS)

print(f"\nTotal records fetched: {len(all_rows)}")
print(f"Stocks: {sorted({r['symbol'] for r in all_rows})}")

In [0]:
# Create DataFrame from all rows
from delta.tables import DeltaTable
from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

BRONZE_TABLE = "stock_market.bronze_stock_data"

# Explicit schema: schema inference raises on an empty list (every fetch failed)
# and pins nothing about column types. These types match what inference produces
# from the dicts built by fetch_stock_data, so existing tables stay compatible.
bronze_schema = StructType([
    StructField("date", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("ingestion_timestamp", TimestampType(), True),
])

spark = SparkSession.builder.getOrCreate()
# Delta MERGE fails if several source rows match the same target row, which would
# happen if a ticker appeared twice in `stocks`; keep one row per (symbol, date).
df = spark.createDataFrame(all_rows, schema=bronze_schema).dropDuplicates(["symbol", "date"])

record_count = df.count()
print(f"DataFrame created with {record_count} records")

# Show sample data from each stock
df.groupBy("symbol").agg(
    count("*").alias("record_count"),
    min("date").alias("earliest_date"),
    max("date").alias("latest_date")
).show()

# Create database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS stock_market")

# Write to bronze (using merge to avoid duplicates on re-runs)
if record_count == 0:
    print("WARNING: no records fetched; skipping bronze write")
elif spark.catalog.tableExists(BRONZE_TABLE):
    # Merge to handle duplicates: (symbol, date) identifies one daily bar.
    bronze_table = DeltaTable.forName(spark, BRONZE_TABLE)

    bronze_table.alias("target").merge(
        df.alias("source"),
        "target.symbol = source.symbol AND target.date = source.date"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

    print("Merged new data into existing bronze table")
else:
    # First time - just write
    df.write.format("delta").mode("append").saveAsTable(BRONZE_TABLE)
    print("Created new bronze table")

# Verify (the table may not exist if the very first run fetched nothing)
if spark.catalog.tableExists(BRONZE_TABLE):
    final_count = spark.table(BRONZE_TABLE).count()
    print(f"\nBronze table now has {final_count} total records")