<a href="https://colab.research.google.com/github/Oughty-Otieno/Apache-Spark-DataFrames-Project/blob/main/Apache_Spark_DataFrames_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem statement

As a data professional, you need to perform an analysis by answering questions about
some stock market data on Safaricom from 2012 to 2016.
You will need to perform the following:

Data Importation and Exploration

● Start a Spark session and load the stock file while inferring the data types.

● Determine the column names

● Make observations about the schema.

● Show the first 5 rows

● Use the describe method to learn about the data frame

Data Preparation

● Format all the data to 2 decimal places using format_number()

● Create a new data frame with a column called HV Ratio: the ratio of the day's High price to the volume of stock traded that day

Data Analysis

● What day had the Peak High in Price?

● What is the mean of the Close column?

● What is the max and min of the Volume column?

● How many days was the Close lower than 60 dollars?

● What percentage of the time was the High greater than 80 dollars?

● What is the Pearson correlation between High and Volume?

● What is the max High per year?

● What is the average Close for each Calendar Month?

Data description

Dataset URL (CSV File): https://bit.ly/3pmchk

# Data Importation and Exploration

Start a Spark session and load the stock file while inferring the data types.

In [1]:
# To use PySpark in Colab, we'll first install it
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 45 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 63.1 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=579ee53049cfcf2cf9221b948cc5809308b7e5696c83539e0404e62b93ccc6cd
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Next, we'll start a local Spark session
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Load the data

In [25]:
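# spark.read is the SparkSession entry point; SQLContext is deprecated since Spark 3.0.
# inferSchema is not set, so every column is loaded as a string (see the schema below).
df = spark.read.csv("/content/saf_stock.csv", header=True)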

# Print the type
print(type(df))


<class 'pyspark.sql.dataframe.DataFrame'>


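Check the schema. Because the file was read without `inferSchema=True`, every column, including the numeric price and volume columns, is a nullable `string`.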

In [26]:
#We can print the schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [27]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



Show the columns


In [28]:
print(df.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


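Use the describe method to learn about the data frame. Because every column is a string, the min and max rows compare values as text rather than as numbers: the Volume minimum of 10010500 reported here is larger than the true minimum of 2094900 found in the analysis below. The mean and stddev rows are still numeric, because Spark casts the strings to numbers for those statistics.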

In [29]:
# Summary statistics (count, mean, stddev, min, max) for every column
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700
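Python's own string ordering shows why describe() reports 10010500 as the Volume minimum: strings compare character by character, so "10010500" sorts before "2094900" even though it is the larger number. The values below are taken from the outputs in this notebook.

```python
# Volume values as strings, the way they are stored before any cast
volumes = ["12668800", "9593300", "10010500", "2094900"]

# Text comparison: the first characters '1' < '2' < '9' decide, so "10010500" wins
text_min = min(volumes)

# Numeric comparison: convert each value to int before comparing
numeric_min = min(volumes, key=int)

print(text_min, numeric_min)
```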

# Data Preparation

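Format all the data to 2 decimal places using format_number(). Note that format_number() returns a string, so the formatted columns hold text rather than numbers.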

In [30]:
# Format the price columns to 2 decimal places with format_number().
# format_number() returns strings (e.g. '59.97'), so these columns are no longer
# numeric; Volume is cast to an integer instead of being formatted.
from pyspark.sql.functions import format_number

df_2_decimal = df.select(
    df['Date'],
    format_number(df['Open'].cast('float'), 2).alias('Open'),
    format_number(df['High'].cast('float'), 2).alias('High'),
    format_number(df['Low'].cast('float'), 2).alias('Low'),
    format_number(df['Close'].cast('float'), 2).alias('Close'),
    df['Volume'].cast('int').alias('Volume'),
    format_number(df['Adj Close'].cast('float'), 2).alias('Adj Close'),
)

df_2_decimal.show(5)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows



Create a new data frame with a column called HV Ratio: the ratio of the day's
High price to the volume of stock traded that day.


In [38]:
# Add an "HV Ratio" column: the day's High price divided by its Volume,
# shown to 8 decimal places because the ratios are very small
df_new = df_2_decimal.withColumn(
    "HV Ratio",
    format_number((df_2_decimal['High'] / df_2_decimal['Volume']).cast('float'), 8),
)
df_new.show(5)

+----------+-----+-----+-----+-----+--------+---------+----------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|  HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+----------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|0.00000482|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|0.00000629|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|0.00000467|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|0.00000737|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|0.00000892|
+----------+-----+-----+-----+-----+--------+---------+----------+
only showing top 5 rows
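The HV Ratio is a plain per-row division. The sketch below recomputes it in Python for the five rows shown above and formats each value to 8 decimal places, like the Spark column. It uses Python floats (doubles) instead of Spark's float cast; for these five rows that makes no difference at 8 decimal places.

```python
# (High, Volume) pairs from the first five rows of df_2_decimal
rows = [
    (61.06, 12668800),
    (60.35, 9593300),
    (59.62, 12768200),
    (59.45, 8069400),
    (59.55, 6679300),
]

# HV Ratio = High price / volume traded that day, shown to 8 decimal places
hv_ratios = [f"{high / volume:.8f}" for high, volume in rows]
print(hv_ratios)
```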


# Data Analysis


What day had the Peak High in Price?

In [42]:
# High holds formatted strings, so cast it to a number before sorting
print(df_new.orderBy(df_new['High'].cast('float').desc()).head(1))

[Row(Date='2015-01-13', Open='90.80', High='90.97', Low='88.93', Close='89.31', Volume=8215400, Adj Close='83.83', HV Ratio='0.00001107')]


What is the mean of the Close column?

In [43]:
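# Importing max and min this way shadows Python's built-in max() and min()
from pyspark.sql.functions import mean, max, min

# Close holds 2-decimal strings; avg() casts them to double
df_new.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+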


What is the max and min of the Volume column?

In [44]:
# What is the max and min of the Volume column?
df_new.select(min('Volume'), max('Volume')).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+


How many days was the Close lower than 60 dollars?

In [45]:
print(df_new.filter(df_new['Close'] < 60).count())

81


What percentage of the time was the High greater than 80 dollars?

In [47]:
print((df_new.filter(df_new['High']>80).count()/df_new.count()) * 100)

8.426073131955485


What is the Pearson correlation between High and Volume?

In [48]:
from pyspark.sql.functions import corr

df_new.select(corr('High', 'Volume')).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.33843260582148915|
+--------------------+


What is the max High per year?

In [49]:
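from pyspark.sql import functions as F
# High holds 2-decimal strings, so max() compares text; this matches numeric order only because every price lies between 10 and 99
df_new.groupby(F.date_format('Date', 'yyyy').alias('Year')).agg({'High': 'max'}).sort('Year').show()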

+----+---------+
|Year|max(High)|
+----+---------+
|2012|    77.60|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+



What is the average Close for each Calendar Month?


In [51]:
# Averages each calendar month across all years; avg() casts the Close strings to double
df_new.groupby(F.date_format('Date', 'MM').alias('Month')).agg({'Close': 'mean'}).sort('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|   01|71.44801980198022|
|   02|71.30680412371134|
|   03|71.77794392523363|
|   04|72.97361904761907|
|   05|72.30971698113206|
|   06|72.49537735849057|
|   07|74.43971962616824|
|   08|73.02981818181819|
|   09|72.18411764705883|
|   10|71.57854545454546|
|   11|72.11108910891085|
|   12|72.84792452830189|
+-----+-----------------+

