# Stock Price Analysis using Spark

## 1. Reading and Cleaning Stock Price Data

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Stock Price Analysis").getOrCreate()

23/06/11 16:23:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
## Reading CSV data => Stocks
stocks = spark.read.csv("StockData", header=True)

In [4]:
## Seeing Data => Dataframe
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [5]:
## Seeing Schema of the Data => Data Types in Dataframe
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [6]:
## Basic select operation => Select Ticker, Date and Close price

In [7]:
stocks.select("Ticker").show(3)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 3 rows



In [8]:
stocks.select(["Ticker", "Date", "Open"]).show(5)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
+------+----------+--------+
only showing top 5 rows



In [9]:
## Filtering Data => Select rows containing Microsoft Stock in last one month
stocks.filter(stocks.Ticker == "MSFT").show(10)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  MSFT|05/24/2023|  $313.85 |23384890|$314.73 |$316.50 |$312.61 |
|  MSFT|05/23/2023|  $315.26 |30797170|$320.03 |$322.72 |$315.25 |
|  MSFT|05/22/2023|  $321.18 |24115660|$318.60 |$322.59 |$318.01 |
|  MSFT|05/19/2023|  $318.34 |27546700|$316.74 |$318.75 |$316.37 |
|  MSFT|05/18/2023|  $318.52 |27275990|$314.53 |$319.04 |$313.72 |
|  MSFT|05/17/2023|  $314.00 |24315010|$312.29 |$314.43 |$310.74 |
+------+----------+----------+--------+--------+--------+--------+
only showing top 10 rows



In [10]:
stocks.filter((stocks.Ticker == "MSFT") & (stocks.Date == "05/31/2023")).show(5)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
+------+----------+----------+--------+--------+--------+--------+



In [11]:
stocks.filter(((stocks.Ticker == "MSFT") | (stocks.Ticker == "V")) & (stocks.Date == "05/31/2023")).show(15)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [12]:
stocks.filter((stocks.Ticker.isin(["MSFT", "QQQ", "SPY", "V", "TSLA"])) & (stocks.Date == "05/31/2023")).show(15)

+------+----------+----------+---------+--------+--------+--------+
|Ticker|      Date|Close/Last|   Volume|    Open|    High|     Low|
+------+----------+----------+---------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 | 45950550|$332.29 |$335.94 |$327.33 |
|  TSLA|05/31/2023|  $203.93 |150711700|$199.78 |$203.95 |$195.12 |
|     V|05/31/2023|  $221.03 | 20460620|$219.96 |$221.53 |$216.14 |
|   SPY|05/31/2023|    417.85|110811800|  418.28|  419.22|  416.22|
|   QQQ|05/31/2023|    347.99| 65105380|  348.37|   350.6|  346.51|
+------+----------+----------+---------+--------+--------+--------+



In [13]:
## User Defined Functions
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

In [15]:
from datetime import datetime

In [16]:
date_parser = udf(lambda date: datetime.strptime(date,"%m/%d/%Y"), DateType())

In [17]:
stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date))

In [18]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [19]:
def num_parser(value):
    if isinstance(value, str):
        return float(value.strip("$"))
    elif isinstance(value, int) or isinstance(value, float):
        return value
    else:
        return None
    
from pyspark.sql.types import FloatType
parser_number = udf(num_parser, FloatType())

In [20]:
stocks = (stocks.withColumn("Open", parser_number(stocks.Open))
                .withColumn("Close", parser_number(stocks["Close/Last"]))
                .withColumn("Low", parser_number(stocks.Low))
                .withColumn("High", parser_number(stocks.High)))

In [21]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [22]:
from pyspark.sql.types import IntegerType
parse_int = udf(lambda value: int(value), IntegerType())

In [23]:
## Changing the datatype of the column
stocks = stocks.withColumn("Volume", parse_int(stocks.Volume))

In [24]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [25]:
cleaned_stocks = stocks.select(["Ticker", "ParsedDate", "Volume", "Open", "Low", "High", "Close"])

In [26]:
cleaned_stocks.show(5)

+------+----------+-------+------+------+------+------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|
+------+----------+-------+------+------+------+------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|
+------+----------+-------+------+------+------+------+
only showing top 5 rows



In [27]:
## Calculating basic stastics about data => Calculate average stock price
cleaned_stocks.describe(["Volume", "Open", "Low", "High", "Close"]).show()

23/06/11 16:23:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|             Close|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953464E7|101.16125813324399|100.26590135955216|101.96625521621753|101.14891782168563|
|    min|              961133|             12.07|              11.8|             12.45|             11.93|
|    max|           914080943|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



