# Stock Analysis Project

In [1]:
import yfinance as yf
import datetime as dt
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from google.colab import drive
import os

In [2]:
# Mount gDrive Folder
drive.mount('/content/gdrive', force_remount=True)
!ln -s /content/drive/MyDrive/ccl

Mounted at /content/gdrive


In [None]:
# Defining paths to data layers
bronze_path = '/bronze_path'
silver_path = '/silver_path'
gold_path = '/gold_path'

if not os.path.exists(bronze_path):
  os.makedirs(bronze_path)
if not os.path.exists(silver_path):
  os.makedirs(silver_path)
if not os.path.exists(gold_path):
  os.makedirs(gold_path)

# Defining tickers I'll work with
tickers = ['CM.TO', 'TD.TO', 'BNS.TO', 'BMO.TO', 'RY.TO']
# Start Date (YYYY-MM-DD)
start_date = '2020-01-01'
# End Date (YYYY-MM-DD)
end_date = '2025-04-07'

In [3]:
class StockDataFunctions:
  # Constructor
  def __init__(self, spark=None):
    if spark is None:
        self.spark = SparkSession.builder \
            .appName("stockAnalysis") \
            .getOrCreate()
    else:
        self.spark = spark

    self.data = {}

  # Function to fetch stocks based on ticker, start date and end date
  def fetch_stock(self, tickers, start_date, end_date):

    # Transform single string values into a list
    if isinstance(tickers, str):
      tickers = [tickers]

    # Define default values
    if not start_date or not end_date:
      if not start_date:
        start_date = (dt.datetime.now() - dt.timedelta(days=365)).strftime("%Y-%m-%d")
      if not end_date:
        end_date = dt.datetime.now().strftime("%Y-%m-%d")

    # Fetch data for each ticker
    for each in tickers:
      ticker = yf.Ticker(each)
      df = ticker.history(start=start_date, end=end_date)
      df = df.reset_index()
      spark_df = self.spark.createDataFrame(df)
      spark_df = spark_df.withColumn("ticker", f.lit(each))
      self.data[each] = spark_df

    return self.data

  # Function to transform bronze data into silver (i.e. first transformations to become more readable)
  def create_silver(self, df):
    df_silver = df\
    .withColumnRenamed("Date", "book_date")\
    .withColumnRenamed("Open", "open")\
    .withColumnRenamed("High", "high")\
    .withColumnRenamed("Low", "low")\
    .withColumnRenamed("Close", "close")\
    .withColumnRenamed("Dividends", "dividends")\
    .withColumnRenamed("Volume", "volume")\
    .withColumn("book_date", f.col("book_date").cast("date"))\
    .withColumn("open", f.round(f.col("open"), 4))\
    .withColumn("high", f.round(f.col("high"), 4))\
    .withColumn("low", f.round(f.col("low"), 4))\
    .withColumn("close", f.round(f.col("close"), 4))\
    .withColumn("volume", f.when(f.col("volume").isNull(), 0).otherwise(f.col("volume")))
    return df_silver

  # Function to transform silver data into gold (final dataset for the visualization)
  def create_gold(self, df):
    # Create window for analysis
    window = Window.orderBy("book_date")
    window_month = Window.orderBy("book_date").rowsBetween(-21, 0)
    window_anual = Window.orderBy("book_date").rowsBetween(-252, 0)
    window_50d = Window.orderBy("book_date").rowsBetween(-50, 0)
    window_200d = Window.orderBy("book_date").rowsBetween(-200, 0)
    window_vwap = Window.orderBy("book_date").rowsBetween(Window.unboundedPreceding, 0)

    # Update dataset
    df_gold = df\
      .withColumn("average_price", f.round((f.col("open") + f.col("high") + f.col("low") + f.col("close")) / f.lit(4.0), 4))\
      .withColumn("previous_close", f.lag("Close", 1).over(window))\
      .withColumn("price_change", f.round(f.col("Close") - f.col("previous_close"), 4))\
      .withColumn("daily_return_percentage", f.round(((f.col("price_change"))/ f.col("previous_close"))*100, 4))\
      .withColumn("daily_return_percentage", f.when(f.col("daily_return_percentage").isNull(), f.lit(0)).otherwise(f.col("daily_return_percentage")))\
      .withColumn("growth", f.round((f.col("daily_return_percentage")/100 + 1), 4))\
      .withColumn("cumulative_growth", f.round(f.expr("exp(sum(log(growth)) over (ORDER BY book_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) - 1"), 4))\
      .withColumn("volatility_30days", f.round(f.stddev(f.col("daily_return_percentage")).over(window_month), 4))\
      .withColumn("mov_avg30", f.round(f.avg(f.col("Close")).over(window_month), 4))\
      .withColumn("dividend_yield_percentage", f.round((f.sum(f.col("Dividends")).over(window_anual)/f.col("Close")), 4))\
      .withColumn("52week_high", f.max(f.col("High")).over(window_anual))\
      .withColumn("52week_low", f.min(f.col("Low")).over(window_anual))\
      .withColumn("ma50d", f.round(f.avg(f.col("Close")).over(window_50d), 4))\
      .withColumn("ma200d", f.round(f.avg(f.col("Close")).over(window_200d), 4))\
      .withColumn("price_volume", f.round(f.col("average_price") * f.col("Volume"), 4))\
      .withColumn("cumulative_price_volume", f.round(f.sum("price_volume").over(window_vwap), 4))\
      .withColumn("cumulative_volume", f.round(f.sum("Volume").over(window_vwap), 4))\
      .withColumn("vwap", f.round(f.col("cumulative_price_volume") / f.col("cumulative_volume"), 4))
    df_gold = df_gold.drop("Stock Splits","previous_close", "mov_avg30", "price_change", "price_volume", "cumulative_price_volume", "cumulative_volume")
    return df_gold

  # Function to save raw data into bronze data lake
  def save_bronze(self, path):
    for ticker, df in self.data.items():
      df.write.mode("overwrite").option("header", "true").csv(f"{path}/{ticker}_bronze.csv")

  # Read bronze dataframe
  def read_bronze(self, ticker, path):
    df_bronze = self.spark.read\
      .option("header", "true")\
      .option("inferSchema", "true")\
      .csv(path + '/' + ticker + '_bronze.csv')
    self.data[ticker] = df_bronze

  # Read parquet data
  def read_parquet(self, tickers, path):

    # Transform single string values into a list
    if isinstance(tickers, str):
      tickers = [tickers]

    # Read data for each ticker
    for ticker in tickers:
      df = self.spark.read.parquet(f"{path}/{ticker}/")
      self.data[ticker] = df

  # Function to save parquet data
  def save_parquet(self, dicts, path):
    for ticker, df in dicts.items():
      df.write.mode("overwrite").parquet(f"{path}/{ticker}/")



