In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, lag, round, to_date,lit
from pyspark.sql.window import Window
from datetime import datetime

In [0]:

# 2. Load raw stock data
# Every immediate subfolder of raw_path is read as one Delta table; the folder
# name is attached to its rows as the `ticker` column, and all tables are
# unioned by column name into df_raw.

raw_path = "s3://stock-analysis-yk/stockdatadelta/"
# List all subfolders (each is one stock's Delta table)
folders = [f.path for f in dbutils.fs.ls(raw_path) if f.isDir()]

# Fail fast: with no folders df_raw would stay None and the notebook would
# only break later, in a cell far away from the actual cause.
if not folders:
    raise ValueError(f"No stock folders found under {raw_path}")

# Accumulator for the union of all per-stock frames
df_raw = None

# Loop through each stock folder and union
for folder in folders:
    # Last path component is the folder name, used as the ticker label
    stock_name = folder.rstrip('/').split('/')[-1]
    df = spark.read.format("delta").load(folder).withColumn("ticker", lit(stock_name))

    # unionByName matches columns by name rather than by position
    df_raw = df if df_raw is None else df_raw.unionByName(df)

# df_raw now contains the rows of every stock folder



In [0]:
# Render the combined raw frame for a visual sanity check before cleaning.
display(df_raw)

In [0]:
# Render the output of DataFrame.summary() (descriptive statistics) for df_raw.
display(df_raw.summary())

In [0]:
# Find fully duplicated rows in the raw data: group on every column of df_raw
# and keep only the groups that occur more than once.
from pyspark.sql.functions import count, col

# Use df_raw's own column list. Reading the columns from `df` would silently
# depend on whichever frame that name happens to hold when this cell runs.
duplicates = (
    df_raw.groupBy(df_raw.columns)
    .agg(count("*").alias("count"))
    .filter(col("count") > 1)
)
display(duplicates)

In [0]:
# 3. Basic cleaning
# Convert Date with to_date, drop rows where Ticker, Date or Close is null
# (the null check runs after the date conversion), and keep only rows whose
# Close is strictly positive.
df = (
    df_raw
    .withColumn("Date", to_date(col("Date")))
    .na.drop(subset=["Ticker", "Date", "Close"])
    .where(col("Close") > 0)
)

display(df)

In [0]:
# 4. Window spec for per-stock rolling metrics: one partition per ticker,
#    rows ordered by date.
windowSpec = Window.partitionBy("Ticker").orderBy("Date")

# Row-based trailing frames ending at the current row. Near the start of a
# partition these frames hold fewer rows than their nominal length.
trailing_7_rows = windowSpec.rowsBetween(-6, 0)
trailing_14_rows = windowSpec.rowsBetween(-13, 0)

# Percentage change of Close against the previous row's Close.
daily_return_pct = ((col("Close") - col("Prev_Close")) / col("Prev_Close")) * 100

# 5. Add technical indicators
# NOTE: `round` is the pyspark.sql.functions import, not the Python builtin.
df_transformed = (
    df
    # Temporary helper column; removed again once Daily_Return is computed
    .withColumn("Prev_Close", lag("Close").over(windowSpec))
    .withColumn("Daily_Return", round(daily_return_pct, 2))
    .withColumn("MA_7", round(avg("Close").over(trailing_7_rows), 2))
    .withColumn("MA_14", round(avg("Close").over(trailing_14_rows), 2))
    .withColumn("Volatility_7", round(stddev("Close").over(trailing_7_rows), 2))
    .drop("Prev_Close")
    # Remove rows where any indicator is null, e.g. the first row of each
    # ticker, which has no previous close.
    .dropna(subset=["Daily_Return", "MA_7", "MA_14", "Volatility_7"])
)
display(df_transformed)

In [0]:
# 6. Save cleaned data
# Write df_transformed as a Delta table at cleaned_path; mode("overwrite")
# replaces the data already stored at that location.
cleaned_path = "s3://stock-analysis-yk/stockdatacleaned/"

# No "header" option here: it is a CSV reader/writer setting and has no
# meaning for a Delta write.
df_transformed.write.format("delta").mode("overwrite").save(cleaned_path)

print("✅ Data cleaned and stored at:", cleaned_path)