In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Stock Analysis with Spark").getOrCreate()

In [3]:
#Read the csv data of the tickers
stocks = spark.read.load("StockData",
                         format="csv",
                         header=True, 
                         inferSchema=True)

In [4]:
stocks.show(30)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
| BRK-B|05/23/2023|  $323.11 |4031342|$328.19 |$329.27 |$322.97 |
| BRK-B|05/22/2023|  $329.13 |2763422|$330.75 |$331.49 |$328.35 |
| BRK-B|05/19/2023|  $330.39 |4323538|$331.00 |$333.94 |$329.12 |
| BRK-B|05/18/2023|  $329.76 |2808329|$326.87 |$329.98 |$325.85 |
| BRK-B|05/17/2023|  $327.39 |3047626|$325.02 |$328.26 |$324.82 |
| BRK-B|05/16/2023|  $323.75 |2139996|$322.46 |$324.69 |$322.36 |
| BRK-B|05/15/2023|  $323.53 |2191609|$322.89 |$323.83 |$320.13 |
| BRK-B|05

In [5]:
#Show the data schema
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [6]:
stocks.select(["Ticker", "Date", "Open"]).show(3)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
+------+----------+--------+
only showing top 3 rows



In [7]:
#Filter the df by showing data related to a specific ticker symbol
stocks.filter(stocks["Ticker"] == "V").show(3)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
|     V|05/30/2023|  $221.64 | 6916013|$225.01 |$225.47 |$219.75 |
|     V|05/26/2023|  $225.01 | 5067460|$223.82 |$226.21 |$223.34 |
+------+----------+----------+--------+--------+--------+--------+
only showing top 3 rows



In [8]:
#Filter the df by showing data related to a specific ticker symbol
stocks.filter(((stocks.Ticker == "MSFT") | (stocks.Ticker == "V")) & (stocks.Date == "05/31/2023")).show(15)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [9]:
#DEFINE UDF FUNCTIONS
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime

In [10]:
#Add a new columns containing the date parsed from string to datetime through the UDF
date_parser = udf(lambda d: datetime.strptime(d, "%m/%d/%Y"), DateType())
stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date))

In [11]:
stocks.show(3)
stocks.printSchema()

+------+----------+----------+-------+--------+--------+--------+----------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|ParsedDate|
+------+----------+----------+-------+--------+--------+--------+----------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |2023-05-31|
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |2023-05-30|
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |2023-05-26|
+------+----------+----------+-------+--------+--------+--------+----------+
only showing top 3 rows

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [12]:
#Convert Close, Open,High, Low to float types
from pyspark.sql.types import FloatType

# toFloat UDF
def toFloat(n):
    if isinstance(n, int): return float(n)
    if isinstance(n, float): return n
    if isinstance(n, str): return float(n.replace("$", ""))
    return None

toFloat_udf = udf(toFloat, FloatType())

# Applichiamo la UDF alle colonne richieste
stocks = (stocks.withColumn("Open", toFloat_udf(stocks["Open"]))
                .withColumn("Close", toFloat_udf(stocks["Close/Last"]))
                .withColumn("Low", toFloat_udf(stocks["Low"]))
                .withColumn("High", toFloat_udf(stocks["High"])))

In [13]:
stocks.createOrReplaceTempView("StocksTable")
stocks = spark.sql("SELECT Ticker, ParsedDate AS Date, Volume, Open, Low, High, Close FROM StocksTable")
stocks.show(3)
stocks.printSchema()

+------+----------+-------+------+------+------+------+
|Ticker|      Date| Volume|  Open|   Low|  High| Close|
+------+----------+-------+------+------+------+------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|
+------+----------+-------+------+------+------+------+
only showing top 3 rows

root
 |-- Ticker: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Close: float (nullable = true)



In [14]:
#Calulate statistics about the df
stocks.describe(["Volume", "Open", "Low", "High", "Close"]).show()

+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|             Close|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953464E7|101.16125813324399|100.26590135955216|101.96625521621753|101.14891782168563|
|    min|              961133|             12.07|              11.8|             12.45|             11.93|
|    max|           914080943|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



In [15]:
stocks.filter(stocks.Ticker == "V").describe(["Volume", "Open", "Low", "High", "Close"]).show()

+-------+-----------------+------------------+------------------+------------------+------------------+
|summary|           Volume|              Open|               Low|              High|             Close|
+-------+-----------------+------------------+------------------+------------------+------------------+
|  count|             1259|              1259|              1259|              1259|              1259|
|   mean|8269259.627482128|192.89731548209338|190.93404293287549|194.85096124432408|192.90231931654586|
| stddev|3744734.274750118|30.600622781295748|30.389683305829045|30.834919243002513| 30.57141006201524|
|    min|          1993541|            122.08|             121.6|            125.21|            121.73|
|    max|         38379570|            250.05|            248.22|            252.67|            250.93|
+-------+-----------------+------------------+------------------+------------------+------------------+



In [16]:
stocks.filter(stocks.Ticker == "QQQ").describe(["Volume", "Open", "Low", "High", "Close"]).show()

+-------+--------------------+-----------------+------------------+------------------+------------------+
|summary|              Volume|             Open|               Low|              High|             Close|
+-------+--------------------+-----------------+------------------+------------------+------------------+
|  count|                1259|             1259|              1259|              1259|              1259|
|   mean| 4.800409354487689E7|265.6845165128458|263.26520927980647|267.93787476749435|265.76679137020096|
| stddev|2.4497349118980378E7|71.38029094940865| 70.70549093113937| 71.92219873425739|  71.3698475978749|
|    min|             7089020|           145.12|            143.46|            147.98|             143.5|
|    max|           199448100|           405.57|            402.58|            408.71|            403.99|
+-------+--------------------+-----------------+------------------+------------------+------------------+