In [4]:
# Create Sample Bronze File
stock_fetcher = StockDataFunctions()
stock_fetcher.fetch_stock(tickers, start_date, end_date)
stock_fetcher.save_bronze(bronze_path)

Read Bronze File from folder and creating Silver File

In [5]:
# Read bronze dataframe
for ticker in tickers:
  df_bronze = stock_fetcher.read_bronze(ticker, bronze_path)
stock_fetcher.data["BMO.TO"].printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)
 |-- ticker: string (nullable = true)



In [7]:
stock_fetcher.data['BMO.TO'].show()

+-------------------+------------------+------------------+------------------+------------------+-------+---------+------------+------+
|               Date|              Open|              High|               Low|             Close| Volume|Dividends|Stock Splits|ticker|
+-------------------+------------------+------------------+------------------+------------------+-------+---------+------------+------+
|2022-08-18 04:00:00|119.55279194373115|120.84022614767434|119.20651999421538| 120.3696517944336|1321900|      0.0|         0.0|BMO.TO|
|2022-08-19 04:00:00|120.14767879176954|120.29861645281099|118.75370358129673|118.87800598144531|2032100|      0.0|         0.0|BMO.TO|
|2022-08-22 04:00:00| 117.5195783181215|117.79481843747472|117.06676523930962| 117.2354507446289|2407900|      0.0|         0.0|BMO.TO|
|2022-08-23 04:00:00|116.99572231139473|117.00459621674878|115.82371650319897|116.44522857666016|1711900|      0.0|         0.0|BMO.TO|
|2022-08-24 04:00:00|115.73492505311397|116.3475

In [8]:
df_silver = {}
for ticker, df in stock_fetcher.data.items():
  df_ticker = stock_fetcher.create_silver(df)
  df_silver[ticker] = df_ticker

In [9]:
df_silver['BMO.TO'].show()

+----------+--------+--------+--------+--------+-------+---------+------------+------+
| book_date|    open|    high|     low|   close| volume|dividends|Stock Splits|ticker|
+----------+--------+--------+--------+--------+-------+---------+------------+------+
|2022-08-18|119.5528|120.8402|119.2065|120.3697|1321900|      0.0|         0.0|BMO.TO|
|2022-08-19|120.1477|120.2986|118.7537| 118.878|2032100|      0.0|         0.0|BMO.TO|
|2022-08-22|117.5196|117.7948|117.0668|117.2355|2407900|      0.0|         0.0|BMO.TO|
|2022-08-23|116.9957|117.0046|115.8237|116.4452|1711900|      0.0|         0.0|BMO.TO|
|2022-08-24|115.7349|116.3475|114.3765|114.5984|2827600|      0.0|         0.0|BMO.TO|
|2022-08-25|114.9003| 116.019|114.5363|114.9891|1774800|      0.0|         0.0|BMO.TO|
|2022-08-26|115.3354|116.0457|113.6218| 114.119|1905300|      0.0|         0.0|BMO.TO|
|2022-08-29|112.9825|113.6129|112.5208|113.4442|1939600|      0.0|         0.0|BMO.TO|
|2022-08-30|111.6951|111.9614|110.2745| 110