## 2. Basic Stock Analysis

In [42]:
## Calculate maximum stock price for various stocks
cleaned_stocks.groupBy("Ticker").max("Open").show(15)

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|  MSFT|   344.62|
|  META|   381.68|
|  TSLA|   411.47|
|  AAPL|   182.63|
|  AMZN|    187.2|
| GOOGL|   151.25|
|  NVDA|   405.95|
|     V|   250.05|
|   TSM|   141.61|
|   SPY|   479.22|
|   QQQ|   405.57|
+------+---------+



In [45]:
cleaned_stocks.groupBy("Ticker").max("Open").withColumnRenamed("max(Open)", "MaxStockPrice").show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|  MSFT|       344.62|
|  META|       381.68|
|  TSLA|       411.47|
|  AAPL|       182.63|
|  AMZN|        187.2|
| GOOGL|       151.25|
|  NVDA|       405.95|
|     V|       250.05|
|   TSM|       141.61|
|   SPY|       479.22|
|   QQQ|       405.57|
+------+-------------+



In [46]:
import pyspark.sql.functions as func
cleaned_stocks.groupBy("Ticker").agg(func.max("Open").alias("MaxStockPrice")).show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|  MSFT|       344.62|
|  META|       381.68|
|  TSLA|       411.47|
|  AAPL|       182.63|
|  AMZN|        187.2|
| GOOGL|       151.25|
|  NVDA|       405.95|
|     V|       250.05|
|   TSM|       141.61|
|   SPY|       479.22|
|   QQQ|       405.57|
+------+-------------+



In [48]:
cleaned_stocks.groupBy("Ticker").agg(
    func.max("Open").alias("MaxStockPrice"),
    func.sum("Volume").alias("TotalVolume")
).show(15)

+------+-------------+------------+
|Ticker|MaxStockPrice| TotalVolume|
+------+-------------+------------+
| BRK-B|       361.39|  5862401321|
|  MSFT|       344.62| 37976660472|
|  META|       381.68| 30148848043|
|  TSLA|       411.47|171802975076|
|  AAPL|       182.63|139310061360|
|  AMZN|        187.2|104503287430|
| GOOGL|       151.25| 43956560981|
|  NVDA|       405.95| 58787218324|
|     V|       250.05| 10410997871|
|   TSM|       141.61| 12506470104|
|   SPY|       479.22|107925285300|
|   QQQ|       405.57| 60437153773|
+------+-------------+------------+



In [50]:
## Calculate maximum price of stocks each year => Basic date manipulation operation
cleaned_stocks = (cleaned_stocks.withColumn("Year", func.year(cleaned_stocks.ParsedDate))
                                .withColumn("Month", func.month(cleaned_stocks.ParsedDate))
                                .withColumn("Day", func.dayofmonth(cleaned_stocks.ParsedDate))
                                .withColumn("Week", func.weekofyear(cleaned_stocks.ParsedDate))
                 )

In [51]:
cleaned_stocks.show(10)

+------+----------+-------+------+------+------+------+----+-----+---+----+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|Year|Month|Day|Week|
+------+----------+-------+------+------+------+------+----+-----+---+----+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|2023|    5| 31|  22|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|2023|    5| 30|  22|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|2023|    5| 26|  21|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|2023|    5| 25|  21|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|2023|    5| 24|  21|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|323.11|2023|    5| 23|  21|
| BRK-B|2023-05-22|2763422|330.75|328.35|331.49|329.13|2023|    5| 22|  21|
| BRK-B|2023-05-19|4323538| 331.0|329.12|333.94|330.39|2023|    5| 19|  20|
| BRK-B|2023-05-18|2808329|326.87|325.85|329.98|329.76|2023|    5| 18|  20|
| BRK-B|2023-05-17|3047626|325.02|324.82|328.26|327.39|2023|    5| 17|  20|
+------+----

In [53]:
yearly = cleaned_stocks.groupBy(['Ticker', 'Year']).agg(func.max("Open").alias("YearlHigh"), func.min("Open").alias("YearlyLow"))

In [54]:
yearly.show()

