In [None]:
# Mounting Azure Blob Storage in Databricks
import os

storage_account_name = "myazurestorageaccount"
container_name = "mycontainer"
# Never hardcode the account key: it would live in the notebook source and in
# every exported copy. It is read from the environment instead.
# TODO(review): on Databricks, dbutils.secrets.get(scope=..., key=...) with your
# own secret scope is the usual alternative to an environment variable.
storage_account_key = os.environ["AZURE_STORAGE_ACCOUNT_KEY"]
mount_point = f"/mnt/{container_name}"
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"

# The cells below read wasbs:// URIs directly rather than going through the
# mount point. The mount's extra_configs only apply to access via the mount, so
# the key is also registered on the Spark session.
spark.conf.set(account_key_conf, storage_account_key)

# dbutils.fs.mount raises if the mount point already exists (mounts persist
# between runs), so only mount when it is missing to keep this cell re-runnable.
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={account_key_conf: storage_account_key},
    )

In [0]:
# Reading Parquet files from Azure Blob Storage
# Root URI of the container; each bank's file sits directly under it.
base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"

# One file per bank ticker, named "<TICKER>_stock_data.parquet".
paths = [
    f"{base_path}/{ticker}_stock_data.parquet"
    for ticker in ("RY", "TD", "BNS", "BMO", "CM")
]

# Load all files into one frame, merging their schemas.
df = spark.read.option('mergeSchema', 'true').parquet(*paths)

In [0]:
# Importing necessary libraries
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Ticker symbols of the banks to load; each has its own Parquet file under base_path.
symbols = ["RY", "TD", "BNS", "BMO", "CM"]

# Read each bank's file separately and tag every row with its ticker, so the
# frames can later be unioned and partitioned by "symbol". No window is applied
# here: the rolling metrics are computed after the frames are combined.
dfs = []
for symbol in symbols:
    symbol_df = spark.read.parquet(f"{base_path}/{symbol}_stock_data.parquet")
    dfs.append(symbol_df.withColumn("symbol", F.lit(symbol)))

In [0]:
# Combine all DataFrames into one
from functools import reduce

# Stack the per-bank frames row-wise. unionByName matches columns by name rather
# than by position, so differing column order between files is tolerated.
df_all = reduce(lambda combined, nxt: combined.unionByName(nxt), dfs)


### Volatility and momentum metrics

In [0]:

# Trailing window per ticker, ordered by Date: the current row plus the six
# rows before it (7 trading rows, not 7 calendar days).
window_spec = (
    Window.partitionBy("symbol")
    .orderBy("Date")
    .rowsBetween(-6, Window.currentRow)
)

# Mean Close over that window. A ticker's first rows average fewer than 7 prices
# because the window is truncated at the start of the series.
rolling_avg_expr = F.avg("Close").over(window_spec)
df_with_rolling_avg = df_all.withColumn("rolling_avg_close_7d", rolling_avg_expr)

df_with_rolling_avg.show(40)


+-----------------+-----------------+-----------------+-----------------+-------+-------------------+------+--------------------+
|            Close|             High|              Low|             Open| Volume|               Date|symbol|rolling_avg_close_7d|
+-----------------+-----------------+-----------------+-----------------+-------+-------------------+------+--------------------+
|93.24373626708984|93.98671308718268|92.97702738472545|93.23420887349911| 592800|2024-01-02 00:00:00|   BMO|   93.24373626708984|
|92.26262664794922|92.81509920686439|91.99591776220316|92.51028801354431| 409900|2024-01-03 00:00:00|   BMO|   92.75318145751953|
|92.41502380371094|93.31993721015978| 91.9387558847184|92.07211758792629| 920400|2024-01-04 00:00:00|   BMO|   92.64046223958333|
|92.97703552246094|93.98672131329005| 92.1769015380647|92.46266523262177| 560700|2024-01-05 00:00:00|   BMO|   92.72460556030273|
| 93.9486083984375| 94.0438677973859|92.66268645262404|92.97702502770142| 428800|2024-01-0

In [0]:
# Render the rolling-average frame with the notebook's rich `display` table
# (the captured output below shows only its first rows).
display(df_with_rolling_avg)



Close,High,Low,Open,Volume,Date,symbol,rolling_avg_close_7d
93.24373626708984,93.98671308718268,92.97702738472545,93.23420887349911,592800,2024-01-02T00:00:00,BMO,93.24373626708984
92.26262664794922,92.8150992068644,91.99591776220316,92.51028801354433,409900,2024-01-03T00:00:00,BMO,92.75318145751952
92.41502380371094,93.31993721015978,91.9387558847184,92.07211758792629,920400,2024-01-04T00:00:00,BMO,92.64046223958331
92.97703552246094,93.98672131329003,92.1769015380647,92.46266523262176,560700,2024-01-05T00:00:00,BMO,92.72460556030272
93.9486083984375,94.0438677973859,92.66268645262404,92.97702502770142,428800,2024-01-08T00:00:00,BMO,92.96940612792967
92.30072784423828,93.5866498234728,92.27215293071984,93.2723112402258,433700,2024-01-09T00:00:00,BMO,92.85795974731444
90.95764923095705,92.8436734727108,90.8338221826598,92.27215339763366,916300,2024-01-10T00:00:00,BMO,92.58648681640624
90.17657470703124,90.824299429821,89.29071600185219,90.824299429821,596900,2024-01-11T00:00:00,BMO,92.14832087925502
89.83365631103516,91.30056243494012,89.6621995623379,90.92907402397474,383900,2024-01-12T00:00:00,BMO,91.80132511683873
89.91939544677734,90.02417498302425,88.72872550512847,89.41455255428183,544000,2024-01-16T00:00:00,BMO,91.44480678013392


