In [1]:
## 1. Reading and Cleaning Stock Price Data

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session for this notebook, or reuse one that already exists.
ss = (
    SparkSession.builder
    .appName("Stock Price Analysis")
    .getOrCreate()
)

24/10/20 14:13:04 WARN Utils: Your hostname, dhrumil-Inspiron-14-7440-2-in-1 resolves to a loopback address: 127.0.1.1; using 192.168.1.152 instead (on interface wlp0s20f3)
24/10/20 14:13:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/20 14:13:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
## Load the stock CSV data from "StockData"; the first row supplies the column names.
df = ss.read.csv(path="StockData", header=True)
df.show(n=5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [5]:
# Inspect the schema of the freshly loaded frame. The CSV reader was given no
# schema / inferSchema option, so every column is reported as string.
df.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [6]:
# Preview the Ticker column only.
df.select(df["Ticker"]).show(n=5)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 5 rows



In [7]:
# Keep only the Microsoft rows, then preview their Ticker column.
df_filter = df.where(df.Ticker == "MSFT")
df_filter.select("Ticker").show(n=5)


+------+
|Ticker|
+------+
|  MSFT|
|  MSFT|
|  MSFT|
|  MSFT|
|  MSFT|
+------+
only showing top 5 rows



In [8]:
# Rows for MSFT or V on a single trading day (Date is still a string here).
(
    df.where(df["Ticker"].isin("MSFT", "V"))
      .where(df["Date"] == "05/31/2023")
      .show()
)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [9]:
# MSFT rows whose Date string matches any of the listed days.
# In the saved output no row comes back for 05/29/2023.
(
    df.where(df["Ticker"] == "MSFT")
      .where(df["Date"].isin(["05/31/2023", "05/30/2023", "05/29/2023", "05/12/2022"]))
      .show(n=20)
)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/12/2022|  $255.35 |51033800|$257.69 |$259.88 |$250.02 |
+------+----------+----------+--------+--------+--------+--------+



In [10]:
# Re-check the schema before type conversion: the filters above did not
# reassign df, so all columns are still strings.
df.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [11]:
from datetime import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf

In [12]:
# UDF that converts an "MM/DD/YYYY" string into a date for a DateType column.
# - Null or blank inputs yield null instead of raising TypeError/ValueError
#   inside the Python worker (which would abort the whole Spark job).
# - Surrounding whitespace is stripped before parsing.
# - A datetime.date (not a datetime) is returned so the value matches DateType.
# Non-blank strings that do not match the format still raise ValueError.
date_parser = udf(
    lambda date: (
        datetime.strptime(date.strip(), "%m/%d/%Y").date()
        if date is not None and date.strip()
        else None
    ),
    DateType(),
)

In [43]:
# Add a real date column parsed from the string Date column.
df = df.withColumn(colName="PharseDate", col=date_parser(df["Date"]))

In [44]:
# Preview the frame after adding PharseDate.
# NOTE(review): the saved output for this cell already shows numeric
# Open/High/Low and a Close column, which are only created in a later cell,
# and the execution counter jumps from 12 to 43 - the output was captured
# from an out-of-order run. Restart the kernel and Run All to refresh it.
df.show(5)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|PharseDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
+------+----------+----------+-------+------+------+------+----------+------+
only showing top 5 rows



In [45]:
from pyspark.sql.types import FloatType

In [46]:
def pharse_number(value):
    """Convert a raw price value into a Python float.

    Intended to be wrapped in a Spark UDF declared with FloatType.

    Args:
        value: A price as a string (optionally with surrounding whitespace,
            a dollar sign and thousands separators, e.g. " $1,234.50 "),
            an int, a float, or None.

    Returns:
        The numeric value as a float, or None when the input is None, a
        blank or unparseable string, or an unsupported type. Returning None
        (a null in Spark) keeps one dirty cell from failing the whole job.
    """
    if isinstance(value, str):
        # Trim whitespace first so a leading space does not hide the "$".
        cleaned = value.strip().strip("$").strip().replace(",", "")
        if not cleaned:
            return None
        try:
            return float(cleaned)
        except ValueError:
            return None
    if isinstance(value, (int, float)):
        # Always hand back a float: a Python int returned from a UDF declared
        # as FloatType can come back as null instead of being converted.
        return float(value)
    return None

# Spark UDF wrapper around pharse_number; the result column type is FloatType.
num_parser = udf(f=pharse_number, returnType=FloatType())

In [47]:
# Convert the price columns to floats. Open/High/Low are replaced in place;
# Close is a new column derived from the raw "Close/Last" strings.
for target, source in (
    ("Open", "Open"),
    ("High", "High"),
    ("Low", "Low"),
    ("Close", "Close/Last"),
):
    df = df.withColumn(target, num_parser(df[source]))

In [48]:
# Display the frame after the price columns were converted
# (no row count is passed, so show() uses its default limit).
df.show()

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|PharseDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|05/23/2023|  $323.11 |4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|05/22/2023|  $329.13 |2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|05/19/2023|  $330.39 |4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|05/18/2023|  $329.76 |2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|05/17/2023|  $327.39 |3047626|325.02|328.26|324.82|2023-

In [49]:
from pyspark.sql.types import IntegerType
# UDF that converts the Volume strings to integers.
# Null volumes are passed through as null; the original int(volume) raised
# TypeError on None inside the Python worker and aborted the job.
# (A built-in cast, df["Volume"].cast(IntegerType()), would avoid the Python
# UDF entirely; the UDF is kept so the intger_pharse name stays available.)
intger_pharse = udf(
    lambda volume: None if volume is None else int(volume),
    IntegerType(),
)
df = df.withColumn("Volume", intger_pharse(df["Volume"]))
df.show()

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|PharseDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|05/23/2023|  $323.11 |4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|05/22/2023|  $329.13 |2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|05/19/2023|  $330.39 |4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|05/18/2023|  $329.76 |2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|05/17/2023|  $327.39 |3047626|325.02|328.26|324.82|2023-

In [50]:
# Confirm the converted types: Volume should now be integer, the price
# columns float, and PharseDate date.
df.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- PharseDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [68]:
# Keep only the typed columns, dropping the raw Date and Close/Last strings.
cleanned_df = df.select(
    "Ticker",
    "PharseDate",
    "Close",
    "Open",
    "Low",
    "High",
    "Volume",
)
cleanned_df.show(n=4)

+------+----------+------+------+------+------+-------+
|Ticker|PharseDate| Close|  Open|   Low|  High| Volume|
+------+----------+------+------+------+------+-------+
| BRK-B|2023-05-31|321.08|321.12|319.39|322.41|6175417|
| BRK-B|2023-05-30|322.19|321.86| 319.0|322.47|3232461|
| BRK-B|2023-05-26| 320.6|320.44|319.67|322.63|3229873|
| BRK-B|2023-05-25|319.02|320.56|317.71|320.56|4251935|
+------+----------+------+------+------+------+-------+
only showing top 4 rows



In [69]:
## 2. Basic Stock Analysis

In [70]:
# Highest opening price per ticker (result column is named "max(Open)").
cleanned_df.groupBy("Ticker").agg({"Open": "max"}).show()

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|  MSFT|   344.62|
|  META|   381.68|
|  TSLA|   411.47|
|  AAPL|   182.63|
|  AMZN|    187.2|
| GOOGL|   151.25|
|  NVDA|   405.95|
|     V|   250.05|
|   TSM|   141.61|
|   SPY|   479.22|
|   QQQ|   405.57|
+------+---------+



In [71]:
import pyspark.sql.functions as func

In [72]:
# Highest opening price per ticker, with a readable column name.
# The aggregate column is generated as "max(Open)"; the previous spelling
# "max(OPen)" only matched through case-insensitive column resolution, and
# withColumnRenamed silently does nothing when the name is not found.
(
    cleanned_df.groupBy("Ticker")
    .max("Open")
    .withColumnRenamed("max(Open)", "Max Stock Price")
    .show()
)

+------+---------------+
|Ticker|Max Stock Price|
+------+---------------+
| BRK-B|         361.39|
|  MSFT|         344.62|
|  META|         381.68|
|  TSLA|         411.47|
|  AAPL|         182.63|
|  AMZN|          187.2|
| GOOGL|         151.25|
|  NVDA|         405.95|
|     V|         250.05|
|   TSM|         141.61|
|   SPY|         479.22|
|   QQQ|         405.57|
+------+---------------+



In [73]:
# Same aggregation via agg(), naming the result column directly with alias().
(
    cleanned_df
    .groupBy("Ticker")
    .agg(func.max("Open").alias("MaxStock"))
    .show()
)

+------+--------+
|Ticker|MaxStock|
+------+--------+
| BRK-B|  361.39|
|  MSFT|  344.62|
|  META|  381.68|
|  TSLA|  411.47|
|  AAPL|  182.63|
|  AMZN|   187.2|
| GOOGL|  151.25|
|  NVDA|  405.95|
|     V|  250.05|
|   TSM|  141.61|
|   SPY|  479.22|
|   QQQ|  405.57|
+------+--------+



In [74]:
# Per ticker: highest opening price and total traded volume.
(
    cleanned_df
    .groupBy("Ticker")
    .agg(
        func.max("Open").alias("MaxStock"),
        func.sum("Volume").alias("SummationOFVolume"),
    )
    .show()
)

+------+--------+-----------------+
|Ticker|MaxStock|SummationOFVolume|
+------+--------+-----------------+
| BRK-B|  361.39|       5862401321|
|  MSFT|  344.62|      37976660472|
|  META|  381.68|      30148848043|
|  TSLA|  411.47|     171802975076|
|  AAPL|  182.63|     139310061360|
|  AMZN|   187.2|     104503287430|
| GOOGL|  151.25|      43956560981|
|  NVDA|  405.95|      58787218324|
|     V|  250.05|      10410997871|
|   TSM|  141.61|      12506470104|
|   SPY|  479.22|     107925285300|
|   QQQ|  405.57|      60437153773|
+------+--------+-----------------+



In [75]:
# Quick look at the cleaned frame before deriving calendar columns.
cleanned_df.show(3)

+------+----------+------+------+------+------+-------+
|Ticker|PharseDate| Close|  Open|   Low|  High| Volume|
+------+----------+------+------+------+------+-------+
| BRK-B|2023-05-31|321.08|321.12|319.39|322.41|6175417|
| BRK-B|2023-05-30|322.19|321.86| 319.0|322.47|3232461|
| BRK-B|2023-05-26| 320.6|320.44|319.67|322.63|3229873|
+------+----------+------+------+------+------+-------+
only showing top 3 rows



In [76]:
# Derive calendar parts from PharseDate, appended in the order
# Year, Day (of month), Week (of year), Month.
cleanned_df = (
    cleanned_df
    .withColumn("Year", func.year(cleanned_df["PharseDate"]))
    .withColumn("Day", func.dayofmonth(cleanned_df["PharseDate"]))
    .withColumn("Week", func.weekofyear(cleanned_df["PharseDate"]))
    .withColumn("Month", func.month(cleanned_df["PharseDate"]))
)

In [77]:
# Display the frame with the new Year / Day / Week / Month columns.
cleanned_df.show()

+------+----------+------+------+------+------+-------+----+---+----+-----+
|Ticker|PharseDate| Close|  Open|   Low|  High| Volume|Year|Day|Week|Month|
+------+----------+------+------+------+------+-------+----+---+----+-----+
| BRK-B|2023-05-31|321.08|321.12|319.39|322.41|6175417|2023| 31|  22|    5|
| BRK-B|2023-05-30|322.19|321.86| 319.0|322.47|3232461|2023| 30|  22|    5|
| BRK-B|2023-05-26| 320.6|320.44|319.67|322.63|3229873|2023| 26|  21|    5|
| BRK-B|2023-05-25|319.02|320.56|317.71|320.56|4251935|2023| 25|  21|    5|
| BRK-B|2023-05-24| 320.2|322.71|319.56| 323.0|3075393|2023| 24|  21|    5|
| BRK-B|2023-05-23|323.11|328.19|322.97|329.27|4031342|2023| 23|  21|    5|
| BRK-B|2023-05-22|329.13|330.75|328.35|331.49|2763422|2023| 22|  21|    5|
| BRK-B|2023-05-19|330.39| 331.0|329.12|333.94|4323538|2023| 19|  20|    5|
| BRK-B|2023-05-18|329.76|326.87|325.85|329.98|2808329|2023| 18|  20|    5|
| BRK-B|2023-05-17|327.39|325.02|324.82|328.26|3047626|2023| 17|  20|    5|
| BRK-B|2023

In [81]:
# Highest and lowest opening price per ticker and calendar year.
yearly = (
    cleanned_df
    .groupBy("Ticker", "Year")
    .agg(
        func.max("Open").alias("YearlyOpenMax"),
        func.min("Open").alias("YearlyOpenLow"),
    )
)

In [82]:
# Preview the yearly open-price extremes (rows come back in no particular order).
yearly.show(4)

+------+----+-------------+-------------+
|Ticker|Year|YearlyOpenMax|YearlyOpenLow|
+------+----+-------------+-------------+
| BRK-B|2023|        331.0|       294.68|
| BRK-B|2018|        224.0|       185.43|
| BRK-B|2021|       300.88|       228.21|
| BRK-B|2020|       233.92|        165.3|
+------+----+-------------+-------------+
only showing top 4 rows



In [83]:
# Highest and lowest opening price per ticker for each week and each month.
# NOTE(review): Week comes from weekofyear (ISO-8601 weeks) while Year is the
# calendar year, so days around New Year can land in a (Year, Week) group that
# mixes early-January and late-December rows - confirm this is acceptable.
weekly = (
    cleanned_df
    .groupBy("Ticker", "Year", "Week")
    .agg(
        func.max("Open").alias("WeeklyOpenMax"),
        func.min("Open").alias("WeeklyOpenLow"),
    )
)
monthly = (
    cleanned_df
    .groupBy("Ticker", "Year", "Month")
    .agg(
        func.max("Open").alias("MonthlyOpenMax"),
        func.min("Open").alias("MonthlyOpenLow"),
    )
)

In [84]:
# Preview the weekly open-price extremes.
weekly.show(5)

+------+----+----+-------------+-------------+
|Ticker|Year|Week|WeeklyOpenMax|WeeklyOpenLow|
+------+----+----+-------------+-------------+
| BRK-B|2022|  14|        352.0|       341.17|
| BRK-B|2022|  10|       326.59|       322.49|
| BRK-B|2021|  14|       264.22|       260.02|
| BRK-B|2018|  48|       217.23|        209.3|
| BRK-B|2020|  19|       180.05|        173.4|
+------+----+----+-------------+-------------+
only showing top 5 rows



In [85]:
 monthly.show(5)  # preview the monthly open-price extremes

+------+----+-----+--------------+--------------+
|Ticker|Year|Month|MonthlyOpenMax|MonthlyOpenLow|
+------+----+-----+--------------+--------------+
| BRK-B|2022|   10|        297.98|        260.58|
| BRK-B|2018|    9|        222.13|        209.21|
| BRK-B|2021|   10|        290.85|        273.02|
| BRK-B|2020|   10|        216.74|        200.03|
| BRK-B|2019|    9|        212.24|        201.19|
+------+----+-----+--------------+--------------+
only showing top 5 rows