+------+----+---------+---------+
|Ticker|Year|YearlHigh|YearlyLow|
+------+----+---------+---------+
| BRK-B|2023|    331.0|   294.68|
| BRK-B|2018|    224.0|   185.43|
| BRK-B|2021|   300.88|   228.21|
| BRK-B|2020|   233.92|    165.3|
| BRK-B|2019|   227.27|   194.78|
| BRK-B|2022|   361.39|   260.58|
|  MSFT|2019|   159.45|    99.55|
|  MSFT|2021|   344.62|   212.17|
|  MSFT|2020|   229.27|   137.01|
|  MSFT|2018|   115.42|    95.14|
|  MSFT|2023|   335.23|    223.0|
|  MSFT|2022|   335.35|   217.55|
|  META|2020|   300.16|   139.75|
|  META|2021|   381.68|    247.9|
|  META|2018|   215.72|    123.1|
|  META|2022|   339.95|    90.08|
|  META|2019|   208.67|   128.99|
|  META|2023|   265.25|   122.82|
|  TSLA|2019|     29.0|    12.07|
|  TSLA|2021|   411.47|   184.18|
+------+----+---------+---------+
only showing top 20 rows



In [55]:
## Calculate average stock price for stock each month
monthly = cleaned_stocks.groupBy(['Ticker', 'Year', 'Month']).agg(func.max("Open").alias("MonthHigh"), func.min("Open").alias("MonthLow"))
weekly = cleaned_stocks.groupBy(['Ticker', 'Year', 'Week']).agg(func.max("Open").alias("WeekHigh"), func.min("Open").alias("WeekLow"))

In [56]:
monthly.show()

+------+----+-----+---------+--------+
|Ticker|Year|Month|MonthHigh|MonthLow|
+------+----+-----+---------+--------+
| BRK-B|2022|   10|   297.98|  260.58|
| BRK-B|2018|    9|   222.13|  209.21|
| BRK-B|2021|   10|   290.85|  273.02|
| BRK-B|2020|   10|   216.74|  200.03|
| BRK-B|2019|    9|   212.24|  201.19|
| BRK-B|2021|    6|   292.91|   275.0|
| BRK-B|2022|   11|   317.52|  286.02|
| BRK-B|2022|    7|   297.42|   272.5|
| BRK-B|2022|    6|    316.0|  267.38|
| BRK-B|2020|    8|   218.62|  197.28|
| BRK-B|2021|   12|   300.88|  276.68|
| BRK-B|2019|    1|   204.68|  194.78|
| BRK-B|2019|    6|    213.0|  197.62|
| BRK-B|2020|    7|   195.06|  178.26|
| BRK-B|2021|    8|   291.81|  279.05|
| BRK-B|2018|    6|    196.3|  185.43|
| BRK-B|2022|    5|   325.85|  303.93|
| BRK-B|2020|   12|   231.26|  222.02|
| BRK-B|2020|    4|    194.0|   175.0|
| BRK-B|2018|    8|   211.08|   197.2|
+------+----+-----+---------+--------+
only showing top 20 rows



In [57]:
weekly.show()

+------+----+----+--------+-------+
|Ticker|Year|Week|WeekHigh|WeekLow|
+------+----+----+--------+-------+
| BRK-B|2022|  14|   352.0| 341.17|
| BRK-B|2022|  10|  326.59| 322.49|
| BRK-B|2021|  14|  264.22| 260.02|
| BRK-B|2018|  48|  217.23|  209.3|
| BRK-B|2020|  19|  180.05|  173.4|
| BRK-B|2021|  32|  291.81| 287.01|
| BRK-B|2021|  10|  264.22|  255.6|
| BRK-B|2020|  10|  217.39| 203.48|
| BRK-B|2020|  28|   183.1| 178.26|
| BRK-B|2019|  30|  208.37| 205.34|
| BRK-B|2019|  23|  204.95| 197.62|
| BRK-B|2018|  40|   219.7| 214.43|
| BRK-B|2022|  40|   278.2| 269.52|
| BRK-B|2022|  38|   278.9| 266.01|
| BRK-B|2020|  52|   224.6| 222.02|
| BRK-B|2019|   9|  203.15|  201.7|
| BRK-B|2021|   6|  241.43|  236.0|
| BRK-B|2022|  11|  344.12| 328.65|
| BRK-B|2018|  28|   190.9| 188.11|
| BRK-B|2022|  28|  279.38|  273.0|
+------+----+----+--------+-------+
only showing top 20 rows



In [59]:
weekly.withColumn("Spread", weekly['WeekHigh'] - weekly['WeekLow']).show()

