1. Reading and Cleaning the Stock Price Data

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=a97f0c88df5af1a6ce0551577125562dc4e95d351ac17ea4b64b137ad3cc5d2b
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Stock Price Analysis').getOrCreate()

In [4]:

from google.colab import drive
drive.mount('/content/drive')

# Assuming your CSV file is in 'My Drive/stock_prices.csv'
file_path = '/content/drive/MyDrive/Stock_price_data'  # Replace with your actual file path
df = spark.read.csv(file_path, header=True, inferSchema=True)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Reading CSV Data => Stocks
stocks = spark.read.csv("/content/drive/MyDrive/Stock_price_data", header=True, inferSchema=True)

In [6]:
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [7]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [8]:
stocks.select("Ticker").show(5)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 5 rows



In [9]:
stocks.select(["Ticker", "Date", "Open"]).show(5)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
+------+----------+--------+
only showing top 5 rows



In [10]:
# Filter Data
stocks.filter(stocks.Ticker == "MSFT").show(10)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  MSFT|05/24/2023|  $313.85 |23384890|$314.73 |$316.50 |$312.61 |
|  MSFT|05/23/2023|  $315.26 |30797170|$320.03 |$322.72 |$315.25 |
|  MSFT|05/22/2023|  $321.18 |24115660|$318.60 |$322.59 |$318.01 |
|  MSFT|05/19/2023|  $318.34 |27546700|$316.74 |$318.75 |$316.37 |
|  MSFT|05/18/2023|  $318.52 |27275990|$314.53 |$319.04 |$313.72 |
|  MSFT|05/17/2023|  $314.00 |24315010|$312.29 |$314.43 |$310.74 |
+------+----------+----------+--------+--------+--------+--------+
only showing top 10 rows



In [11]:
stocks.filter((stocks.Ticker == "MSFT") & (stocks.Date == "05/31/2023")).show(5)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
+------+----------+----------+--------+--------+--------+--------+



In [12]:
stocks.filter(((stocks.Ticker == "MSFT") | (stocks.Ticker == "V")) & (stocks.Date == "05/31/2023")).show(15)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [13]:
stocks.filter((stocks.Ticker.isin (["MSFT", "QQQ", "SPY", "V", "TSLA"])) & (stocks.Date == "05/31/2023")).show(15)

+------+----------+----------+---------+--------+--------+--------+
|Ticker|      Date|Close/Last|   Volume|    Open|    High|     Low|
+------+----------+----------+---------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 | 45950550|$332.29 |$335.94 |$327.33 |
|  TSLA|05/31/2023|  $203.93 |150711700|$199.78 |$203.95 |$195.12 |
|     V|05/31/2023|  $221.03 | 20460620|$219.96 |$221.53 |$216.14 |
|   SPY|05/31/2023|    417.85|110811800|  418.28|  419.22|  416.22|
|   QQQ|05/31/2023|    347.99| 65105380|  348.37|   350.6|  346.51|
+------+----------+----------+---------+--------+--------+--------+



In [14]:
# User Defined Functions
stocks.printSchema()


root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [15]:
from pyspark.sql.functions import udf
from datetime import datetime
from pyspark.sql.types import DateType

In [16]:
# User drfined function to parse the dates
date_parser = udf(lambda date: datetime.strptime(date, "%m/%d/%Y"), DateType())

In [17]:
stocks = stocks.withColumn("ParseDate", date_parser(stocks.Date))

In [18]:
stocks.show(3)

+------+----------+----------+-------+--------+--------+--------+----------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low| ParseDate|
+------+----------+----------+-------+--------+--------+--------+----------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |2023-05-31|
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |2023-05-30|
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |2023-05-26|
+------+----------+----------+-------+--------+--------+--------+----------+
only showing top 3 rows



In [19]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParseDate: date (nullable = true)



In [21]:
def num_parser(value):
  if isinstance(value, str):
    return float(value.strip("$"))
  elif isinstance(value, int) or isinstance(value, float):
    return value
  else:
    return None

from pyspark.sql.types import FloatType
parser_number = udf(num_parser, FloatType())



In [23]:
stocks = (stocks.withColumn("Open", parser_number(stocks.Open))
                .withColumn("Close", parser_number(stocks["Close/Last"]))
                .withColumn("High", parser_number(stocks.High))
                .withColumn("Low", parser_number(stocks.Low)))






In [24]:
stocks.show()

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low| ParseDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|05/23/2023|  $323.11 |4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|05/22/2023|  $329.13 |2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|05/19/2023|  $330.39 |4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|05/18/2023|  $329.76 |2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|05/17/2023|  $327.39 |3047626|325.02|328.26|324.82|2023-

In [25]:
# Changing the datatype pf the column

from pyspark.sql.types import IntegerType
parse_int = udf(lambda value: int(value), IntegerType())

In [26]:
stocks = stocks.withColumn("Volume", parse_int(stocks.Volume))

In [27]:
stocks.show(5)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low| ParseDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
+------+----------+----------+-------+------+------+------+----------+------+
only showing top 5 rows



In [28]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParseDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [35]:
cleaned_stocks = stocks.select(["Ticker", "ParseDate", "Volume", "Open", "Low", "High", "Close"])

In [33]:
cleaned_stocks.show(5)

+------+----------+-------+------+------+------+------+
|Ticker|      Date| Volume|  Open|   Low|  High| Close|
+------+----------+-------+------+------+------+------+
| BRK-B|05/31/2023|6175417|321.12|319.39|322.41|321.08|
| BRK-B|05/30/2023|3232461|321.86| 319.0|322.47|322.19|
| BRK-B|05/26/2023|3229873|320.44|319.67|322.63| 320.6|
| BRK-B|05/25/2023|4251935|320.56|317.71|320.56|319.02|
| BRK-B|05/24/2023|3075393|322.71|319.56| 323.0| 320.2|
+------+----------+-------+------+------+------+------+
only showing top 5 rows



In [36]:
cleaned_stocks.describe(["Volume", "Open", "Low", "High", "Close"]).show()

+-------+-------------------+------------------+------------------+------------------+------------------+
|summary|             Volume|              Open|               Low|              High|             Close|
+-------+-------------------+------------------+------------------+------------------+------------------+
|  count|              13849|             13849|             13849|             13849|             13849|
|   mean|4.652450420210846E7|186.72403883040533|184.54500949025558|188.81925691578033|186.74616294031463|
| stddev|5.180538849328058E7|102.20488183430554|101.32247364642582| 102.9996912996364|102.19532804005233|
|    min|             961133|             12.07|              11.8|             12.45|             11.93|
|    max|          914080943|            479.22|            476.06|            479.98|            477.71|
+-------+-------------------+------------------+------------------+------------------+------------------+

