In [0]:
"""
Notebook: Daily Stock Price ETL
Author: Prajwal Raj Giri
Description:
- Fetches daily stock prices
- Transforms & cleans data
- Stores in AWS S3 as Delta Lake
"""


In [0]:
# Initial S3 connection check: write a one-row DataFrame to the bucket so that
# credential or permission problems surface before any market data is fetched.
test_df = spark.createDataFrame(data=[(1, "s3_test_success")], schema=["id", "message"])

(
    test_df.write
    .mode("overwrite")
    .parquet("s3a://etl-stock-data/s3_test/")
)


In [0]:
# Install the Yahoo Finance client used by the download cell below.
# NOTE(review): the column-selection cell relies on yf.download() returning
# (field, ticker) MultiIndex columns, a layout that has changed between
# yfinance releases. TODO: pin the version this notebook was validated against.
%pip install yfinance


In [0]:
from datetime import date

import pandas as pd
import yfinance as yf
from pyspark.sql.functions import col, lit


In [0]:
# Run configuration: the tickers to fetch, and the ISO date (YYYY-MM-DD)
# stamped on every row written by this run.
STOCKS = ["AAPL", "MSFT", "GOOGL"]
RUN_DATE = date.today().isoformat()

print(f"Running ETL for date: {RUN_DATE}")


In [0]:
# Download one day (period="1d") of daily bars per ticker and tag each frame
# with its ticker symbol and this run's date. Tickers for which yfinance
# returns nothing are reported and skipped.
stock_data = []

for ticker in STOCKS:
    prices = yf.download(ticker, period="1d", interval="1d")

    if not prices.empty:
        prices["symbol"] = ticker
        prices["run_date"] = RUN_DATE
        stock_data.append(prices)
    else:
        print(f"No data for {ticker}")


In [0]:
# Stack the per-ticker frames and turn the Date index into a regular column.
# pd.concat([]) raises an opaque "No objects to concatenate", so when every
# download came back empty, fail with a message that names the run instead.
if not stock_data:
    raise ValueError(f"No price data downloaded for {STOCKS} on {RUN_DATE}")

pandas_df = pd.concat(stock_data).reset_index()
pandas_df.head()


In [0]:
# Convert the pandas frame to Spark. printSchema() lists the column names
# exactly as the selection cell below must reference them.
spark_df = spark.createDataFrame(data=pandas_df)
spark_df.printSchema()


In [0]:
# Flatten the yfinance columns into the wide table schema.
# yf.download() yields (field, ticker) MultiIndex columns; after the pandas ->
# Spark conversion they appear as literal names such as "('Close', 'AAPL')".
# Each downloaded frame only carried its own ticker's columns, so a ticker whose
# download came back empty has no columns at all in spark_df. Selecting such a
# column by name would fail the whole run. Instead it becomes a typed NULL
# column, and the output schema stays identical on every run.

# yfinance field name -> output column prefix (suffixed with the lower-cased ticker).
PRICE_FIELD_PREFIXES = {
    "Open": "open_price",
    "High": "high_price",
    "Low": "low_price",
    "Close": "close_price",
    "Volume": "volume",
}


def _price_column(field, ticker):
    """Select `(field, ticker)` from spark_df as `<prefix>_<ticker>` (e.g. close_price_aapl).

    Returns an all-NULL column with the same name when that ticker contributed
    no data to this run. Values are cast to double. After pd.concat the other
    tickers' cells are NaN-filled, so these columns are already floating point
    in the normal case; the cast keeps the Delta schema stable otherwise.
    """
    alias = f"{PRICE_FIELD_PREFIXES[field]}_{ticker.lower()}"
    source_name = f"('{field}', '{ticker}')"
    if source_name in spark_df.columns:
        return col(source_name).cast("double").alias(alias)
    return lit(None).cast("double").alias(alias)


clean_df = spark_df.select(
    col("('symbol', '')").alias("symbol"),
    col("('run_date', '')").alias("run_date"),
    col("('Date', '')").alias("stock_date"),
    # Same order as before: per ticker in STOCKS, Open/High/Low/Close/Volume.
    *[
        _price_column(field, ticker)
        for ticker in STOCKS
        for field in PRICE_FIELD_PREFIXES
    ],
)

In [0]:
# Fail fast, before anything is written to S3, if the run produced no rows.
assert clean_df.count() >= 1, "No data loaded!"
clean_df.show()


In [0]:
# Location of the Delta table on S3 (same bucket as the connection check above).
S3_PATH = "s3a://etl-stock-data/stock_data/daily_prices"


In [0]:
# Write this run's rows to the Delta table at S3_PATH.
# Overwriting only the rows whose run_date equals RUN_DATE (Delta `replaceWhere`)
# makes the cell idempotent: re-running the notebook on the same day replaces
# that day's rows instead of appending duplicates. Rows from other run dates are
# left untouched. Every row in clean_df carries run_date == RUN_DATE, so the
# write satisfies the replaceWhere predicate.
# NOTE(review): a weekend/holiday run presumably re-fetches the previous trading
# day under a new run_date; dedupe on (symbol, stock_date) if that matters.
(
    clean_df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"run_date = '{RUN_DATE}'")
    .save(S3_PATH)
)


In [0]:
# Register the Delta files at S3_PATH as the metastore table `stock_prices`
# so later cells can query it by name. IF NOT EXISTS makes re-runs a no-op.
spark.sql(
    "\n"
    "CREATE TABLE IF NOT EXISTS stock_prices\n"
    "USING DELTA\n"
    f"LOCATION '{S3_PATH}'\n"
)


In [0]:
%sql
-- Most recent trading dates first. Each row belongs to one symbol, so the
-- other tickers' close_price_* columns on that row are presumably empty.
SELECT p.symbol, p.stock_date, p.close_price_aapl, p.close_price_msft, p.close_price_googl
FROM stock_prices AS p
ORDER BY p.stock_date DESC;


In [0]:
# Final status line for the run log (emoji restored from mis-encoded bytes).
print("✅ Daily Stock ETL completed successfully for", RUN_DATE)