+------+----+----+--------+-------+---------+
|Ticker|Year|Week|WeekHigh|WeekLow|   Spread|
+------+----+----+--------+-------+---------+
| BRK-B|2022|  14|   352.0| 341.17|10.829987|
| BRK-B|2022|  10|  326.59| 322.49| 4.100006|
| BRK-B|2021|  14|  264.22| 260.02| 4.200012|
| BRK-B|2018|  48|  217.23|  209.3|7.9299927|
| BRK-B|2020|  19|  180.05|  173.4| 6.650009|
| BRK-B|2021|  32|  291.81| 287.01| 4.799988|
| BRK-B|2021|  10|  264.22|  255.6| 8.619995|
| BRK-B|2020|  10|  217.39| 203.48|13.910004|
| BRK-B|2020|  28|   183.1| 178.26|4.8400116|
| BRK-B|2019|  30|  208.37| 205.34|3.0299988|
| BRK-B|2019|  23|  204.95| 197.62| 7.330002|
| BRK-B|2018|  40|   219.7| 214.43|5.2700043|
| BRK-B|2022|  40|   278.2| 269.52| 8.680023|
| BRK-B|2022|  38|   278.9| 266.01|12.889984|
| BRK-B|2020|  52|   224.6| 222.02|2.5800018|
| BRK-B|2019|   9|  203.15|  201.7| 1.449997|
| BRK-B|2021|   6|  241.43|  236.0|5.4299927|
| BRK-B|2022|  11|  344.12| 328.65|15.470001|
| BRK-B|2018|  28|   190.9| 188.11

## 3. Joins

