
1. **Defining the Question**


a) Specifying the Data Analysis Question
Perform an analysis on Safaricom stock market data from the years 2012-2016


b) Defining the Metric for Success
The analysis is successful if it answers each of the specific questions posed about Safaricom's stock market data for the years 2012-2016.


c) Understanding the context
As a data professional, you are asked to perform an analysis of Safaricom's stock market data for the years 2012-2016 by answering a set of questions about it.


d) Recording the Experimental Design
Data Importation and Exploration
Data Preparation
Data Analysis


e) Data Relevance
The dataset includes Safaricom stock market daily data for the period Jan 2012 - Dec 2016, and is therefore relevant in answering the research question

The dataset contains the following fields for each trading day:

Date - The calendar date
Open - Opening price of the stock
High - The maximum price reached by the stock during the day
Low - The minimum price of the stock during the day
Close - Closing price of the stock
Volume - The number of shares that changed hands during the day
Adj Close - Price value that incorporates changes resulting from corporate actions such as dividend payments, stock splits, or new share issuance.
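The data dictionary above implies a natural type for each field: a date, five floating-point prices and an integer share count. As a minimal stdlib sketch (the `StockDay` class and `parse_row` helper are hypothetical, not part of this project), one CSV row could be parsed into those types as follows, using the first row of the dataset as displayed later in the notebook:

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class StockDay:
    """One trading day, typed according to the data dictionary (hypothetical helper)."""
    day: date
    open: float
    high: float
    low: float
    close: float
    volume: int
    adj_close: float


def parse_row(row: dict) -> StockDay:
    """Convert a dict of CSV strings (keys as in the file header) into typed fields."""
    return StockDay(
        day=date.fromisoformat(row["Date"]),
        open=float(row["Open"]),
        high=float(row["High"]),
        low=float(row["Low"]),
        close=float(row["Close"]),
        volume=int(row["Volume"]),
        adj_close=float(row["Adj Close"]),
    )


# First row of saf_stock.csv, as displayed by stock_df.show(5)
first = parse_row({
    "Date": "2012-01-03", "Open": "59.970001", "High": "61.060001",
    "Low": "59.869999", "Close": "60.330002", "Volume": "12668800",
    "Adj Close": "52.619234999999996",
})
print(first.day.year, first.volume)
```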

2. **Data Importation and Exploration**


In [1]:
# Task: start a Spark session and load the stock file while inferring the data types
# Install PySpark (the ! prefix runs a shell command in the notebook)
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 30 kB/s 
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 49.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=589dd412552f3fe4d90d97f36f5368b532a4ba537158a049a8703a576e4400ef
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
#Creating local spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext


In [4]:
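# Import SQLContext (kept for backward compatibility; since Spark 2.0 SparkSession is the main entry point)
from pyspark.sql import SQLContext

# Instantiate a SQLContext from the SparkContext sc and assign it to sqlCtx
sqlCtx = SQLContext(sc)

# Read the CSV into the DataFrame stock_df, taking column names from the header row.
# No schema inference is requested, so every column is loaded as a string.
stock_df = sqlCtx.read.format("csv").option("header","true").load("saf_stock.csv")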

# Print the type
print(type(stock_df))




<class 'pyspark.sql.dataframe.DataFrame'>


In [5]:
#Determine the column names
stock_df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [6]:
#Make observations about the schema
stock_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



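All 7 columns, including the numeric ones, were read as nullable strings because the CSV was loaded without schema inference. Aggregates such as max and min on these columns therefore compare text, not numbers.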
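Spark can detect numeric columns itself when the reader is given the `inferSchema` option, for example `spark.read.csv("saf_stock.csv", header=True, inferSchema=True)`. The sketch below is a deliberately simplified, stdlib-only imitation of that idea, not Spark's actual algorithm: for each column it tries integer, then float, and otherwise falls back to string. The `infer_type` helper is hypothetical, and unlike Spark (whose date and timestamp detection depends on version and options) it simply leaves dates as strings.

```python
def infer_type(values):
    """Return 'int', 'double' or 'string' for a list of CSV string values (simplified)."""
    for name, caster in (("int", int), ("double", float)):
        try:
            for v in values:
                caster(v)  # raises ValueError if any value does not fit this type
            return name
        except ValueError:
            continue
    return "string"


header = ["Date", "Open", "High", "Low", "Close", "Volume", "Adj Close"]
rows = [
    ["2012-01-03", "59.970001", "61.060001", "59.869999", "60.330002", "12668800", "52.619234999999996"],
    ["2012-01-04", "60.209998999999996", "60.349998", "59.470001", "59.709998999999996", "9593300", "52.078475"],
]

# Transpose the rows into columns and infer one type per column
schema = {name: infer_type(list(column_values)) for name, column_values in zip(header, zip(*rows))}
print(schema)
```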

In [7]:
#Show the first 5 rows
stock_df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [8]:
#Use the describe method to learn about the data frame
stock_df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700
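`describe()` reports count, mean, standard deviation, min and max for each column. Two points affect how to read it here: Spark's `stddev` is the sample standard deviation (divisor n - 1), and because every column is a string, `min` and `max` are compared as text, which is why the reported minimum Volume is 10010500 even though the first rows contain smaller volumes such as 6679300. The stdlib sketch below computes the same five statistics numerically for the first five Close prices only; it is illustrative and says nothing about the full dataset.

```python
import statistics

# Close prices of the first five trading days (from stock_df.show(5))
close = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

summary = {
    "count": len(close),
    "mean": statistics.mean(close),
    "stddev": statistics.stdev(close),  # sample standard deviation (n - 1)
    "min": min(close),
    "max": max(close),
}
print({k: round(v, 4) for k, v in summary.items()})
```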

3. **Data Preparation**

In [10]:
# Format the price columns to 2 decimal places with format_number()
from pyspark.sql.functions import format_number

# Price columns to format (Volume is left unchanged)
columns = ['Open', 'High', 'Low', 'Close', 'Adj Close']

# Cast each column to float and format it to 2 decimal places
for col_name in columns:
  stock_df = stock_df.withColumn(col_name, format_number(stock_df[col_name].cast("float"), 2))


# Preview the dataset
stock_df.show(5)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows
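`format_number` returns strings (the Spark documentation describes a pattern like '#,###,###.##'), so the formatted columns remain non-numeric, and any value of 1,000 or more would also gain a thousands separator. If the goal is to keep numbers, `pyspark.sql.functions.round` (or `bround`) rounds while preserving a numeric type. The stdlib sketch below only mimics that difference with Python formatting; it is an analogy rather than Spark code, and 1234.5 is a hypothetical value chosen to show the separator.

```python
# A numeric value rounded to 2 decimals stays a float
rounded = round(52.619234999999996, 2)

# A formatted value becomes text; 1234.5 is a hypothetical value to show the separator
formatted_small = f"{52.619234999999996:,.2f}"
formatted_large = f"{1234.5:,.2f}"

print(rounded, formatted_small, formatted_large)

# Text containing a comma can no longer be converted back with a plain float()
try:
    float(formatted_large)
except ValueError as exc:
    print("cannot convert:", exc)
```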



In [11]:
# Create a new DataFrame with an HV_Ratio column: the day's High price
# divided by the volume of shares traded that day
stock_new = stock_df.withColumn("HV_Ratio", stock_df.High/stock_df.Volume)

stock_new.show(5)

+----------+-----+-----+-----+-----+--------+---------+--------------------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|            HV_Ratio|
+----------+-----+-----+-----+-----+--------+---------+--------------------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|4.819714574387472E-6|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|6.290848821573389...|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|4.669413073103491E-6|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|7.367338339901356E-6|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|8.915604928660188E-6|
+----------+-----+-----+-----+-----+--------+---------+--------------------+
only showing top 5 rows
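As a check on the first row, the ratio is simply High divided by Volume, 61.06 / 12668800. Spark's `show()` truncates cell values longer than 20 characters, which is why the second row ends in `...`; `stock_new.show(5, truncate=False)` would print the full value. The stdlib arithmetic below reproduces the first row's value.

```python
# High and Volume for 2012-01-03 after formatting (from stock_new.show(5))
high = 61.06
volume = 12668800

# HV ratio: price per share traded; tiny because volumes are in the millions
hv_ratio = high / volume
print(hv_ratio)
```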



4. **Data Analysis**


In [12]:
# What day had the peak High price?
# Register the DataFrame as a temporary view named safstock so it can be queried with SQL
stock_new.createOrReplaceTempView('safstock')
saftable = sqlCtx.tableNames()
print(saftable)

['safstock']


In [13]:
high = 'select Date, max(High) from safstock group by 1 order by 2 desc'
sqlCtx.sql(high).show(1)

+----------+---------+
|      Date|max(High)|
+----------+---------+
|2015-01-13|    90.97|
+----------+---------+
only showing top 1 row
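Assuming each Date appears only once (one row per trading day), the `group by` in the query above does not aggregate anything, and the question reduces to taking the row with the largest High, e.g. `select Date, High from safstock order by cast(High as double) desc limit 1`. The cast makes the ordering numeric rather than textual; on the two-decimal strings in this dataset both orderings happen to agree, but only because every price has the same number of digits. A stdlib sketch of the same argmax over the five rows shown earlier:

```python
rows = [
    {"Date": "2012-01-03", "High": "61.06"},
    {"Date": "2012-01-04", "High": "60.35"},
    {"Date": "2012-01-05", "High": "59.62"},
    {"Date": "2012-01-06", "High": "59.45"},
    {"Date": "2012-01-09", "High": "59.55"},
]

# Convert High to a number before comparing, then take the row with the largest value
peak = max(rows, key=lambda r: float(r["High"]))
print(peak["Date"], peak["High"])
```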



In [14]:
#What is the mean of the Close column?
mean = 'select round(avg(Close),2) as Avg_Close from safstock'
sqlCtx.sql(mean).show()

+---------+
|Avg_Close|
+---------+
|    72.39|
+---------+



In [17]:
#What is the max and min of the Volume column?
maxmin = 'select max(Volume) as Max_Volume, min(Volume) as Min_Volume from safstock'
sqlCtx.sql(maxmin).show()

+----------+----------+
|Max_Volume|Min_Volume|
+----------+----------+
|   9994400|  10010500|
+----------+----------+
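These two numbers cannot both be right: the reported maximum (9994400) is smaller than the reported minimum (10010500). Because Volume was loaded as a string, `max` and `min` compare the values as text, character by character, so a value starting with '9' sorts after any value starting with '1' regardless of length. Casting inside the query, e.g. `select max(cast(Volume as bigint)) as Max_Volume, min(cast(Volume as bigint)) as Min_Volume from safstock`, should return the numeric extremes instead. The stdlib sketch below reproduces the effect on the five Volume values shown earlier:

```python
volumes = ["12668800", "9593300", "12768200", "8069400", "6679300"]

# Text comparison: strings are compared character by character
text_max, text_min = max(volumes), min(volumes)

# Numeric comparison: convert to int first
num_max, num_min = max(map(int, volumes)), min(map(int, volumes))

print("as text:   ", text_max, text_min)
print("as numbers:", num_max, num_min)
```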



In [19]:
#How many days was the Close lower than 60 dollars?
lower = 'select count(Date) from safstock where Close < 60'
sqlCtx.sql(lower).show()

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+
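Here Close is a string being compared with the number 60, so the result relies on Spark's implicit type coercion. Writing the cast explicitly, e.g. `where cast(Close as double) < 60`, makes the intended numeric comparison unambiguous. A stdlib sketch of the same count over the five Close values shown earlier:

```python
closes = ["60.33", "59.71", "59.42", "59.00", "59.18"]

# Count the days whose closing price, as a number, is below 60
days_below_60 = sum(1 for c in closes if float(c) < 60)
print(days_below_60)
```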



In [20]:
#What percentage of the time was the High greater than 80 dollars?
higher= """select round(Over_80/Total * 100,2) as Over_80_Prop from 
          (Select count(Date) as Total, sum(Case when High > 80 then 1 else 0 end) as Over_80 
          From safstock)"""
sqlCtx.sql(higher).show()

+------------+
|Over_80_Prop|
+------------+
|        8.43|
+------------+
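The inner query counts all days and, through `SUM(CASE WHEN ...)`, the days with High above 80; the outer query divides the two and scales the result to a percentage. Spark SQL's `/` performs fractional division even on integer operands, so no cast is needed for the ratio. With 1258 trading days, 8.43% corresponds to 106 days (106 / 1258 × 100 ≈ 8.43). A minimal stdlib sketch of the same calculation, with a hypothetical `percent_above` helper applied to the first five High values:

```python
def percent_above(values, threshold):
    """Percentage of values strictly greater than threshold, rounded to 2 decimals."""
    total = len(values)
    over = sum(1 for v in values if v > threshold)  # same role as SUM(CASE WHEN ... THEN 1 ELSE 0 END)
    return round(over / total * 100, 2)


highs = [61.06, 60.35, 59.62, 59.45, 59.55]  # High for the first five days
print(percent_above(highs, 80))  # no day above 80 in this small sample
print(percent_above(highs, 60))
```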



In [21]:
#What is the Pearson correlation between High and Volume?
correlation = 'select round(corr(High,Volume),2) as High_Vol_Corr from safstock'
sqlCtx.sql(correlation).show()

+-------------+
|High_Vol_Corr|
+-------------+
|        -0.34|
+-------------+
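Spark's `corr` computes the Pearson coefficient: the covariance of the two columns divided by the product of their standard deviations. A value of -0.34 indicates a weak-to-moderate negative linear relationship, i.e. days with higher prices tended to have somewhat lower volume. The stdlib sketch below implements the formula directly with a hypothetical `pearson` helper; applied to only the five rows shown earlier, its result says nothing about the full dataset.

```python
import math


def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations.

    The 1/n factors in the covariance and standard deviations cancel, so plain sums are used.
    """
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    sd_x = math.sqrt(sum((a - mean_x) ** 2 for a in x))
    sd_y = math.sqrt(sum((b - mean_y) ** 2 for b in y))
    return cov / (sd_x * sd_y)


high = [61.06, 60.35, 59.62, 59.45, 59.55]
volume = [12668800, 9593300, 12768200, 8069400, 6679300]
print(round(pearson(high, volume), 2))
```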



In [22]:
#What is the max High per year?
maxhigh = """select substr(Date,1,4) as Year, max(High) as Year_High from safstock
        group by 1 order by 1"""

sqlCtx.sql(maxhigh).show()

+----+---------+
|Year|Year_High|
+----+---------+
|2012|    77.60|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+
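`substr(Date,1,4)` works because the dates are ISO-formatted `yyyy-MM-dd` strings, so the first four characters are the year; after a cast to a date, Spark SQL's `year()` function would give the same grouping key. A stdlib sketch of the group-and-max step over the five rows shown earlier:

```python
from collections import defaultdict

rows = [("2012-01-03", 61.06), ("2012-01-04", 60.35), ("2012-01-05", 59.62),
        ("2012-01-06", 59.45), ("2012-01-09", 59.55)]

# Group by the first four characters of the ISO date (the year) and keep the max High
year_high = defaultdict(lambda: float("-inf"))
for date_str, high in rows:
    year = date_str[:4]
    year_high[year] = max(year_high[year], high)

print(dict(sorted(year_high.items())))
```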



In [23]:
#What is the average Close for each Calendar Month?
average = """select substr(Date,6,2) as Month, round(avg(Close),2) as Monthly_Avg_Close 
            from safstock group by Month order by Month"""
sqlCtx.sql(average).show()

+-----+-----------------+
|Month|Monthly_Avg_Close|
+-----+-----------------+
|   01|            71.45|
|   02|            71.31|
|   03|            71.78|
|   04|            72.97|
|   05|            72.31|
|   06|             72.5|
|   07|            74.44|
|   08|            73.03|
|   09|            72.18|
|   10|            71.58|
|   11|            72.11|
|   12|            72.85|
+-----+-----------------+
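This query groups by calendar month across all five years, so the `01` row averages every January from 2012 to 2016 rather than a single month. The same grouping, sketched with the standard library on the five January 2012 rows shown earlier:

```python
from collections import defaultdict
from statistics import mean

rows = [("2012-01-03", 60.33), ("2012-01-04", 59.71), ("2012-01-05", 59.42),
        ("2012-01-06", 59.00), ("2012-01-09", 59.18)]

# Characters 6-7 of a yyyy-MM-dd string (1-based, as in SQL substr) are the month;
# in Python's 0-based slicing that is date_str[5:7]
closes_by_month = defaultdict(list)
for date_str, close in rows:
    closes_by_month[date_str[5:7]].append(close)

monthly_avg = {m: round(mean(v), 2) for m, v in sorted(closes_by_month.items())}
print(monthly_avg)
```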

