In [21]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DateType

orderSchema = StructType([
    StructField("Date", StringType()),
    StructField("Open", FloatType()),
    StructField("High", FloatType()),
    StructField("Low", FloatType()),
    StructField("Close", FloatType()),
    StructField("Volume", StringType())
    ])

df = spark.read.format("csv").option("header","true").schema(orderSchema).load("Files/apple_stock_data.csv")
df = df.withColumn("Volume", col("Volume").cast("integer"))
# df = df.withColumn("date", col("Date").cast("string"))
display(df)

StatementMeta(, a3bff8dc-a48a-4209-b7a2-a582f2bf4146, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2c3f864e-3031-4f88-ad17-35f9eed2c518)

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, min, stddev, round, to_date, year, month
from pyspark.sql.window import Window

# df = df.withColumn("Date", to_date(col("Date")))

df = df.withColumn("year", year(col("date"))) \
       .withColumn("month", month(col("date")))


# Calculate daily return %
df = df.withColumn("daily_return_pct", round(((col("close") - col("open")) / col("open")) * 100, 2))

# Define 5-day window
window_spec = Window.orderBy("date").rowsBetween(-4, 0)

# Add moving average and rolling volatility
df = df.withColumn("ma_5", round(avg(col("close")).over(window_spec), 2)) \
       .withColumn("volatility_5", round(stddev(col("close")).over(window_spec), 2))

# Display results
df.select("date", "open", "close", "daily_return_pct", "ma_5", "volatility_5") \
    .filter(col("date").isNotNull())\
  .orderBy("date") \
  .show(20)


StatementMeta(, a3bff8dc-a48a-4209-b7a2-a582f2bf4146, 24, Finished, Available, Finished)

+----------+------+------+----------------+------+------------+
|      date|  open| close|daily_return_pct|  ma_5|volatility_5|
+----------+------+------+----------------+------+------------+
|01/13/2025|233.53| 234.4|            0.37| 234.4|        NULL|
|01/14/2025|234.75|233.28|           -0.63|233.84|        0.79|
|01/15/2025|234.64|237.87|            1.38|235.18|        2.39|
|01/16/2025|237.35|228.26|           -3.83|233.45|        3.98|
|01/17/2025|232.12|229.98|           -0.92|232.76|        3.78|
|01/21/2025| 224.0|222.64|           -0.61|230.41|        5.68|
|01/22/2025|219.79|223.83|            1.84|228.52|        6.05|
|01/23/2025|224.74|223.66|           -0.48|225.67|        3.24|
|01/24/2025|224.78|222.78|           -0.89|224.58|        3.06|
|01/27/2025|224.02|229.86|            2.61|224.55|        3.01|
|01/28/2025|230.85|238.26|            3.21|227.68|        6.55|
|01/29/2025|234.12|239.36|            2.24|230.78|        7.83|
|01/30/2025|238.67|237.59|           -0.

In [25]:
df.write.format("delta").saveAsTable("apple_stock_data")

StatementMeta(, a3bff8dc-a48a-4209-b7a2-a582f2bf4146, 27, Finished, Available, Finished)

In [26]:
%%sql
SELECT 
    date,
    ROUND(((close - open) / open) * 100, 2) AS daily_return_pct,
    ROUND(AVG(close) OVER (ORDER BY date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), 2) AS ma_5,
    ROUND(STDDEV(close) OVER (ORDER BY date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), 2) AS volatility_5
FROM apple_stock_data
where date is not null
ORDER BY date;

StatementMeta(, a3bff8dc-a48a-4209-b7a2-a582f2bf4146, 28, Finished, Available, Finished)

<Spark SQL result set with 250 rows and 4 fields>

In [38]:
%%sql
CREATE OR REPLACE TABLE apple_stock_metrics AS
SELECT 
    date,
    open,
    close,
    ROUND(((close - open) / open) * 100, 2) AS daily_return_pct,
    ROUND(AVG(close) OVER (ORDER BY date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), 2) AS ma_5,
    ROUND(STDDEV(close) OVER (ORDER BY date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), 2) AS volatility_5
FROM apple_stock_data
where date is not null;


StatementMeta(, fd154a6a-8fe9-4c2f-b4e3-25e68a0a3d78, 40, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [39]:
%%sql
select * from apple_stock_metrics;

StatementMeta(, fd154a6a-8fe9-4c2f-b4e3-25e68a0a3d78, 41, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 6 fields>