<a href="https://colab.research.google.com/github/MbogoriL/spark-project/blob/main/Introduction_to_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Prerequisites

## 1. Data Import and Exploration


In [None]:
# Installing pyspark
# ---
#
!pip install pyspark



In [None]:
# Next, we start a local Spark session (local[*] uses all available CPU cores)
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
from google.colab import files
uploaded = files.upload()

Saving saf_stock.csv to saf_stock (3).csv


In [None]:
df = spark.read.csv('saf_stock.csv',header=True, inferSchema=True)
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
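# Peek at the first four raw lines of the file (each line keeps its trailing
# newline, so print() leaves a blank line after each one)
with open('saf_stock.csv') as f:
    for i in range(4):
        print(f.readline())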

Date,Open,High,Low,Close,Volume,Adj Close

2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996

2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475

2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
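With `inferSchema=True`, Spark scans the values to choose a type for each column, which is why `Date` came back as a string and `Volume` as an integer. The pure-Python sketch below illustrates the idea on the header and the three rows printed above: it tries `int`, then `float`, and falls back to string, widening a column's type whenever a row needs it. It is a simplified assumption for illustration only (the helper names are hypothetical); Spark's real inference rules are more involved, for instance around dates and timestamps.

```python
import csv
import io

# Header and first three data rows, copied from the raw file output above
RAW = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
"""


def infer_value_type(text):
    """Return the narrowest of 'integer', 'double', 'string' that can parse text."""
    for name, parse in (("integer", int), ("double", float)):
        try:
            parse(text)
            return name
        except ValueError:
            pass
    return "string"


def infer_schema(raw):
    """Hypothetical helper: widen each column's type across all rows."""
    order = ["integer", "double", "string"]  # narrowest to widest
    reader = csv.reader(io.StringIO(raw))
    header = next(reader)
    types = ["integer"] * len(header)
    for row in reader:
        for i, value in enumerate(row):
            candidate = infer_value_type(value)
            # keep whichever of the current and candidate types is wider
            types[i] = max(types[i], candidate, key=order.index)
    return dict(zip(header, types))


schema = infer_schema(RAW)
for column, kind in schema.items():
    print(f" |-- {column}: {kind}")
```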



In [None]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [None]:
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700
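`describe()` reports count, mean, standard deviation, min and max for each column. As a rough cross-check, the sketch below computes the same statistics in plain Python for the five `Close` values printed by `df.show(5)` (a small sample, so the numbers differ from the full-data table). It assumes, as Spark documents for its `stddev` function, that the standard deviation is the sample version with an n - 1 denominator. It also shows why `min`/`max` still make sense for the string `Date` column: ISO-8601 dates sort chronologically as text.

```python
import statistics

# Close prices of the five rows printed by df.show(5) above
close = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

summary = {
    "count": len(close),
    "mean": statistics.mean(close),
    # sample standard deviation (n - 1 in the denominator), like Spark's stddev
    "stddev": statistics.stdev(close),
    "min": min(close),
    "max": max(close),
}
for stat, value in summary.items():
    print(f"{stat:>7}: {value}")

# For a string column such as Date, min and max compare text; ISO-8601 dates
# ("YYYY-MM-DD") sort chronologically, so the result is still meaningful
dates = ["2012-01-03", "2012-01-04", "2012-01-05", "2012-01-06", "2012-01-09"]
first_date, last_date = min(dates), max(dates)
print(first_date, last_date)
```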

In [None]:
first_five = df.head(5)
for r in first_five:
    print(r.Date)

2012-01-03
2012-01-04
2012-01-05
2012-01-06
2012-01-09


In [None]:
df[['Date', 'Open', 'High']].show(3)


+----------+------------------+---------+
|      Date|              Open|     High|
+----------+------------------+---------+
|2012-01-03|         59.970001|61.060001|
|2012-01-04|60.209998999999996|60.349998|
|2012-01-05|         59.349998|59.619999|
+----------+------------------+---------+
only showing top 3 rows



In [None]:
df[df['Open'] < df['High']].show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows
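In PySpark, indexing a DataFrame with a list of names (`df[['Date', 'Open', 'High']]`) is shorthand for `df.select(...)`, and indexing it with a boolean column (`df[df['Open'] < df['High']]`) is shorthand for `df.filter(...)`. The plain-Python sketch below mirrors both operations on the first three rows shown above, treating each row as a dictionary; it is only an analogy for what the Spark calls do, not how Spark executes them.

```python
# First three rows from df.show(3), as plain dictionaries
rows = [
    {"Date": "2012-01-03", "Open": 59.970001, "High": 61.060001, "Low": 59.869999},
    {"Date": "2012-01-04", "Open": 60.209998999999996, "High": 60.349998, "Low": 59.470001},
    {"Date": "2012-01-05", "Open": 59.349998, "High": 59.619999, "Low": 58.369999},
]

# Projection: keep only some columns, like df.select('Date', 'Open', 'High')
projected = [{k: r[k] for k in ("Date", "Open", "High")} for r in rows]

# Row filter: keep rows where the condition holds, like df.filter(df['Open'] < df['High'])
filtered = [r for r in rows if r["Open"] < r["High"]]

print(projected)
print(len(filtered), "of", len(rows), "rows have Open < High")
```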



## 2. Data Preparation

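Round all the price columns to 2 decimal places. `format_number()` could also display the values with 2 decimals, but it returns strings, so `round()` is used here to keep the columns numeric.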

In [None]:
from pyspark.sql.functions import col
import pyspark.sql.functions as func

In [None]:
df.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



In [None]:
# Round every price column to 2 decimal places (Volume is an integer and is left as-is)
for c in ["Open", "High", "Low", "Close", "Adj Close"]:
    df = df.withColumn(c, func.round(df[c], 2))
df.show(5)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows
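The difference between rounding and formatting can be seen in plain Python. The sketch below rounds a few raw prices from the file, which keeps them as numbers (like `func.round`), and formats a volume with thousands separators, which yields a string, roughly what `format_number(col, 2)` returns. One caveat, stated as a hedge: Python's `round()` and Spark's `round()` (HALF_UP) can disagree on exact ties, although none of the values used here is a tie.

```python
# Raw values taken from the first rows of the file
prices = [59.970001, 60.209998999999996, 52.619234999999996, 51.825539]
volume = 12668800

# Rounding keeps the values numeric, so they can still be used in arithmetic
rounded = [round(p, 2) for p in prices]

# Formatting produces text with thousands separators; useful for display only
formatted_volume = f"{volume:,.2f}"

print(rounded)
print(formatted_volume, type(formatted_volume).__name__)
```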



Create a new DataFrame with a column called `HV Ratio`: the ratio of the day's High price to the volume of stock traded that day.

In [None]:
df_hv = df.withColumn('HV Ratio', df['High']/df['Volume']).select(['HV Ratio'])
df_hv.show(5)

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714574387472E-6|
|6.290848821573389...|
|4.669413073103491E-6|
|7.367338339901356E-6|
|8.915604928660188E-6|
+--------------------+
only showing top 5 rows
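The second value above ends in `...` because `show()` truncates cell contents longer than 20 characters; `df_hv.show(5, truncate=False)` prints them in full. As a quick check of the definition, the sketch below recomputes High ÷ Volume in plain Python for the five rounded rows shown earlier and prints every digit with `repr()`.

```python
# (Date, High, Volume) for the first five rows after rounding, from df.show(5)
rows = [
    ("2012-01-03", 61.06, 12668800),
    ("2012-01-04", 60.35, 9593300),
    ("2012-01-05", 59.62, 12768200),
    ("2012-01-06", 59.45, 8069400),
    ("2012-01-09", 59.55, 6679300),
]

# HV Ratio = High price divided by the number of shares traded that day
hv_ratio = {day: high / volume for day, high, volume in rows}

for day, ratio in hv_ratio.items():
    # repr() shows the full value, with no 20-character truncation
    print(day, repr(ratio))
```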



## 3. Data Analysis


On which day did the High price reach its peak?

In [None]:
# Note: this import shadows Python's built-in min() and max() in the notebook
from pyspark.sql.functions import mean, min, max

In [None]:
df.show(2)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 2 rows



In [None]:
df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date']

'2015-01-13'
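Sorting the whole DataFrame in descending order and taking the first row is one way to find the peak; in Spark, `first()` can replace `head(1)[0]`. The plain-Python sketch below shows the same "row with the largest High" idea on the five rows printed earlier, using `max()` with a key function, which needs only one pass instead of a full sort. It is an illustration on a sample, so it finds the peak of those five days, not of the whole dataset.

```python
# (Date, High) for the first five rows after rounding, from df.show(5)
rows = [
    ("2012-01-03", 61.06),
    ("2012-01-04", 60.35),
    ("2012-01-05", 59.62),
    ("2012-01-06", 59.45),
    ("2012-01-09", 59.55),
]

# Keep the row with the largest High; on ties, max() returns the first one seen
peak_date, peak_high = max(rows, key=lambda r: r[1])
print(peak_date, peak_high)
```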

What is the mean of the Close column?


In [None]:
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+



What is the max and min of the Volume column?


In [None]:
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



On how many days was the Close lower than 60 dollars?


In [None]:
df[df['Close'] < 60].count()


81
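Counting days that satisfy a condition is a filter followed by a count: `df.filter(df['Close'] < 60).count()` is equivalent to the cell above. The plain-Python sketch below does the same on the five rounded Close prices shown earlier, summing 1 for every matching day.

```python
# Close prices for the first five rows after rounding, from df.show(5)
close = {
    "2012-01-03": 60.33,
    "2012-01-04": 59.71,
    "2012-01-05": 59.42,
    "2012-01-06": 59.0,
    "2012-01-09": 59.18,
}

# Filter, then count: add 1 for each day whose Close is below 60
days_below_60 = sum(1 for price in close.values() if price < 60)
print(days_below_60)
```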

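What percentage of the time was the High greater than 80 dollars?

In [None]:
# Note: `< 80` counts the days with a High *below* 80 dollars, so this gives the
# complement of the question (the days with a High of 80 dollars or more)
df[df['High'] < 80].count()/df.count() * 100

90.85850556438791

So about 90.86% of trading days had a High below 80 dollars, which leaves roughly 9.14% (100 - 90.86) with a High of 80 dollars or more.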
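The percentage is (matching rows ÷ total rows) × 100, and the direction of the comparison decides what is measured; the question as asked reads `df.filter(df['High'] > 80).count() / df.count() * 100` in Spark. The plain-Python sketch below uses a hypothetical helper `pct_where` and the five High values shown earlier. Because every value in this small sample is far below 80, it uses a threshold of 60 instead, so the sample actually splits. It shows that the `>` and `<` shares add up to 100% only together with the `==` share.

```python
# High prices for the first five rows after rounding, from df.show(5)
highs = [61.06, 60.35, 59.62, 59.45, 59.55]


def pct_where(values, predicate):
    """Hypothetical helper: percentage of values for which predicate is true."""
    return 100 * sum(1 for v in values if predicate(v)) / len(values)


threshold = 60  # example threshold for this small sample; the question uses 80
above = pct_where(highs, lambda v: v > threshold)
below = pct_where(highs, lambda v: v < threshold)
equal = pct_where(highs, lambda v: v == threshold)

# ">" and "<" are complements only once the "==" case is included
print(above, below, equal)
```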

What is the Pearson correlation between High and Volume?


In [None]:
from pyspark.sql.functions import corr
df.select(corr(df['High'], df['Volume'])).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.33843260582148915|
+--------------------+
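`corr()` returns the Pearson correlation coefficient, r = cov(X, Y) / (σ_X · σ_Y), which ranges from -1 to 1. The plain-Python sketch below implements that formula and applies it to the High and Volume values of the five rows shown earlier. With only five points, the result can differ considerably from the full-data value above, so treat it as a demonstration of the formula, not of the dataset.

```python
import math

# High and Volume for the first five rows after rounding, from df.show(5)
high = [61.06, 60.35, 59.62, 59.45, 59.55]
volume = [12668800, 9593300, 12768200, 8069400, 6679300]


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    # the 1/n (or 1/(n - 1)) normalising factors cancel, so raw sums are enough
    return cov / math.sqrt(var_x * var_y)


r = pearson(high, volume)
print(r)
```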



What is the max High per year?


In [None]:
from pyspark.sql.functions import year, month

# Extract the year from the Date string, then take the maximum High per year;
# orderBy makes the (otherwise unordered) grouped output read chronologically
year_df = df.withColumn('Year', year(df['Date']))
year_df.groupBy('Year').max('High').orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|     77.6|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+
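A grouped maximum keeps one running value per key and updates it row by row. The plain-Python sketch below does this for the five rows shown earlier, parsing the year with `datetime.date.fromisoformat`. Since these rows all fall in 2012, it yields a single group; the point is the mechanism. Spark makes no ordering promise for grouped results, which is why the keys are sorted explicitly here and why `orderBy('Year')` is useful in Spark when order matters.

```python
from collections import defaultdict
from datetime import date

# (Date, High) for the first five rows after rounding, from df.show(5)
rows = [
    ("2012-01-03", 61.06),
    ("2012-01-04", 60.35),
    ("2012-01-05", 59.62),
    ("2012-01-06", 59.45),
    ("2012-01-09", 59.55),
]

# One running maximum per year, starting from negative infinity
max_high_by_year = defaultdict(lambda: float("-inf"))
for day, high in rows:
    y = date.fromisoformat(day).year
    max_high_by_year[y] = max(max_high_by_year[y], high)

# Sort the keys explicitly: grouped results carry no guaranteed order
for y in sorted(max_high_by_year):
    print(y, max_high_by_year[y])
```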



What is the average Close for each Calendar Month?

In [None]:
month_df = df.withColumn('Month', month(df['Date']))

# Average Close per calendar month, sorted by month number
month_df.groupBy('Month').mean('Close').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801980198022|
|    2|71.30680412371134|
|    3|71.77794392523363|
|    4|72.97361904761907|
|    5|72.30971698113206|
|    6|72.49537735849057|
|    7|74.43971962616824|
|    8|73.02981818181819|
|    9|72.18411764705883|
|   10|71.57854545454546|
|   11|72.11108910891085|
|   12|72.84792452830189|
+-----+-----------------+
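A grouped average can be computed in a single pass by keeping a running sum and count per key and dividing at the end; Spark's average aggregate works along broadly similar lines, combining partial sums and counts from each partition. The plain-Python sketch below applies this to the five rows shown earlier (all in January, so it yields one group) and reads the month straight from the `YYYY-MM-DD` string.

```python
from collections import defaultdict

# (Date, Close) for the first five rows after rounding, from df.show(5)
rows = [
    ("2012-01-03", 60.33),
    ("2012-01-04", 59.71),
    ("2012-01-05", 59.42),
    ("2012-01-06", 59.0),
    ("2012-01-09", 59.18),
]

# Running [sum, count] per month; the average is sum / count at the end
totals = defaultdict(lambda: [0.0, 0])
for day, close in rows:
    month_number = int(day[5:7])  # "YYYY-MM-DD": characters 5 and 6 hold the month
    totals[month_number][0] += close
    totals[month_number][1] += 1

avg_close_by_month = {m: s / n for m, (s, n) in sorted(totals.items())}
print(avg_close_by_month)
```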



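Converting Spark DataFrames to pandas DataFrames. Note that `toPandas()` collects every row onto the driver, so it is only suitable when the data fits in the driver's memory.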

In [None]:
pandas_df = df.toPandas()
pandas_df.head()

,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.97,61.06,59.87,60.33,12668800,52.62
1,2012-01-04,60.21,60.35,59.47,59.71,9593300,52.08
2,2012-01-05,59.35,59.62,58.37,59.42,12768200,51.83
3,2012-01-06,59.42,59.45,58.87,59.0,8069400,51.46
4,2012-01-09,59.03,59.55,58.92,59.18,6679300,51.62
