# Apache Spark DataFrames Project
## Project Deliverable
You will be required to submit:
###### A GitHub repository with your project written in PySpark.
###### Instructions:
*As a data professional, you need to perform an analysis by answering questions about some stock market data on Safaricom from the years 2012-2017.*
###### You will need to perform the following:
###### Data Importation and Exploration


> i. Start a Spark session and load the stock file while inferring the data types.

> ii. Determine the column names.

> iii. Make observations about the schema.

> iv. Show the first 5 rows.

> v. Use the describe method to learn about the data frame.

###### Data Preparation
> i. Format all the data to 2 decimal places using format_number().

> ii. Create a new data frame with a column called HV Ratio that is the ratio of the High price to the volume of stock traded for a day.

###### Data Analysis
> i. What day had the Peak High in Price?

> ii. What is the mean of the Close column?

> iii. What are the max and min of the Volume column?

> iv. How many days was the Close lower than 60 dollars?

> v. What percentage of the time was the High greater than 80 dollars?

> vi. What is the Pearson correlation between High and Volume?

> vii. What is the max High per year?

> viii. What is the average Close for each Calendar Month?

###### Data description
● Dataset URL (CSV File): https://bit.ly/3pmchka

Check the validity of the data source by reading the URL as a CSV.

In [None]:
# Read the CSV from the URL into a pandas DataFrame to confirm that the source is valid

import pandas as pd

url = 'https://bit.ly/3pmchka'
df = pd.read_csv(url)

print(df.head())


         Date       Open       High        Low      Close    Volume  Adj Close
0  2012-01-03  59.970001  61.060001  59.869999  60.330002  12668800  52.619235
1  2012-01-04  60.209999  60.349998  59.470001  59.709999   9593300  52.078475
2  2012-01-05  59.349998  59.619999  58.369999  59.419998  12768200  51.825539
3  2012-01-06  59.419998  59.450001  58.869999  59.000000   8069400  51.459220
4  2012-01-09  59.029999  59.549999  58.919998  59.180000   6679300  51.616215
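
The Spark cells further down read a local file named `saf_stock.csv`, but the notebook does not show how that file was created. One possible way, sketched here with only the standard library, is to download the CSV from the dataset URL once and reuse the local copy afterwards. The helper name `fetch_csv` and its default destination are illustrative assumptions, not part of the original project.

```python
import urllib.request
from pathlib import Path


def fetch_csv(url: str, dest: str = "saf_stock.csv") -> Path:
    """Download `url` to `dest` unless a local copy already exists."""
    path = Path(dest)
    if not path.exists():
        # urlretrieve follows HTTP redirects, which short links rely on
        urllib.request.urlretrieve(url, str(path))
    return path


# Example call (needs network access), using the dataset URL given above:
# fetch_csv("https://bit.ly/3pmchka")
```

Skipping the download when the file already exists keeps repeated notebook runs from fetching the data again.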


Pre-requisites

In [None]:
# Installing pyspark
# ---
#
!pip install pyspark

# Next, we run a local spark session
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 4.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 19.0 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=1ef714482a32ed13fdb73d9ad38f26aaa369e7bcece003ec18f3915400a1034f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

Data Importation and Exploration

Start a spark session and load the stock file while inferring the data types

In [None]:
#Importing the required libraries

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Start a Spark session
spark = SparkSession.builder.appName('SafaricomStockAnalysis').getOrCreate()

# Create SQL context object
sqlCtx = SQLContext(spark.sparkContext)

# Read in the data using the SQL context object
df = sqlCtx.read.csv('saf_stock.csv', header=True, inferSchema=True)



Determine the column names


In [None]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

Make observations about the schema.


In [None]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Show the first 5 rows

In [None]:
df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



Use the describe method to learn about the data frame

In [None]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

Data Preparation

Format all the data to 2 decimal places i.e. format_number()


Create a new data frame with a column called HV Ratio that is the ratio of the
High Price versus volume of stock traded for a day

In [None]:
# Import preparation modules
import pyspark.sql.functions as F

# Create a new DataFrame that adds the HV column (High / Volume) and formats
# the price columns to two decimal places. format_number() returns strings,
# so the formatted columns are no longer numeric.
df_prepared = (
    df.withColumn('HV', F.expr("High/Volume"))  # ratio of the High price to the volume traded
    .withColumn('Open', F.format_number('Open', 2))  # format the Open column to two decimal places
    .withColumn('High', F.format_number('High', 2))  # format the High column to two decimal places
    .withColumn('Low', F.format_number('Low', 2))  # format the Low column to two decimal places
    .withColumn('Close', F.format_number('Close', 2))  # format the Close column to two decimal places
    .withColumn('Volume', F.round('Volume', 2))  # round() instead of format_number() to avoid comma separators
    .withColumn('Adj Close', F.format_number('Adj Close', 2))  # format the Adj Close column to two decimal places
    .withColumn('HV', F.format_number('HV', 10))  # keep 10 decimal places because the ratio is very small
)
 
df_prepared.show(5)

+-------------------+-----+-----+-----+-----+--------+---------+------------+
|               Date| Open| High|  Low|Close|  Volume|Adj Close|          HV|
+-------------------+-----+-----+-----+-----+--------+---------+------------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|0.0000048197|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|0.0000062908|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|0.0000046694|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8069400|    51.46|0.0000073673|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|0.0000089156|
+-------------------+-----+-----+-----+-----+--------+---------+------------+
only showing top 5 rows
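
To make the formatting step concrete, the following plain-Python sketch (no Spark required) recomputes the HV value for the first two rows shown earlier and formats it to 10 decimal places. It also shows the thousands separators that a comma-grouped two-decimal format adds to a large number, which is presumably the "comma issue" that the Volume comment refers to. Python's `format` is used here only as a stand-in for Spark's `format_number`, so treat the parallel as an approximation.

```python
# (High, Volume) pairs copied from the first two rows of the dataset
rows = [(61.060001, 12668800), (60.349998, 9593300)]

# Ratio of High to Volume, formatted to 10 decimal places
hv_formatted = [format(high / volume, ".10f") for high, volume in rows]
print(hv_formatted)  # ['0.0000048197', '0.0000062908']

# Comma-grouped formatting turns a large number into a string with separators
volume_formatted = format(12668800, ",.2f")
print(volume_formatted)  # 12,668,800.00
```

Both results are strings, which is worth keeping in mind when the formatted columns are later compared or aggregated.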



Data Analysis

In [None]:
# Register the prepared DataFrame as a temporary SQL view
df_prepared.createOrReplaceTempView("Data_Analytics")

In [None]:
# Confirm that the Data_Analytics table has been registered
tables = sqlCtx.tableNames()

print(tables)

['data_analytics']


# Question 1: What day had the Peak High in Price?

In [None]:
q = "SELECT \
         Date,max(High) AS Peak_High_Price \
     FROM Data_Analytics GROUP BY Date \
    ORDER BY Peak_High_Price DESC LIMIT 1 "
sqlCtx.sql(q).show()

+-------------------+---------------+
|               Date|Peak_High_Price|
+-------------------+---------------+
|2015-01-13 00:00:00|          90.97|
+-------------------+---------------+



# Question 2: What is the mean of the Close column?


In [None]:
q = "\
SELECT\
    MEAN(Close) AS MEAN\
        FROM Data_Analytics"

sqlCtx.sql(q).show()

+-----------------+
|             MEAN|
+-----------------+
|72.38844992050863|
+-----------------+



# Question 3: What are the max and min of the Volume column?


In [None]:
q = "SELECT\
     MIN(Volume) Min_Volume,MAX(Volume) Max_Volume\
         FROM Data_Analytics\
 "

sqlCtx.sql(q).show()

+----------+----------+
|Min_Volume|Max_Volume|
+----------+----------+
|   2094900|  80898100|
+----------+----------+



# Question 4: How many days was the Close lower than 60 dollars?


In [None]:
# How many days was the close lower than 60 dollars?
q = "SELECT\
    COUNT(Date)\
    FROM Data_Analytics\
    WHERE Close <= 60\
    "
sqlCtx.sql(q).show()

+-----------+
|count(Date)|
+-----------+
|        116|
+-----------+
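
The question asks for days on which Close was strictly lower than 60, whereas the query above uses `<=` and runs against `df_prepared`, where `Close` has already been turned into a formatted string. The plain-Python sketch below (no Spark involved) uses the five Close prices from the first rows shown earlier to illustrate why both the operator and the column type matter. The integer truncation step is a hypothetical model of how a string might be coerced before the comparison, not a statement about Spark's exact coercion rules.

```python
# Close prices copied from the first five rows of the dataset
closes = [60.330002, 59.709999, 59.419998, 59.0, 59.18]

# Two-decimal strings, similar to what the preparation step produces
formatted = [f"{c:.2f}" for c in closes]

# Strict numeric comparison, matching the wording of the question
strictly_below = sum(1 for c in closes if c < 60)

# Hypothetical coercion: drop the fraction, then compare with <=
truncated_at_or_below = sum(1 for s in formatted if int(float(s)) <= 60)

print(strictly_below, truncated_at_or_below)  # 4 5
```

Casting explicitly, for example `WHERE CAST(Close AS DOUBLE) < 60`, or querying the unformatted `df`, removes the ambiguity.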



# Question 5: What percentage of the time was the High greater than 80 dollars?


In [None]:
# What percentage of the time was the High greater than 80 dollars?
# The total row count (1258) was taken from the describe() output above.
# A more robust query would compute the total itself, for example with a Common Table Expression.

q = "SELECT\
     ROUND((COUNT(High)/1258*100),2) Percentage_Greater_Than_80\
         FROM Data_Analytics\
         WHERE High >= 80\
            "

sqlCtx.sql(q).show()

+--------------------------+
|Percentage_Greater_Than_80|
+--------------------------+
|                      9.14|
+--------------------------+
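
The query above hard-codes the row count and uses `>=` although the question asks for "greater than". One way to avoid both, sketched below, is conditional aggregation: count the matching rows with `CASE WHEN` and divide by `COUNT(*)` in the same statement. The sketch runs on Python's built-in `sqlite3` as a stand-in for Spark SQL, with a hypothetical table `stock` holding the High values of the first five rows shown earlier. The `?` placeholder is sqlite3-specific; the `CASE WHEN` expression itself is standard SQL.

```python
import sqlite3

# Highs copied from the first five rows of the dataset
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stock (High REAL)")
conn.executemany("INSERT INTO stock VALUES (?)", [(h,) for h in highs])


def pct_high_above(threshold: float) -> float:
    """Percentage of rows whose High is strictly greater than `threshold`."""
    query = (
        "SELECT ROUND(100.0 * SUM(CASE WHEN High > ? THEN 1 ELSE 0 END) / COUNT(*), 2) "
        "FROM stock"
    )
    return conn.execute(query, (threshold,)).fetchone()[0]


print(pct_high_above(60))  # 40.0 (two of the five sample rows)
print(pct_high_above(80))  # 0.0 (none of the sample rows)
```

Because the total comes from `COUNT(*)`, the result stays correct if rows are added to or removed from the table.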



# Question 6: What is the Pearson correlation between High and Volume?


In [None]:
# What is the Pearson correlation between High and Volume

q = "SELECT ROUND(corr(High,Volume),2) Pearson_Correlation\
          FROM Data_Analytics"

sqlCtx.sql(q).show()

+-------------------+
|Pearson_Correlation|
+-------------------+
|              -0.34|
+-------------------+
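
For reference, the Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations. The sketch below spells that formula out in plain Python and applies it to the High and Volume values of the first five rows shown earlier. The function name `pearson` is an illustrative choice, and five rows will not reproduce the value computed over the full data set.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance without the 1/n factor)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (variances without the 1/n factor)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Values copied from the first five rows of the dataset
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

r = pearson(highs, volumes)
print(round(r, 2))
```

The result always lies between -1 and 1; a negative value, as in the full-data output above, means that higher prices tend to coincide with lower trading volume.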



# Question 7: What is the max High per year?

In [None]:
#What is the Max High per Year

q = "SELECT\
     EXTRACT(YEAR FROM Date) Year,\
     MAX(High) Max_High\
     FROM Data_Analytics\
     GROUP BY Year\
     ORDER BY Max_High DESC"
sqlCtx.sql(q).show()

+----+--------+
|Year|Max_High|
+----+--------+
|2015|   90.97|
|2014|   88.09|
|2013|   81.37|
|2012|   77.60|
|2016|   75.19|
+----+--------+



# Question 8: What is the average Close for each Calendar Month?

In [None]:
#What is the average Close for each Calendar Month?
q = "SELECT\
    EXTRACT(MONTH FROM Date) Month,\
    ROUND(AVG(Close),2) Avg_Close\
    FROM Data_Analytics\
    GROUP BY Month\
    ORDER BY Month ASC"

sqlCtx.sql(q).show()

+-----+---------+
|Month|Avg_Close|
+-----+---------+
|    1|    71.45|
|    2|    71.31|
|    3|    71.78|
|    4|    72.97|
|    5|    72.31|
|    6|     72.5|
|    7|    74.44|
|    8|    73.03|
|    9|    72.18|
|   10|    71.58|
|   11|    72.11|
|   12|    72.85|
+-----+---------+



Another approach: the same analysis as one complete script using the DataFrame API

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start a Spark session
spark = SparkSession.builder.appName('SafaricomStockAnalysis').getOrCreate()

# Read in the data, inferring the column types
df = spark.read.csv('saf_stock.csv', header=True, inferSchema=True)

# Determine the column names
print('Column Names:')
for column in df.columns:
    print(column)

# Make observations about the schema
print('Schema:')
df.printSchema()

# Show the first 5 rows
print('First 5 Rows:')
df.show(5)

# Use the describe method to learn about the data frame
print('Dataframe Summary:')
df.describe().show()

# Fill missing values and keep the numeric columns as doubles for the analysis
df = df.na.fill(0)
numeric_cols = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
for name in numeric_cols:
    df = df.withColumn(name, df[name].cast('double'))

# Format the price columns to 2 decimal places (display only, because format_number returns strings)
price_cols = ['Open', 'High', 'Low', 'Close', 'Adj Close']
print('Formatted to 2 decimal places:')
df.select('Date', *[F.format_number(c, 2).alias(c) for c in price_cols], 'Volume').show(5)

# Create a new data frame with a column called HV Ratio
hv_ratio = df.select((df['High'] / df['Volume']).alias('HV Ratio'))

# What day had the Peak High in Price?
print('Day with Peak High in Price:')
df.orderBy(df['High'].desc()).select('Date').show(1)

# What is the mean of the Close column?
print('Mean of the Close column:')
df.select(F.mean('Close')).show()

# What is the max and min of the Volume column?
print('Max and Min of the Volume column:')
df.select(F.max('Volume'), F.min('Volume')).show()

# How many days was the Close lower than 60 dollars?
print('Days with Close lower than 60 dollars:')
print(df.filter(df['Close'] < 60).count())

# What percentage of the time was the High greater than 80 dollars?
high_80 = df.filter(df['High'] > 80).count()
total_days = df.count()
percentage_high_80 = high_80 / total_days * 100
print('Percentage of time High was greater than 80 dollars:')
print(percentage_high_80)

# What is the Pearson correlation between High and Volume?
print('Pearson correlation between High and Volume:')
df.select(F.corr('High', 'Volume')).show()

# What is the max High per year?
max_high = (
    df.select(F.year('Date').alias('Year'), 'High')
    .groupBy('Year')
    .agg(F.max('High').alias('Max High'))
    .orderBy('Year')
)
print('Max High per year:')
max_high.show()

# What is the average Close for each Calendar Month?
avg_close = (
    df.select(F.month('Date').alias('Month'), 'Close')
    .groupBy('Month')
    .agg(F.mean('Close').alias('Avg Close'))
    .orderBy('Month')
)
print('Average Close for each Calendar Month:')
avg_close.show()


Column Names:
Date
Open
High
Low
Close
Volume
Adj Close
Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

First 5 Rows:
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012