In [None]:
from functools import reduce

from pyspark.sql.functions import col, lit, to_timestamp

# Bronze -> silver: read one raw CSV per ticker, merge them into a single
# DataFrame, drop incomplete rows, parse the Date column, keep the configured
# date range, and write the result to the silver layer as Parquet.

# List of stock symbols
stocks = ["MSFT", "AAPL", "GOOGL", "AMZN"]

# Data lake locations: raw CSV input directory and cleaned Parquet output path
BRONZE_DIR = "abfss://bronze@stockmarketdatalake123.dfs.core.windows.net/stock_data_raw"
SILVER_PATH = "abfss://silver@stockmarketdatalake123.dfs.core.windows.net/stock_data_clean/"

# Date range to keep, both bounds inclusive (October 1, 2024, to March 21, 2025)
START_DATE = "2024-10-01"
END_DATE = "2025-03-21"

# Read all CSV files into a list of DataFrames
dfs = []
for stock in stocks:
    df = spark.read.csv(f"{BRONZE_DIR}/{stock}_stock.csv", header=True, inferSchema=True)
    # Later cells group and partition by "Symbol". If a file has no such column
    # (checked case-insensitively), tag its rows with the ticker from the file
    # name; files that already provide the column are left untouched.
    if "symbol" not in [name.lower() for name in df.columns]:
        df = df.withColumn("Symbol", lit(stock))
    dfs.append(df)

# Combine all DataFrames into one.
# unionByName aligns columns by name. union() aligns them by position, so a file
# whose columns are ordered differently would be merged into the wrong columns
# without any error; with unionByName a mismatching header fails loudly instead.
combined_df = reduce(lambda merged, nxt: merged.unionByName(nxt), dfs)

# Clean the data in a single chain so re-running the cell always starts from
# combined_df:
#   1. drop rows that contain a null in any column
#   2. convert the Date column to a timestamp using the yyyy-MM-dd pattern
#   3. keep rows with START_DATE <= Date <= END_DATE
df_clean = (
    combined_df.na.drop()
    .withColumn("Date", to_timestamp("Date", "yyyy-MM-dd"))
    .filter(col("Date").between(START_DATE, END_DATE))
)

# Save to the silver layer as Parquet, replacing any previous output
df_clean.write.mode("overwrite").parquet(SILVER_PATH)

# Display the cleaned data
df_clean.show()

In [None]:
from pyspark.sql.functions import avg
from pyspark.sql.window import Window

# Silver (input) and gold (output) locations in the data lake.
silver_path = "abfss://silver@stockmarketdatalake123.dfs.core.windows.net/stock_data_clean/"
gold_path = "abfss://gold@stockmarketdatalake123.dfs.core.windows.net/stock_data_aggregated/"

# Load the cleaned rows from the silver layer.
df_clean = spark.read.parquet(silver_path)

# Trailing frame per Symbol: the current row plus the six rows before it in
# Date order. The frame is row-based, so it covers up to seven rows rather than
# seven calendar days; the earliest rows of each Symbol average fewer rows.
per_symbol_by_date = Window.partitionBy("Symbol").orderBy("Date")
window_spec = per_symbol_by_date.rowsBetween(-6, Window.currentRow)

# Average of Close over that frame, attached as the moving_avg_7d column.
close_moving_avg = avg("Close").over(window_spec)
df_gold = df_clean.withColumn("moving_avg_7d", close_moving_avg)

# Write to the gold layer as Parquet, replacing any existing output.
gold_writer = df_gold.write.mode("overwrite")
gold_writer.parquet(gold_path)

# First three rows, returned as the cell's output.
df_gold.head(3)

In [None]:
# Sanity check on the silver layer: print how many rows are stored per Symbol.
silver_path = "abfss://silver@stockmarketdatalake123.dfs.core.windows.net/stock_data_clean/"
df_check = spark.read.parquet(silver_path)
rows_per_symbol = df_check.groupBy("Symbol").count()
rows_per_symbol.show()