<a href="https://colab.research.google.com/github/fkihu/Model-Quality-and-Improvement-Assignment/blob/main/Week_9D4_Spark_SQL_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instructions
As a data professional, you need to perform an analysis by answering questions about stock market data for Safaricom. The assignment describes the period as 2012-2017, but the records in the file run from 3 January 2012 to 30 December 2016. Data source: https://bit.ly/3pmchka

This analysis is done in PySpark.

## Data importation and Exploration

In [3]:
# Starting the Spark session and loading the stock file

# Installing pyspark
!pip install pyspark

# Running a local Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Loading the local CSV file and registering it as a temporary SQL view.
# SQLContext is deprecated since Spark 3.0 in favour of SparkSession, but it still works.
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.csv("saf_stock.csv")
df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()

In [4]:
print(tables)

['saf_stock']


In [19]:
df = sqlCtx.read.csv("saf_stock.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [20]:
df.columns
# display(df)

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6']

Observations:

1. Spark did not use the first row as column headers, because the file was read without the header option.
2. The columns received default positional names, '_c0' through '_c6', and every column was read as a string.
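Both observations follow from how a CSV reader names and types columns when it is given no header and no type inference. The sketch below mimics that behaviour in plain Python (no Spark needed), using a header and two data rows copied, lightly rounded, from the df2.show(5) output further down. The helpers read_csv and infer_type are hypothetical, and infer_type is a deliberately simplified rule (int, else double, else string), not Spark's actual inferSchema algorithm.

```python
import csv
import io

# Header plus two rows, copied (lightly rounded) from the df2.show(5) output.
SAMPLE = (
    "Date,Open,High,Low,Close,Volume,Adj Close\n"
    "2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235\n"
    "2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475\n"
)


def infer_type(values):
    """Simplified illustration: 'int' if every value parses as int,
    else 'double' if every value parses as float, else 'string'."""
    for name, parse in (("int", int), ("double", float)):
        try:
            for v in values:
                parse(v)
            return name
        except ValueError:
            continue
    return "string"


def read_csv(text, header, infer_schema):
    """Return (column name, type) pairs, mimicking the two reader options."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        names, rows = rows[0], rows[1:]
    else:
        # Without a header, columns get positional names like _c0, _c1, ...
        names = [f"_c{i}" for i in range(len(rows[0]))]
    columns = list(zip(*rows))
    types = [infer_type(c) if infer_schema else "string" for c in columns]
    return list(zip(names, types))


print(read_csv(SAMPLE, header=False, infer_schema=False))
print(read_csv(SAMPLE, header=True, infer_schema=True))
# Inference alone is not enough: the header text keeps every column a string.
print(read_csv(SAMPLE, header=False, infer_schema=True))
```

The last call shows why both options are set together in the next cell: without header=True, the header row is treated as data and blocks numeric inference.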

In [39]:
# Inferring the schema and setting the column headings

# File location and type
file_location = "saf_stock.csv"
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df2 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Displaying the Dataframe
display(df2)

DataFrame[Date: string, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

The DataFrame now has the correct column names, and the price and volume columns have numeric types (Date is still read as a string).

In [34]:
# Display Dataframe's Schema
df2.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [35]:
# Showing the first five rows of the dataframe
df2.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [36]:
# Displaying the statistic summary of the data.

df2.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

## Data Preparation

In [37]:
# Format all the numeric data to 2 decimal places using format_number()

# Importing format_number and col
from pyspark.sql.functions import format_number, col

# Selecting the columns to format
cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

# Formatting the columns into a separate DataFrame. format_number() returns strings,
# and .show() returns None, so df2 must not be overwritten with the result;
# the numeric df2 is still needed for the analysis below.
df_formatted = df2.select('Date', *[format_number(col(col_name), 2).alias(col_name) for col_name in cols])
df_formatted.show()

+----------+-----+-----+-----+-----+-------------+---------+
|      Date| Open| High|  Low|Close|       Volume|Adj Close|
+----------+-----+-----+-----+-----+-------------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
|2012-01-10|59.43|59.71|58.98|59.04| 6,907,300.00|    51.49|
|2012-01-11|59.06|59.53|59.04|59.40| 6,365,600.00|    51.81|
|2012-01-12|59.79|60.00|59.40|59.50| 7,236,400.00|    51.90|
|2012-01-13|59.18|59.61|59.01|59.54| 7,729,300.00|    51.93|
|2012-01-17|59.87|60.11|59.52|59.85| 8,500,000.00|    52.20|
|2012-01-18|59.79|60.03|59.65|60.01| 5,911,400.00|    52.34|
|2012-01-19|59.93|60.73|59.75|60.61| 9,234,600.00|    52.86|
|2012-01-20|60.75|61.25|60.67|61.01|10,378,800.00|    53.21|
|2012-01-23|60.81|60.98|
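format_number() is a display step: it turns each number into text. The plain-Python sketch below reproduces the same appearance with Python's ",.2f" format spec, used here as an assumed stand-in (to_two_decimals is a hypothetical helper, not a Spark function), and shows why formatted columns should not be reused for arithmetic or comparisons.

```python
def to_two_decimals(x):
    # Hypothetical helper: thousands separators and 2 decimals, returned as text.
    return format(x, ",.2f")


formatted_volume = to_two_decimals(12668800)  # Volume from the first row above
formatted_open = to_two_decimals(59.970001)   # Open from the first row above
print(formatted_volume, formatted_open)  # 12,668,800.00 59.97

# Once values are text, comparisons are alphabetical rather than numeric:
# "9,593,300.00" sorts after "12,668,800.00" although 9.6 million < 12.7 million.
print("9,593,300.00" > "12,668,800.00")  # True
```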

In [None]:
# Create a new data frame with a column called HV Ratio that is the ratio of the
# High Price versus volume of stock traded for a day

# The ratio is computed on the numeric df2, not on the formatted (string) columns,
# and the DataFrame is kept separate from the .show() call, which returns None
new_df = df2.withColumn('HV Ratio', df2['High'] / df2['Volume'])
new_df.select('High', 'Volume', 'HV Ratio').show()
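The HV Ratio is a per-row division of High by Volume. As a plain-Python illustration, here it is for the first five trading days, with values copied from the df2.show(5) output; in PySpark, the / operator likewise returns a double when dividing the double High column by the integer Volume column.

```python
# (Date, High, Volume) for the first five trading days, from the df2.show(5) output.
rows = [
    ("2012-01-03", 61.060001, 12668800),
    ("2012-01-04", 60.349998, 9593300),
    ("2012-01-05", 59.619999, 12768200),
    ("2012-01-06", 59.450001, 8069400),
    ("2012-01-09", 59.549999, 6679300),
]

# HV Ratio = High / Volume for each day. Python 3's "/" is true division,
# so the integer Volume does not truncate the result.
hv = [(day, high / volume) for day, high, volume in rows]

for day, ratio in hv:
    print(f"{day}  {ratio:.3e}")
```

Because Volume is in the millions, the ratios are tiny; the largest ratio in this sample falls on the day with the lowest volume.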

## Data Analysis

In [40]:
# What day had the Peak High in Price?
from pyspark.sql.functions import desc
df2.select('Date', 'High').orderBy(desc('High')).show(1)

+----------+---------+
|      Date|     High|
+----------+---------+
|2015-01-13|90.970001|
+----------+---------+
only showing top 1 row



Observation: The day that had the peak high in price was 13th January 2015.
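Sorting by High in descending order and showing one row is an argmax. The plain-Python sketch below, run on the first five rows from the earlier preview rather than the full dataset, makes that logic explicit and also collects ties, which show(1) would silently hide.

```python
# (Date, High) for the first five trading days, copied from the df2.show(5) output.
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

# Argmax: the row with the largest High.
peak_date, peak_high = max(rows, key=lambda row: row[1])

# All dates that reach the maximum, in case several days tie.
peak_dates = [day for day, high in rows if high == peak_high]

print(peak_date, peak_high)  # 2012-01-03 61.060001
print(peak_dates)            # ['2012-01-03']
```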

In [42]:
# What is the mean of the Close column?
df2.select('Close').describe().show()

+-------+-----------------+
|summary|            Close|
+-------+-----------------+
|  count|             1258|
|   mean|72.38844998012726|
| stddev|6.756859163732991|
|    min|        56.419998|
|    max|        90.470001|
+-------+-----------------+



Observation: The mean of the Close column is 72.38844998012726
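describe() reports the mean together with the sample standard deviation, which divides by n - 1. The sketch below uses Python's statistics module on the first five Close values from the df2.show(5) output (not the full column) to show the mean and how the sample and population standard deviations differ.

```python
import statistics

# First five Close values from the df2.show(5) output (not the full column).
closes = [60.330002, 59.709999, 59.419998, 59.0, 59.18]

mean_close = statistics.mean(closes)
sample_sd = statistics.stdev(closes)       # divides by n - 1, like describe()'s stddev
population_sd = statistics.pstdev(closes)  # divides by n

print(f"mean={mean_close:.2f} sample_sd={sample_sd:.4f} population_sd={population_sd:.4f}")
```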

In [43]:
# What is the max and min of the Volume column?
df2.select('Volume').describe().show()

+-------+-----------------+
|summary|           Volume|
+-------+-----------------+
|  count|             1258|
|   mean|8222093.481717011|
| stddev|  4519780.8431556|
|    min|          2094900|
|    max|         80898100|
+-------+-----------------+



Observation: The min and max of the Volume column are 2094900 and 80898100 respectively.

In [45]:
# How many days was the Close lower than 60 dollars?
df2.filter((df2['Close'] < 60)).count()

81

Observation: There were 81 days where the Close was less than 60 dollars.

In [46]:
# What percentage of the time was the High greater than 80 dollars?

# Counting the days when the High was greater than 80 dollars (115 days)
high_days = df2.filter(df2['High'] > 80).count()

# Counting all the records in the dataframe (1258 rows)
total_days = df2.count()

# Computing the percentage of the time when High was greater than 80 dollars
answer = high_days / total_days
print(format(answer, '.2%'))

9.14%


Observation: The High was greater than 80 dollars 9.14% of the time (115 of 1258 trading days).
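This question and the previous one both reduce to counting rows that satisfy a condition and, for a percentage, dividing by the total row count. A plain-Python sketch on the first five Close values from the earlier preview (a tiny sample, not the dataset) shows the pattern and re-checks the 115/1258 arithmetic.

```python
# First five Close values from the df2.show(5) output (a tiny sample, not the dataset).
closes = [60.330002, 59.709999, 59.419998, 59.0, 59.18]

# Count the rows matching the condition, then divide by the total row count.
below_60 = sum(1 for close in closes if close < 60)
share = below_60 / len(closes)
print(below_60, format(share, ".2%"))  # 4 80.00%

# Re-checking the notebook's figure: 115 of 1258 days had High > 80.
print(format(115 / 1258, ".2%"))  # 9.14%
```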

In [47]:
# What is the Pearson correlation between High and Volume?
df2.select('High', 'Volume').corr('High', 'Volume')

-0.3384326061737161

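Observation: The Pearson correlation coefficient between High and Volume is -0.3384326061737161 (about -0.34), a weak-to-moderate negative relationship: days with a higher High tended to have somewhat lower trading volume.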
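DataFrame.corr() computes the Pearson coefficient: the covariance of the two columns divided by the product of their standard deviations. The hypothetical pearson() helper below spells that formula out in plain Python. It is run on the first five High and Volume values from the earlier preview, which are far too few to say anything about the full dataset.

```python
import math


def pearson(xs, ys):
    """Pearson r = sum((x - mx)(y - my)) / sqrt(sum((x - mx)^2) * sum((y - my)^2))."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


# First five High and Volume values from the df2.show(5) output.
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

r = pearson(highs, volumes)
print(round(r, 3))
```

The n - 1 factors in the covariance and the two standard deviations cancel, so the helper works directly with sums of deviations.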

In [48]:
# What is the max High per year?

# Import relevant functions (month is used in the next cell)
from pyspark.sql.functions import year, month

# Adding the Year column
new_df2 = df2.withColumn('Year', year(df2['Date']))

# Showing the max High per year, sorted by year
new_df2.groupBy('Year').max('High').orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



Observation: 2015 had the highest yearly maximum High (90.97), while 2016 had the lowest (75.19).
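groupBy('Year').max('High') keeps one maximum per year. The plain-Python equivalent below uses a dictionary keyed by year; the rows are made-up toy values, not taken from the dataset. Because Spark does not guarantee the order of groupBy output, sorting the keys (or calling orderBy) gives a stable, year-ordered listing.

```python
from datetime import date

# Hypothetical toy rows (Date, High); NOT values from the dataset.
rows = [
    ("2012-03-01", 70.0),
    ("2012-07-02", 75.5),
    ("2013-01-15", 72.0),
    ("2013-05-20", 80.1),
]

# Keep a running maximum of High for each year.
max_high_by_year = {}
for day, high in rows:
    yr = date.fromisoformat(day).year
    max_high_by_year[yr] = max(high, max_high_by_year.get(yr, high))

# Sort explicitly so the listing is in year order.
for yr in sorted(max_high_by_year):
    print(yr, max_high_by_year[yr])
```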

In [49]:
# What is the average Close for each Calendar Month?

# Adding the Month column
new_df2 = df2.withColumn('Month', month(df2['Date']))

# Showing the average Close per month
from pyspark.sql.functions import asc
new_df2.groupBy('Month').mean('Close').orderBy(asc('Month')).show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



Observation: The month with the highest closing average is July (74.43971943925233), while February has the lowest (71.306804443299).
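The same per-month aggregation can be written as a SQL query, in the spirit of the temporary view registered at the start of this notebook. The sketch below uses Python's built-in sqlite3 as a stand-in for Spark SQL, on made-up toy rows. SQLite has no month() function, so strftime('%m', Date) is used instead, whereas Spark SQL provides month(Date). Grouping by calendar month pools the same month across different years, as the toy January rows from 2012 and 2013 show.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE saf_stock (Date TEXT, Close REAL)")

# Hypothetical toy rows (Date, Close); NOT values from the dataset.
conn.executemany(
    "INSERT INTO saf_stock VALUES (?, ?)",
    [
        ("2012-01-03", 60.0),
        ("2012-01-04", 62.0),
        ("2013-01-07", 64.0),  # January of a different year, pooled with 2012
        ("2012-02-01", 70.0),
    ],
)

# Average Close per calendar month, ordered by month number.
query = """
    SELECT CAST(strftime('%m', Date) AS INTEGER) AS Month, AVG(Close) AS avg_close
    FROM saf_stock
    GROUP BY Month
    ORDER BY Month
"""
result = conn.execute(query).fetchall()
print(result)  # [(1, 62.0), (2, 70.0)]
conn.close()
```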