In [0]:
# Show the count of records for each bank
# groupBy output order is unspecified, so sort by ticker to make the printed
# table identical across re-runs.
df_all.groupBy("symbol").count().orderBy("symbol").show()


+------+-----+
|symbol|count|
+------+-----+
|    RY|  391|
|    TD|  391|
|   BNS|  391|
|   BMO|  391|
|    CM|  391|
+------+-----+



In [0]:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Every window is per ticker and ordered by Date. Frame sizes are counted in
# rows, so "7d"/"30d" mean 7 and 30 trading rows rather than calendar days.
window_base = Window.partitionBy("symbol").orderBy("Date")
window_7d = window_base.rowsBetween(-6, Window.currentRow)
window_30d = window_base.rowsBetween(-29, Window.currentRow)

# Added columns:
#   rolling_avg_close_7d     - mean Close over the trailing 7 rows
#   rolling_stddev_close_30d - sample stddev of Close over the trailing 30 rows
#                              (NULL on a ticker's first row, where only one value exists)
#   lag_7_close              - Close from 7 rows earlier (NULL for the first 7 rows)
#   momentum_7d              - fractional change (not multiplied by 100) of Close
#                              versus lag_7_close
# NOTE(review): the early rows of each ticker use truncated windows (e.g. the
# stddev on row 2 uses only two prices). Confirm whether only full windows are wanted.
df_enhanced = (
    df_all
    .withColumn("rolling_avg_close_7d", F.avg("Close").over(window_7d))
    .withColumn("rolling_stddev_close_30d", F.stddev("Close").over(window_30d))
    .withColumn("lag_7_close", F.lag("Close", 7).over(window_base))
    .withColumn(
        "momentum_7d",
        (F.col("Close") - F.col("lag_7_close")) / F.col("lag_7_close"),
    )
)

df_enhanced.show(10)


+-----------------+-----------------+-----------------+-----------------+------+-------------------+------+--------------------+------------------------+-----------------+--------------------+
|            Close|             High|              Low|             Open|Volume|               Date|symbol|rolling_avg_close_7d|rolling_stddev_close_30d|      lag_7_close|         momentum_7d|
+-----------------+-----------------+-----------------+-----------------+------+-------------------+------+--------------------+------------------------+-----------------+--------------------+
|93.24373626708984|93.98671308718268|92.97702738472545|93.23420887349911|592800|2024-01-02 00:00:00|   BMO|   93.24373626708984|                    NULL|             NULL|                NULL|
|92.26262664794922|92.81509920686439|91.99591776220316|92.51028801354431|409900|2024-01-03 00:00:00|   BMO|   92.75318145751953|      0.6937492647816869|             NULL|                NULL|
|92.41502380371094|93.3199372101597

### Rank stocks by volatility

In [0]:
# Tag every enhanced row with its calendar month ("yyyy-MM"). Both the
# month-end snapshot and the final join are keyed on this column.
df_enhanced_with_month = df_enhanced.withColumn("month", F.date_format("Date", "yyyy-MM"))

# Drop rows without a 30-row stddev (NULL on each ticker's first row).
df_monthly = df_enhanced_with_month.filter(F.col("rolling_stddev_close_30d").isNotNull())

# Latest trading day of each symbol-month = row 1 when sorted by Date descending.
window_monthly = Window.partitionBy("symbol", "month").orderBy(F.col("Date").desc())
df_latest_per_month = (
    df_monthly
    .withColumn("row_num", F.row_number().over(window_monthly))
    .where(F.col("row_num") == 1)
)

# Rank banks within each month by month-end volatility: rank 1 is the highest
# stddev, and dense_rank gives tied banks the same rank without gaps.
window_rank = Window.partitionBy("month").orderBy(F.col("rolling_stddev_close_30d").desc())
df_monthly_ranked = df_latest_per_month.select(
    "symbol",
    "month",
    "rolling_stddev_close_30d",
    F.dense_rank().over(window_rank).alias("volatility_rank"),
)

# Keep only the join keys plus the rank, so the join below does not produce a
# second rolling_stddev_close_30d column.
df_monthly_ranked_clean = df_monthly_ranked.drop("rolling_stddev_close_30d")

# Attach each symbol-month's rank to all of its daily rows. The left join keeps
# every daily row; volatility_rank is NULL where no rank exists.
df_combined = df_enhanced_with_month.join(
    df_monthly_ranked_clean, on=["symbol", "month"], how="left"
)







In [0]:
# Save the final DataFrame to Azure Blob Storage in Parquet format,
# replacing any earlier output at the same location.
output_path = f"{base_path}/advanced_bank_data"
df_combined.write.parquet(output_path, mode="overwrite")