In [17]:
#Calculate MAX and MIN stock price for every ticker
maxCloses = stocks.groupby("Ticker").max("Close").show(15)

+------+----------+
|Ticker|max(Close)|
+------+----------+
| BRK-B|    359.57|
|  MSFT|    343.11|
|  META|    382.18|
|  TSLA|    409.97|
|  AAPL|    182.01|
|  AMZN|    186.57|
| GOOGL|    149.84|
|  NVDA|    401.11|
|     V|    250.93|
|   TSM|    140.66|
|   SPY|    477.71|
|   QQQ|    403.99|
+------+----------+



In [21]:
stocks.groupBy("Ticker").max("Close").withColumnRenamed("max(Close)", "MaxStockPrice").show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       359.57|
|  MSFT|       343.11|
|  META|       382.18|
|  TSLA|       409.97|
|  AAPL|       182.01|
|  AMZN|       186.57|
| GOOGL|       149.84|
|  NVDA|       401.11|
|     V|       250.93|
|   TSM|       140.66|
|   SPY|       477.71|
|   QQQ|       403.99|
+------+-------------+



In [24]:
import pyspark.sql.functions as f

In [25]:
#gather MaxStockPrice and the sum of all Volumes for each ticker
stocks.groupBy("Ticker").agg(f.max("Close").alias("MaxStockPrice"),
                                                  f.sum("Volume").alias("TotalVolume")
                                                 ).show(15)

+------+-------------+------------+
|Ticker|MaxStockPrice| TotalVolume|
+------+-------------+------------+
| BRK-B|       359.57|  5862401321|
|  MSFT|       343.11| 37976660472|
|  META|       382.18| 30148848043|
|  TSLA|       409.97|171802975076|
|  AAPL|       182.01|139310061360|
|  AMZN|       186.57|104503287430|
| GOOGL|       149.84| 43956560981|
|  NVDA|       401.11| 58787218324|
|     V|       250.93| 10410997871|
|   TSM|       140.66| 12506470104|
|   SPY|       477.71|107925285300|
|   QQQ|       403.99| 60437153773|
+------+-------------+------------+



In [28]:
#divide the Date column in three columns: year, month, day, weekofYear
stocks = (stocks.withColumn("Year", f.year(stocks.Date))
          .withColumn("Month", f.month(stocks.Date))
          .withColumn("Day", f.dayofmonth(stocks.Date))
          .withColumn("Week", f.weekofyear(stocks.Date)))

stocks.show(3)

+------+----------+-------+------+------+------+------+----+-----+---+----+
|Ticker|      Date| Volume|  Open|   Low|  High| Close|Year|Month|Day|Week|
+------+----------+-------+------+------+------+------+----+-----+---+----+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|2023|    5| 31|  22|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|2023|    5| 30|  22|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|2023|    5| 26|  21|
+------+----------+-------+------+------+------+------+----+-----+---+----+
only showing top 3 rows



In [29]:
#For each ticker, group by year and show maxClose, minClose and avgClose
yearlyMaxMin = stocks.groupby(["Ticker", "Year"]).agg(
    f.max("Close").alias("MaxCloseY"),
    f.min("Close").alias("MinCloseY"),
    f.avg("Close").alias("AvgCloseY"))

yearlyMaxMin.show(30)

+------+----+---------+---------+------------------+
|Ticker|Year|MaxCloseY|MinCloseY|         AvgCloseY|
+------+----+---------+---------+------------------+
| BRK-B|2023|   330.39|   293.51| 313.6467963653861|
| BRK-B|2018|   223.76|   184.91|204.63770345739417|
| BRK-B|2021|   300.17|   227.36| 272.7105555458674|
| BRK-B|2020|   233.92|   162.13|205.01023744123256|
| BRK-B|2019|   227.05|   191.66|208.07250019860646|
| BRK-B|2022|   359.57|    264.0|304.48920261337463|
|  MSFT|2019|   158.96|     97.4| 130.3820636688717|
|  MSFT|2021|   343.11|   212.25|275.94079347640746|
|  MSFT|2020|   231.65|   135.42|193.02612642431447|
|  MSFT|2018|   115.61|    94.13| 106.6812163688041|
|  MSFT|2023|   332.89|   222.31| 273.9646604963877|
|  MSFT|2022|   334.75|   214.25| 268.9170912784409|
|  META|2020|   303.91|   146.01|234.55086984747484|
|  META|2021|   382.18|   245.64| 321.1662303985111|
|  META|2018|    217.5|   124.06|167.72925619176917|
|  META|2022|   338.54|    88.91|180.187689762

In [None]:
#For each ticker, group by year and show maxClose, minClose and avgClose, but in SQL fashion
stocks.createOrReplaceTempView("StocksTable")#because we added the new columns for year, month ecc we have to create the table again!
yearlyMaxMinSQL = spark.sql("SELECT Ticker, Year, MAX(Close) AS MaxCloseY, MIN(Close) as MinCloseY, AVG(Close) AS AvgCloseY\
                            FROM StocksTable\
                            GROUP BY Ticker, Year")

yearlyMaxMin.show(30)