In [0]:
%pip install yfinance

import yfinance as yf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, when, lit, sum as _sum
from pyspark.sql.window import Window


Collecting yfinance
  Obtaining dependency information for yfinance from https://files.pythonhosted.org/packages/8d/51/9f26741aeeb149fe75b30bceee389cf9920f9458d0f213539398aa217b99/yfinance-0.2.52-py2.py3-none-any.whl.metadata
  Using cached yfinance-0.2.52-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Obtaining dependency information for multitasking>=0.0.7 from https://files.pythonhosted.org/packages/3e/8a/bb3160e76e844db9e69a413f055818969c8acade64e1a9ac5ce9dfdcf6c1/multitasking-0.0.11-py3-none-any.whl.metadata
  Using cached multitasking-0.0.11-py3-none-any.whl.metadata (5.5 kB)
Collecting lxml>=4.9.1 (from yfinance)
  Obtaining dependency information for lxml>=4.9.1 from https://files.pythonhosted.org/packages/42/07/b29571a58a3a80681722ea8ed0ba569211d9bb8531ad49b5cacf6d409185/lxml-5.3.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata
  Using cached lxml-5.3.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.8 kB)
Collecting frozendict>=2.3.4

In [0]:
import yfinance as yf

# List of stock tickers to fetch data for
tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "TSLA"]

# Initialize an empty DataFrame for all stocks
stocks_df = None

# Fetch historical data for each ticker
for ticker in tickers:
    # Fetch 5 years of historical data
    stock = yf.Ticker(ticker)
    hist = stock.history(period="5y")
    hist.reset_index(inplace=True)  # Reset index to make 'Date' a column
    hist["Ticker"] = ticker  # Add the ticker as a column for identification

    # Convert to a Spark DataFrame
    ticker_df = spark.createDataFrame(hist)

    # Combine all tickers into one DataFrame
    if stocks_df is None:
        stocks_df = ticker_df
    else:
        stocks_df = stocks_df.union(ticker_df)

# Drop the 'Stock Splits' column as it's not needed
stocks_df = stocks_df.drop("Stock Splits")

# Save the raw data to Delta format for storage
stocks_df.write.format("delta").mode("overwrite").save("/mnt/raw/stock_data")

# Display the raw DataFrame to verify the data
stocks_df.show()


+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+------+
|               Date|             Open|             High|              Low|            Close|   Volume|Dividends|Ticker|
+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+------+
|2020-01-24 05:00:00|77.61920173441393| 78.3657000922343|76.95752718801279|77.14900207519531|146537600|      0.0|  AAPL|
|2020-01-27 05:00:00|75.14943709018755|75.56388879724423|73.89395899955545|74.88040924072266|161940000|      0.0|  AAPL|
|2020-01-28 05:00:00|75.76505276717205| 77.1707993206053|75.66567993131815|76.99871826171875|162234000|      0.0|  AAPL|
|2020-01-29 05:00:00|78.63716877095959|79.46122758988011|77.89308902061387|78.61050415039062|216229200|      0.0|  AAPL|
|2020-01-30 05:00:00| 77.6894834620713|78.54989623607841|77.25563796528307|78.49657440185547|126743200|      0.0|  AAPL|
|2020-01-31 05:00:00|77.78398880

In [0]:
# Define windows rolling metrics
rolling_window_30 = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(-29, 0)
rolling_window_90 = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(-89, 0)

# 1. Daily Return
stocks_df = stocks_df.withColumn(
    "daily_return", (col("Close") - col("Open")) / col("Open")
)

# 2. Rolling Metrics (30-day and 90-day)
stocks_df = stocks_df.withColumn("SMA_30", avg("Close").over(rolling_window_30))
stocks_df = stocks_df.withColumn("SMA_90", avg("Close").over(rolling_window_90))
stocks_df = stocks_df.withColumn("Volatility_30", stddev("Close").over(rolling_window_30))

# 3. Cumulative Return Over 5 Years
stocks_df = stocks_df.withColumn(
    "cumulative_return",
    _sum("daily_return").over(Window.partitionBy("Ticker").orderBy("Date"))
)

# 4. Yearly Aggregations
# Extract the year from the date
stocks_df = stocks_df.withColumn("Year", col("Date").substr(1, 4))

# Calculate average close price and total volume by year
stocks_yearly_df = stocks_df.groupBy("Ticker", "Year").agg(
    avg("Close").alias("avg_close"),
    _sum("Volume").alias("total_volume")
)

# Create schemas
spark.sql("CREATE SCHEMA IF NOT EXISTS staging")
spark.sql("CREATE SCHEMA IF NOT EXISTS analytics")

# Save the transformed data
stocks_df.write.format("delta").mode("overwrite").saveAsTable("dehub.staging.stocks_data_transformed")
stocks_yearly_df.write.format("delta").mode("overwrite").saveAsTable("dehub.analytics.stocks_yearly")



# Display both datasets
stocks_df.select("Date", "Ticker", "Close", "SMA_30", "Volatility_30", "cumulative_return").show()
stocks_yearly_df.show()


+-------------------+------+-----------------+-----------------+------------------+--------------------+
|               Date|Ticker|            Close|           SMA_30|     Volatility_30|   cumulative_return|
+-------------------+------+-----------------+-----------------+------------------+--------------------+
|2020-01-24 05:00:00|  AAPL|77.14900207519531|77.14900207519531|              NULL|-0.00605777499268...|
|2020-01-27 05:00:00|  AAPL|74.88040924072266|76.01470565795898|1.6041373770068261|-0.00963768004429...|
|2020-01-28 05:00:00|  AAPL|76.99871826171875|76.34270985921223|1.2686168177485777|0.006645097430142716|
|2020-01-29 05:00:00|  AAPL|78.61050415039062|76.90965843200684| 1.535789207636151|0.006306013241511392|
|2020-01-30 05:00:00|  AAPL|78.49657440185547|77.22704162597657|1.5075300417540216|0.016694690110217868|
|2020-01-31 05:00:00|  AAPL|75.01612091064453|76.85855484008789|1.6225943823642721|-0.01888934120745...|
|2020-02-03 05:00:00|  AAPL|74.81012725830078|76.565922

In [0]:
#Verifying everything is setup correctly
spark.sql("SELECT * FROM staging.stocks_data_transformed LIMIT 10").show()
spark.sql("SELECT * FROM analytics.stocks_yearly LIMIT 10").show()


+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+------+--------------------+-----------------+-----------------+------------------+--------------------+----+
|               Date|             Open|             High|              Low|            Close|   Volume|Dividends|Ticker|        daily_return|           SMA_30|           SMA_90|     Volatility_30|   cumulative_return|Year|
+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+------+--------------------+-----------------+-----------------+------------------+--------------------+----+
|2020-01-24 05:00:00|77.61920173441393| 78.3657000922343|76.95752718801279|77.14900207519531|146537600|      0.0|  AAPL|-0.00605777499268...|77.14900207519531|77.14900207519531|              NULL|-0.00605777499268...|2020|
|2020-01-27 05:00:00|75.14943709018755|75.56388879724423|73.89395899955545|74.88040924072266|161940000|     