In [None]:
!pip install -U pyspark==3.2.2

In [2]:
from pathlib import Path
from tempfile import TemporaryDirectory

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Throwaway warehouse directory for managed tables; it is deleted when `tmpdir`
# is cleaned up (object finalized or kernel exits).
tmpdir = TemporaryDirectory()

builder = (
    SparkSession.builder.master("local[*]")
    # Enable Delta Lake SQL support and make Delta the session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Path.as_uri() builds a well-formed file:// URI on every OS (file:///tmp/x,
    # file:///C:/Users/...). Hand-formatting "file:///" + a native path gives
    # four slashes on POSIX and backslashes on Windows.
    .config("spark.sql.warehouse.dir", Path(tmpdir.name).as_uri())
)

# configure_spark_with_delta_pip wires the Delta Lake jars matching the installed
# delta-spark package into the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql import functions as F

In [None]:
# Explicit schema for stock_prices.csv (first row is a header).
# Prices are DoubleType rather than FloatType: 32-bit floats keep only ~7
# significant digits, and that error propagates into every return computed below.
# Volume is LongType rather than IntegerType: share volumes above 2,147,483,647
# cannot be represented as a 32-bit int and would fail to parse.
# "date" is read as a raw string here and converted to a DateType in the next cell.
schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("ticker", StringType(), True),
])

stocksDF = spark.read.csv('stock_prices.csv', header=True, schema=schema)
stocksDF.show()

In [None]:
# Convert the "M/d/yyyy" date strings to a real DateType so the window functions
# below order rows chronologically rather than lexicographically.
# The guard keeps this cell safe to re-run. Re-applying to_date with this pattern
# to a column that is already a date would fail to parse it and yield nulls.
if dict(stocksDF.dtypes)["date"] == "string":
    stocksDF = stocksDF.withColumn("date", F.to_date(F.col("date"), "M/d/yyyy"))
stocksDF.show()

### Question 1: Compute the average daily return of all stocks for every date.

In [None]:
# Daily return of a stock on a trading day:
#     (close_today - close_previous) / close_previous
# where "previous" is the same ticker's preceding row in date order. Each
# ticker's first row has no predecessor (lag returns null), so its return is
# null and is ignored by the mean. The mean is then taken per date across all
# tickers that have a row on that date.

ticker_history = Window.partitionBy("ticker").orderBy(F.asc("date"))

dailyReturnsDF = (
    stocksDF
    .withColumn("lastClose", F.lag("close").over(ticker_history))
    .withColumn("daily_return", (F.col("close") - F.col("lastClose")) / F.col("lastClose"))
)

result1 = (
    dailyReturnsDF
    .groupBy("date")
    .agg(F.mean("daily_return").alias("average_daily_return"))
    .orderBy("date")
)
result1.show()

In [None]:
# Overwrite so the notebook can be re-run end to end; the default save mode
# ("errorifexists") raises once result1_output already exists.
result1.write.mode("overwrite").parquet("result1_output")

### Question 2: Which stock was traded most frequently — as measured by closing price × volume — on average?

In [None]:
# "Traded most" is measured as the average of close * volume per ticker, i.e.
# the value that changed hands per day (not a share count).
# The product is cast to double first. If close/volume are FloatType/IntegerType,
# Spark multiplies them as a 32-bit float and loses precision on large products.
# A secondary sort on ticker makes limit(1) deterministic when averages tie.
# The output column keeps its original name "average_shares_traded" so the
# result2_output layout is unchanged.
tradedValue = F.col("close").cast("double") * F.col("volume")

result2 = (stocksDF
 .withColumn("traded_value", tradedValue)
 .groupBy("ticker").agg(F.mean(F.col("traded_value")).alias("average_shares_traded"))
 .orderBy(F.desc("average_shares_traded"), F.asc("ticker"))
 .limit(1)
 )
result2.show()

In [None]:
# Overwrite so the notebook can be re-run end to end; the default save mode
# ("errorifexists") raises once result2_output already exists.
result2.write.mode("overwrite").parquet("result2_output")

### Question 3: Which stock was the most volatile as measured by the annualized standard deviation of daily returns?

In [None]:
from math import sqrt as math_sqrt

In [None]:
# Volatility = annualized standard deviation of daily returns.
# Daily return: (close_t - close_{t-1}) / close_{t-1}, where t-1 is the same
# ticker's previous row in date order (lag offset 1 -- a daily, not 30-row, return).
# The sample standard deviation (F.stddev) of those returns is annualized by
# multiplying by sqrt(TRADING_DAYS_PER_YEAR), assuming 250 trading days per year.
# The constant scale factor does not change which ticker ranks first.
TRADING_DAYS_PER_YEAR = 250

window1 = Window.partitionBy("ticker").orderBy(F.asc("date"))

result3 = (stocksDF
 .withColumn("lastClose", F.lag("close", 1).over(window1))
 .withColumn("daily_return", (F.col("close") - F.col("lastClose"))/F.col("lastClose"))
 .groupBy("ticker")
 .agg((F.stddev(F.col("daily_return")) * F.lit(math_sqrt(TRADING_DAYS_PER_YEAR))).alias("annual_standard_deviation"))
 # Secondary sort on ticker makes limit(1) deterministic when values tie.
 .orderBy(F.desc("annual_standard_deviation"), F.asc("ticker"))
 .limit(1)
)
result3.show()

In [None]:
# Overwrite so the notebook can be re-run end to end; the default save mode
# ("errorifexists") raises once result3_output already exists.
result3.write.mode("overwrite").parquet("result3_output")

### Question 4: What were the top three 30-day return dates (% increase in closing price compared to the closing price 30 days prior)? Present the ticker and date combinations.

In [None]:
# 30-day return = (close_t - close_{t-30}) / close_{t-30}, expressed as a fraction
# (multiply by 100 for percent). "30 days prior" is taken as the same ticker's
# row 30 positions earlier in date order, i.e. 30 trading days.
# NOTE(review): confirm trading days, rather than calendar days, is the intended definition.
# Rows without 30 predecessors have a null return and are dropped before ranking.
# The question asks for ticker/date combinations, so "date" is kept in the output.
RETURN_HORIZON_ROWS = 30

window1 = Window.partitionBy("ticker").orderBy(F.asc("date"))

result4 = (stocksDF
 .withColumn("lastClose", F.lag("close", RETURN_HORIZON_ROWS).over(window1))
 .withColumn("30_day_return", (F.col("close") - F.col("lastClose"))/F.col("lastClose"))
 .where(F.col("30_day_return").isNotNull())
 .select("ticker", "date", "30_day_return")
 # Secondary sorts make limit(3) deterministic when returns tie.
 .orderBy(F.desc("30_day_return"), F.asc("ticker"), F.asc("date"))
 .limit(3)
)
result4.show()

In [None]:
# Overwrite so the notebook can be re-run end to end; the default save mode
# ("errorifexists") raises once result4_output already exists.
result4.write.mode("overwrite").parquet("result4_output")