# <font color='#2F4F4F'>1. Defining the Question</font>

## a) Specifying the Data Analysis Question

Perform an analysis of Safaricom stock market data from the years 2012-2017 by answering a set of questions about it.

## b) Defining the Metric for Success

We will have succeeded if we can load the data into an Apache Spark DataFrame and answer every question listed in section c) below.

## c) Understanding the context 
As a data professional, you need to perform an analysis by answering questions about some stock market data on Safaricom from the years 2012-2017.
You will need to perform the following:

Data Importation and Exploration
*   Start a Spark session and load the stock file while inferring the data types.
*   Determine the column names
*   Make observations about the schema
*   Show the first 5 rows
*   Use the describe method to learn about the data frame

Data Preparation
*   Format all the data to 2 decimal places, i.e. with `format_number()`
*   Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day

Data Analysis
*   What day had the Peak High in Price?
*   What is the mean of the Close column?
*   What is the max and min of the Volume column?
*   How many days was the Close lower than 60 dollars?
*   What percentage of the time was the High greater than 80 dollars?
*   What is the Pearson correlation between High and Volume?
*   What is the max High per year?
*   What is the average Close for each calendar month?


## d) Recording the Experimental Design
* Importation of Pre-Requisites
* Data Importation and Exploration
* Data Preparation
* Data Analysis

## e) Data Relevance
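Was the data relevant for our analysis? Yes. The file provides daily Open, High, Low, Close, Volume and Adj Close values, which cover every question listed in section c).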

# Pre-Requisites

In [1]:
#installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=c162d30fa465b478d16d632a50116d2abcbb2bd7bf82eacdcae7a0c087050a5e
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
# running a local spark session

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Data Importation & Exploration

In [3]:
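# importing the data: upload saf_stock.csv from the local machine into the Colab runtime
from google.colab import files
uploaded = files.upload()

# preview the header and the first three data rows of the raw file
# (the with-block closes the file; rstrip() drops the newline that print would otherwise double)
with open('saf_stock.csv') as f:
    for _ in range(4):
        print(f.readline().rstrip())

Saving saf_stock.csv to saf_stock.csv
Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539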


In [19]:
# SparkSession (created above as `spark`) replaces SQLContext, which is kept only for
# backward compatibility; spark.read exposes the same CSV reader.
# inferSchema=True detects column types; header=True takes column names from the first line.
df = spark.read.csv("saf_stock.csv", inferSchema=True, header=True)
# Print the type
print(type(df))



<class 'pyspark.sql.dataframe.DataFrame'>


In [5]:
#checking the first 5 rows
df.show(n=5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [6]:
# displaying the column names
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [21]:
# displaying the schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
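The schema shows that `inferSchema=True` read `Date` as a string, the price columns as `double` and `Volume` as `integer`. Spark's inference logic is more involved than this, but the principle can be sketched in plain Python on the first lines of `saf_stock.csv` printed earlier: for each column, try the narrowest type that parses every value. The helper `infer_type` is hypothetical and only illustrates the idea.

```python
import csv
import io

# First lines of saf_stock.csv, copied from the file preview above.
SAMPLE_CSV = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
"""


def infer_type(values):
    """Hypothetical helper: return the narrowest type name that parses every value."""
    for type_name, parse in (("integer", int), ("double", float)):
        try:
            for value in values:
                parse(value)
            return type_name
        except ValueError:
            continue  # at least one value failed; try the next, wider type
    # Values that are neither integers nor floats (such as Date here) stay strings.
    return "string"


rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
schema = {column: infer_type([row[column] for row in rows]) for column in rows[0]}
print(schema)
```

On these rows the sketch reaches the same types as the `printSchema()` output, including `string` for `Date`, which is why the next cell converts that column explicitly.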



In [22]:
# inferSchema already read the prices as double and Volume as integer; only Date came in as a string.
# Cast the numeric columns to float (FloatType is 32-bit, so this drops some precision)
# and convert Date from string to date.
from pyspark.sql.types import DateType, FloatType

for col_name in ["Open", "High", "Low", "Close", "Volume", "Adj Close"]:
    # withColumn with an existing name replaces that column in place, keeping the column order
    df = df.withColumn(col_name, df[col_name].cast(FloatType()))
df = df.withColumn("Date", df["Date"].cast(DateType()))

df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- Adj Close: float (nullable = true)
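Comparing the two schemas, the cast turned the inferred 64-bit `double` columns into 32-bit `float` columns, which keep only about seven significant digits. The sketch below uses Python's standard `struct` module to round-trip one value from the data through IEEE 754 single precision, which approximates what the cast does to each stored value; keeping `DoubleType` would avoid the change if full precision matters. The helper name `to_float32` is chosen for illustration.

```python
import struct


def to_float32(value):
    """Round-trip a Python float (64-bit) through IEEE 754 single precision (32-bit)."""
    return struct.unpack("f", struct.pack("f", value))[0]


original = 61.060001  # High on 2012-01-03, as read with inferSchema (double)
as_float = to_float32(original)

print(repr(original), repr(as_float))
print("difference:", abs(as_float - original))
```

The difference is tiny for prices of this size, but it is not zero.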



In [9]:
# summary statistics (count, mean, stddev, min, max) for each numeric column
df.describe().show()

+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|72.35785375452572| 72.8393880756178|71.91860094964979|72.38844997363553|8222093.478537361|67.23883840200064|
| stddev|6.768090251767697|6.768186825250206|6.744075739203606|6.756859160119612|4519780.791987604|6.722609385249684|
|    min|            56.39|            57.06|             56.3|            56.42|        2094900.0|         50.36369|
|    max|             90.8|            90.97|            89.25|            90.47|      8.0898096E7|        84.914215|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
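`describe()` reports the count, mean, sample standard deviation, minimum and maximum of each numeric column. A plain-Python sketch of the same summary for the five `Close` values shown by `df.show(n=5)`, using the standard `statistics` module (whose `stdev` is also the sample standard deviation); the numbers will of course differ from the full 1,258-row table:

```python
import statistics

# Close prices of the first five rows, copied from the df.show(n=5) output above.
close = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

summary = {
    "count": len(close),
    "mean": statistics.mean(close),
    "stddev": statistics.stdev(close),  # sample standard deviation (n - 1 denominator)
    "min": min(close),
    "max": max(close),
}
for name, value in summary.items():
    print(f"{name:>6}: {value}")
```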

# Data Preparation

In [28]:
# Converting the values to 2 decimal places
# format_number returns strings, so the formatted copy (formatted_df, a name chosen here)
# is for display only; the numeric df is kept for the calculations below
from pyspark.sql.functions import format_number

formatted_df = df.select(
    df['Date'],
    format_number(df['Open'], 2).alias('Open'),
    format_number(df['High'], 2).alias('High'),
    format_number(df['Low'], 2).alias('Low'),
    format_number(df['Close'], 2).alias('Close'),
    format_number(df['Volume'], 2).alias('Volume'),
    format_number(df['Adj Close'], 2).alias('Adj Close'),
)
formatted_df.show(5)
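`format_number(col, 2)` produces a *string* column with two decimal places and thousands separators, so a formatted frame suits display but not the arithmetic in the analysis section. Python's format specification mini-language gives a comparable result; the plain-Python sketch below is an approximation for illustration, not Spark's implementation, and rounding of edge cases may differ.

```python
# First row of the data frame, copied from the df.show(n=5) output above.
row = {"Open": 59.970001, "High": 61.060001, "Low": 59.869999,
       "Close": 60.330002, "Volume": 12668800.0, "Adj Close": 52.619234999999996}

# ",.2f" means: thousands separator, fixed-point notation, two decimal places.
# The results are strings, just like the output of format_number.
formatted = {column: format(value, ",.2f") for column, value in row.items()}
print(formatted)
```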

In [31]:
# Creating a new data frame with a column called HV Ratio that is the ratio of the
# High Price versus volume of stock traded for a day

df1 = df.withColumn("HV Ratio",df['High']/df['Volume'])
df1.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714682786927E-6|
|6.290848662516662E-6|
| 4.66941298944916E-6|
| 7.36733843444859E-6|
|8.915604814435727E-6|
|8.644477449144044E-6|
|9.351828386844425E-6|
| 8.29141562102703E-6|
|7.712212051589609E-6|
|7.071764777688419...|
|1.015495462653464...|
|  6.5763540967921E-6|
| 5.90145296180676E-6|
|8.547679390846264E-6|
|8.420709512685392E-6|
|1.041448335142357...|
|8.316075435382035E-6|
|9.721183804158345E-6|
|8.029435987746889E-6|
|6.307432228123159E-6|
+--------------------+
only showing top 20 rows
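The HV Ratio is simply `High / Volume` for each row. A plain-Python sketch on the first three rows from `df.show(n=5)` reproduces the first values of the column above; the helper `hv_ratio` is hypothetical. It returns `None` for a zero volume, mirroring the `null` that Spark's division produces under its default (non-ANSI) settings.

```python
# (Date, High, Volume) for the first three rows, copied from the df.show(n=5) output above.
rows = [
    ("2012-01-03", 61.060001, 12668800),
    ("2012-01-04", 60.349998, 9593300),
    ("2012-01-05", 59.619999, 12768200),
]


def hv_ratio(high, volume):
    """Hypothetical helper: High divided by Volume, or None when Volume is zero."""
    return None if volume == 0 else high / volume


for date, high, volume in rows:
    print(date, hv_ratio(high, volume))
```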


# Data Analysis

In [33]:
# Day with peak high in price
print(df1.orderBy(df1['High'].desc()).first()['Date'])

2015-01-13
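The same question asked of the five sample rows from `df.show(n=5)`, answered in plain Python with `max` and a key function instead of a sort. In Spark, an alternative is to compute `max('High')` with `agg` and filter on that value.

```python
# (Date, High) for the first five rows, copied from the df.show(n=5) output above.
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

# max() with a key function returns the whole (date, high) pair with the largest High.
peak_date, peak_high = max(rows, key=lambda row: row[1])
print(peak_date, peak_high)
```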


In [35]:
# Mean of the close column
from pyspark.sql.functions import mean
df1.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844997363553|
+-----------------+


In [37]:
# max and min of the Volume column
# importing the module under an alias avoids shadowing Python's built-in max and min
from pyspark.sql import functions as F
df1.select(F.max('Volume'), F.min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|8.0898096E7|  2094900.0|
+-----------+-----------+
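Importing `max` and `min` directly from `pyspark.sql.functions` replaces Python's built-ins of the same name for the rest of the notebook, so a later plain `max(...)` call on a list would hit Spark's version instead. For comparison, Python's built-in functions on the five sample volumes from `df.show(n=5)`:

```python
# Volume of the first five rows, copied from the df.show(n=5) output above.
volume = [12668800, 9593300, 12768200, 8069400, 6679300]

# These are Python's built-in max/min, which work on any iterable.
volume_max, volume_min = max(volume), min(volume)
print(volume_max, volume_min)
```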


In [38]:
# days when the close was lower than 60 dollars
print(df1.filter(df1['Close'] < 60).count())

81
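`filter(...).count()` counts the rows that satisfy a condition, which is the same as summing 1 for every matching row. A plain-Python sketch on the five sample `Close` values from `df.show(n=5)`; in Spark the conditional sum could also be written as `F.sum(F.when(df1['Close'] < 60, 1).otherwise(0))` inside `agg`.

```python
# Close prices of the first five rows, copied from the df.show(n=5) output above.
close = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

# Count the days whose Close is strictly below 60.
days_below_60 = sum(1 for price in close if price < 60)
print(days_below_60)
```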


In [39]:
# percentage of the time when high was greater than 80 dollars
print((df1.filter(df1['High']>80).count()/df1.count()) * 100)

9.141494435612083
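The percentage is `100 × (days with High > 80) / (total days)`. Working backwards from the printed value, 1258 × 9.141494435612083 / 100 = 115, so 115 of the 1,258 trading days had a High above 80. A plain-Python sketch of the same formula, with a hypothetical helper `percent_above` applied to the five sample `High` values from `df.show(n=5)` (using 60 as the threshold, since none of them exceeds 80):

```python
# High prices of the first five rows, copied from the df.show(n=5) output above.
high = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]


def percent_above(values, threshold):
    """Hypothetical helper: share of values strictly greater than threshold, in percent."""
    above = sum(1 for v in values if v > threshold)
    return 100 * above / len(values)


sample_pct = percent_above(high, 60)  # 2 of the 5 sample rows
full_pct = 100 * 115 / 1258           # 115 of the 1,258 rows had High > 80
print(sample_pct, full_pct)
```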


In [40]:
# Pearson correlation between High and Volume
from pyspark.sql.functions import corr
df1.select(corr('High','Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326095027024|
+-------------------+
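Spark's `corr` computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations, giving a value between -1 and 1. A plain-Python sketch of the formula (the helper `pearson` is hypothetical), applied to the five sample rows from `df.show(n=5)`; with only five rows its value has nothing to do with the full-data result of about -0.34.

```python
import math


def pearson(xs, ys):
    """Hypothetical helper: Pearson correlation of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    # The 1/n (or 1/(n-1)) factors cancel, so plain sums of deviations are enough.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# High and Volume of the first five rows, copied from the df.show(n=5) output above.
high = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volume = [12668800, 9593300, 12768200, 8069400, 6679300]

r = pearson(high, volume)
print(r)
```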


In [45]:
# max high per year
from pyspark.sql.functions import year
yeardf = df1.withColumn("Year",year(df1['Date']))
# aggregate only the High column instead of every numeric column
max_df = yeardf.groupBy('Year').max('High')
max_df.show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|    90.97|
|2013|    81.37|
|2014|    88.09|
|2012|     77.6|
|2016|    75.19|
+----+---------+
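The result covers 2012 through 2016, and `groupBy` does not return groups in any particular order, so adding `.orderBy('Year')` before `.show()` would list the years chronologically. The underlying grouping is a running maximum per key, sketched here in plain Python on the five sample rows from `df.show(n=5)` (all of which fall in 2012):

```python
from collections import defaultdict
from datetime import date

# (Date, High) for the first five rows, copied from the df.show(n=5) output above.
rows = [
    (date(2012, 1, 3), 61.060001),
    (date(2012, 1, 4), 60.349998),
    (date(2012, 1, 5), 59.619999),
    (date(2012, 1, 6), 59.450001),
    (date(2012, 1, 9), 59.549999),
]

# Keep the largest High seen so far for each year.
max_high_per_year = defaultdict(lambda: float("-inf"))
for day, high in rows:
    max_high_per_year[day.year] = max(max_high_per_year[day.year], high)

print(dict(sorted(max_high_per_year.items())))
```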


In [47]:
# average Close for each calendar month
from pyspark.sql.functions import month
monthdf = df1.withColumn('Month', month('Date'))
monthavg = monthdf.groupBy('Month').avg('Close')
monthavg.orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1| 71.4480196131338|
|    2|71.30680438169499|
|    3|71.77794376266337|
|    4|72.97361900692894|
|    5|72.30971685445533|
|    6| 72.4953774506191|
|    7|74.43971944078106|
|    8| 73.0298185521906|
|    9|72.18411782208611|
|   10| 71.5785454489968|
|   11|72.11108927207418|
|   12|72.84792482628012|
+-----+-----------------+
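Grouping on `month(Date)` alone pools the same calendar month across all years, so each row above averages, for example, every January in the data. The computation is a per-key sum and count, sketched in plain Python on the five sample rows from `df.show(n=5)` (all in January 2012):

```python
from collections import defaultdict
from datetime import date

# (Date, Close) for the first five rows, copied from the df.show(n=5) output above.
rows = [
    (date(2012, 1, 3), 60.330002),
    (date(2012, 1, 4), 59.709998999999996),
    (date(2012, 1, 5), 59.419998),
    (date(2012, 1, 6), 59.0),
    (date(2012, 1, 9), 59.18),
]

totals = defaultdict(float)
counts = defaultdict(int)
for day, close in rows:
    # day.month ignores the year, so the same month of different years shares a key.
    totals[day.month] += close
    counts[day.month] += 1

avg_close_per_month = {m: totals[m] / counts[m] for m in sorted(totals)}
print(avg_close_per_month)
```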