In [10]:
# Save Silver File
stock_fetcher.save_parquet(df_silver, silver_path)

Read silver tier file, create gold tier, and save gold tier parquet

In [11]:
# Read Silver File
stock_fetcher.read_parquet(tickers, silver_path)

for each in tickers:
  stock_fetcher.data[each].show()

+----------+-------+-------+-------+-------+-------+---------+------------+------+
| book_date|   open|   high|    low|  close| volume|dividends|Stock Splits|ticker|
+----------+-------+-------+-------+-------+-------+---------+------------+------+
|2020-01-02|41.0723|41.0761|40.8072| 40.989|1896000|      0.0|         0.0| CM.TO|
|2020-01-03|40.7164|40.8679|  40.58|40.8527|1918600|      0.0|         0.0| CM.TO|
|2020-01-06|40.6595|40.8603|40.6027|40.7277|3684400|      0.0|         0.0| CM.TO|
|2020-01-07|40.7656|40.9549|40.6255|40.6482|1965000|      0.0|         0.0| CM.TO|
|2020-01-08|40.7277|41.0155|40.7277| 40.883|3719600|      0.0|         0.0| CM.TO|
|2020-01-09|41.0686| 41.239|40.7618|40.7618|3672400|      0.0|         0.0| CM.TO|
|2020-01-10|40.7618|40.9436|40.6785|40.8224|2924800|      0.0|         0.0| CM.TO|
|2020-01-13|  40.83|40.9625|40.6179|40.9587|7989200|      0.0|         0.0| CM.TO|
|2020-01-14|41.0421|41.0458|40.7164|40.7315|3132600|      0.0|         0.0| CM.TO|
|202

In [12]:
df_gold = {}
for ticker, df in stock_fetcher.data.items():
  df_ticker = stock_fetcher.create_gold(df)
  df_gold[ticker] = df_ticker

In [13]:
df_gold['BMO.TO'].printSchema()

root
 |-- book_date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividends: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- average_price: double (nullable = true)
 |-- daily_return_percentage: double (nullable = true)
 |-- growth: double (nullable = true)
 |-- cumulative_growth: double (nullable = true)
 |-- volatility_30days: double (nullable = true)
 |-- dividend_yield_percentage: double (nullable = true)
 |-- 52week_high: double (nullable = true)
 |-- 52week_low: double (nullable = true)
 |-- ma50d: double (nullable = true)
 |-- ma200d: double (nullable = true)
 |-- vwap: double (nullable = true)



In [14]:
df_gold['BMO.TO'].show()

+----------+-------+-------+-------+-------+-------+---------+------+-------------+-----------------------+------+-----------------+-----------------+-------------------------+-----------+----------+-------+-------+-------+
| book_date|   open|   high|    low|  close| volume|dividends|ticker|average_price|daily_return_percentage|growth|cumulative_growth|volatility_30days|dividend_yield_percentage|52week_high|52week_low|  ma50d| ma200d|   vwap|
+----------+-------+-------+-------+-------+-------+---------+------+-------------+-----------------------+------+-----------------+-----------------+-------------------------+-----------+----------+-------+-------+-------+
|2020-01-02|79.7502|79.7738|79.1266|79.6002| 753600|      0.0|BMO.TO|      79.5627|                    0.0|   1.0|              0.0|             NULL|                      0.0|    79.7738|   79.1266|79.6002|79.6002|79.5627|
|2020-01-03|79.0082|79.4581|78.7241|79.4029|1473000|      0.0|BMO.TO|      79.1483|                -0.24

In [15]:
# Save gold parquet
stock_fetcher.save_parquet(df_gold, gold_path)

Prepare gold parquet for export to data visualization

In [16]:
# Read Parquet and transform to csv
stock_fetcher.read_parquet(tickers, gold_path)
final_file = None
for each in tickers:
  if final_file is None:
    final_file = stock_fetcher.data[each]
  else:
    final_file = final_file.union(stock_fetcher.data[each])

if final_file is not None:
    df_pd = final_file.toPandas()
    df_pd.to_csv("gold_file.csv", index=False)