In [83]:
# Joins
cleaned_stocks.join(yearly, 
                                      (cleaned_stocks.Ticker==yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                                      'inner'
                                     ).show()



+------+----------+-------+------+------+------+------+----+-----+---+----+------+----+---------+---------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|Year|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+------+----------+-------+------+------+------+------+----+-----+---+----+------+----+---------+---------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|2023|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|2023|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|2023|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|2023|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|2023|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|323.11|2023|    5| 23|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-22|2763422|3

In [84]:
cleaned_stocks.join(yearly, 
                    (cleaned_stocks.Ticker==yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                    'inner'
                   ).drop(yearly.Year, yearly.Ticker).show()

+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|ParsedDate| Volume|  Open|   Low|  High| Close|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|2023-05-31|6175417|321.12|319.39|322.41|321.08|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-30|3232461|321.86| 319.0|322.47|322.19|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-26|3229873|320.44|319.67|322.63| 320.6|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-25|4251935|320.56|317.71|320.56|319.02|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-23|4031342|328.19|322.97|329.27|323.11|    5| 23|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-22|2763422|330.75|328.35|331.49|329.13|    5| 22|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-19|4323538| 331.0|329.12|333.94

In [85]:
historic_stocks = cleaned_stocks.join(yearly, 
                    (cleaned_stocks.Ticker==yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                    'inner'
                   ).drop(yearly.Year, yearly.Ticker)

In [86]:
historic_stocks.show(5)

+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|ParsedDate| Volume|  Open|   Low|  High| Close|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|2023-05-31|6175417|321.12|319.39|322.41|321.08|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-30|3232461|321.86| 319.0|322.47|322.19|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-26|3229873|320.44|319.67|322.63| 320.6|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-25|4251935|320.56|317.71|320.56|319.02|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
only showing top 5 rows



In [87]:
cond = [(historic_stocks.Ticker==weekly.Ticker) & (historic_stocks.Year == weekly.Year) & (historic_stocks.Week == weekly.Week)]
historic_stocks = historic_stocks.join(weekly, cond, 'inner').drop(weekly.Year, historic_stocks.Ticker, weekly.Week)

In [88]:
historic_stocks.show(5)

+----------+-------+------+------+------+------+-----+---+----+---------+---------+------+----+--------+-------+
|ParsedDate| Volume|  Open|   Low|  High| Close|Month|Day|Year|YearlHigh|YearlyLow|Ticker|Week|WeekHigh|WeekLow|
+----------+-------+------+------+------+------+-----+---+----+---------+---------+------+----+--------+-------+
|2023-05-31|6175417|321.12|319.39|322.41|321.08|    5| 31|2023|    331.0|   294.68| BRK-B|  22|  321.86| 321.12|
|2023-05-30|3232461|321.86| 319.0|322.47|322.19|    5| 30|2023|    331.0|   294.68| BRK-B|  22|  321.86| 321.12|
|2023-05-26|3229873|320.44|319.67|322.63| 320.6|    5| 26|2023|    331.0|   294.68| BRK-B|  21|  330.75| 320.44|
|2023-05-25|4251935|320.56|317.71|320.56|319.02|    5| 25|2023|    331.0|   294.68| BRK-B|  21|  330.75| 320.44|
|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|    5| 24|2023|    331.0|   294.68| BRK-B|  21|  330.75| 320.44|
+----------+-------+------+------+------+------+-----+---+----+---------+---------+------+----+-

In [89]:
historic_stocks = historic_stocks.join(monthly, ['Ticker', 'Year', 'Month'])

In [90]:
historic_stocks.columns

['Ticker',
 'Year',
 'Month',
 'ParsedDate',
 'Volume',
 'Open',
 'Low',
 'High',
 'Close',
 'Day',
 'YearlHigh',
 'YearlyLow',
 'Week',
 'WeekHigh',
 'WeekLow',
 'MonthHigh',
 'MonthLow']

In [91]:
final_stocks = historic_stocks.select(['Ticker', 'Year',
 'Month', 'Day', 'Week', 'Volume','Open','Low','High','Close', 'YearlHigh','YearlyLow','WeekHigh','WeekLow',
 'MonthHigh','MonthLow'])

In [92]:
final_stocks.show(5)

                                                                                

+------+----+-----+---+----+-------+------+------+------+------+---------+---------+--------+-------+---------+--------+
|Ticker|Year|Month|Day|Week| Volume|  Open|   Low|  High| Close|YearlHigh|YearlyLow|WeekHigh|WeekLow|MonthHigh|MonthLow|
+------+----+-----+---+----+-------+------+------+------+------+---------+---------+--------+-------+---------+--------+
| BRK-B|2023|    5| 31|  22|6175417|321.12|319.39|322.41|321.08|    331.0|   294.68|  321.86| 321.12|    331.0|  320.44|
| BRK-B|2023|    5| 30|  22|3232461|321.86| 319.0|322.47|322.19|    331.0|   294.68|  321.86| 321.12|    331.0|  320.44|
| BRK-B|2023|    5| 26|  21|3229873|320.44|319.67|322.63| 320.6|    331.0|   294.68|  330.75| 320.44|    331.0|  320.44|
| BRK-B|2023|    5| 25|  21|4251935|320.56|317.71|320.56|319.02|    331.0|   294.68|  330.75| 320.44|    331.0|  320.44|
| BRK-B|2023|    5| 24|  21|3075393|322.71|319.56| 323.0| 320.2|    331.0|   294.68|  330.75| 320.44|    331.0|  320.44|
+------+----+-----+---+----+----

In [34]:
# SQL Queries using pyspark

In [94]:
final_stocks.createOrReplaceTempView('stockData')

In [95]:
spark.sql("SELECT * FROM stockData where Ticker='MSFT' and Year='2023'").show(5)

                                                                                

+------+----+-----+---+----+--------+------+------+------+------+---------+---------+--------+-------+---------+--------+
|Ticker|Year|Month|Day|Week|  Volume|  Open|   Low|  High| Close|YearlHigh|YearlyLow|WeekHigh|WeekLow|MonthHigh|MonthLow|
+------+----+-----+---+----+--------+------+------+------+------+---------+---------+--------+-------+---------+--------+
|  MSFT|2023|    5| 31|  22|45950550|332.29|327.33|335.94|328.39|   335.23|    223.0|  335.23| 332.29|   335.23|  305.72|
|  MSFT|2023|    5| 30|  22|29503070|335.23|330.52|335.74|331.21|   335.23|    223.0|  335.23| 332.29|   335.23|  305.72|
|  MSFT|2023|    5| 26|  21|36630630|324.02|323.88| 333.4|332.89|   335.23|    223.0|  324.02| 314.73|   335.23|  305.72|
|  MSFT|2023|    5| 25|  21|43301740|323.24| 320.0| 326.9|325.92|   335.23|    223.0|  324.02| 314.73|   335.23|  305.72|
|  MSFT|2023|    5| 24|  21|23384890|314.73|312.61| 316.5|313.85|   335.23|    223.0|  324.02| 314.73|   335.23|  305.72|
+------+----+-----+---+-

## 4. Advanced Analysis

In [35]:
## Calculate moving average

In [36]:
## Calculate top 10 highest close price for each stock in a year

## 5. Saving Data

In [37]:
## Parquet format

In [38]:
## CSV Format

## 6. Referring to Spark Documentation

In [39]:
## Showing functions that are part of doc and general guide

## 7. Performance and Optimization

In [40]:
# Accumulator and Broadcast
# Correct partition
# Shuffle
# Dataframe optimization
# 

23/06/11 16:23:17 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
