
### <b>Prerequisite</b>



Perform an analysis by answering questions about
some stock market data on Safaricom from 2012 to 2016 (the trading dates in the file run from 2012-01-03 to 2016-12-30).

## Install necessary libraries

In [1]:
# Installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=0db58d28be446413cdf628dfdc8766667c8634ae896b1d35bc031b71d11527bb
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
# Run a local spark session

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Preview first 5 lines of the csv file

In [3]:
# Print the first 5 lines of the file saf_stock.csv
with open('saf_stock.csv') as f:
  for _ in range(5):
    # readline() keeps the trailing newline, so strip it to avoid blank lines in the output
    print(f.readline().rstrip('\n'))

Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922


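Every field in these lines is plain text until something converts it. The minimal stdlib sketch below parses the same header and first four rows with Python's `csv` module and converts each field to the type it represents. It is not how Spark parses the file. The `SAMPLE` string copies the output above, and `parse_rows` is a hypothetical helper.

```python
import csv
import io

# The header and first four data rows printed above, copied verbatim
SAMPLE = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
"""

PRICE_COLUMNS = ["Open", "High", "Low", "Close", "Adj Close"]


def parse_rows(text):
    """Parse CSV text into dicts, converting prices to float and Volume to int."""
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        row = {"Date": raw["Date"]}  # keep the ISO date as a string
        for name in PRICE_COLUMNS:
            row[name] = float(raw[name])
        row["Volume"] = int(raw["Volume"])
        rows.append(row)
    return rows


rows = parse_rows(SAMPLE)
print(rows[0])
```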
## Read the data into a dataframe

## Display column headers

In [52]:
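# Read the saf_stock.csv file
from pyspark.sql import SQLContext

# Wrap the SparkContext `sc` in an SQLContext object `sqlCtx`
# (SQLContext is deprecated since Spark 3.0; spark.read.csv(...) is the newer equivalent)
sqlCtx = SQLContext(sc)

# Read the CSV data into a DataFrame `df`; the header row supplies the column names,
# and because inferSchema is not set, every column is read as a string
df = sqlCtx.read.csv("saf_stock.csv", header=True)
print(df.columns)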



['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [5]:
# Show df type
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


## Register the dataframe as a table

In [6]:
# Register the dataframe as a table and print the name
df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()
print(tables)

['saf_stock']


In [7]:
# Preview first 10 rows of the table
df.show(10)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [9]:
# Check the statistical summary (every column is still a string, so min and max compare text)
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

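`df` was read without `inferSchema=True`, so every column is a string. As a result, the `min` and `max` rows of `describe()` come from string comparison. That is why the table above reports a minimum `Volume` of 10010500, while the numeric query further down returns 2094900. Passing `inferSchema=True` to `read.csv`, or casting the columns as the next section does, avoids this. The effect can be reproduced in plain Python with volume values that appear in this notebook:

```python
# Volume values that appear in this notebook's outputs, kept as strings as Spark read them
volumes_as_text = ["12668800", "9593300", "12768200", "8069400", "80898100", "2094900", "10010500"]

# String comparison goes character by character, so "10010500" < "2094900"
print(min(volumes_as_text), max(volumes_as_text))  # 10010500 9593300

# Numeric comparison gives the values a reader expects
volumes = [int(v) for v in volumes_as_text]
print(min(volumes), max(volumes))  # 2094900 80898100
```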
## Data Preparation

In [11]:
# Import column helpers; format_number rounds a numeric value and returns it as a formatted string
from pyspark.sql.functions import col, format_number
from pyspark.sql import functions as F

df_rounded=df.select(df['Date'],
              format_number(df['Open'].cast('float'),2).alias('Open'),
              format_number(df['High'].cast('float'), 2).alias('High'),
              format_number(df['Low'].cast('float'), 2).alias('Low'),
              format_number(df['Close'].cast('float'), 2).alias('Close'),
              df['Volume'].cast('int').alias('Volume'),
              format_number(df['Adj Close'].cast('float'), 2).alias('Adj Close')
              )

df_rounded.show(10)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300|    51.49|
|2012-01-11|59.06|59.53|59.04|59.40| 6365600|    51.81|
|2012-01-12|59.79|60.00|59.40|59.50| 7236400|    51.90|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|    51.93|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000|    52.20|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 10 rows


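`format_number` returns a string, not a number, so `df_rounded` holds text that later cells implicitly cast back to numbers. This works for prices below 1,000. For larger values, however, the format inserts thousands separators, and such strings no longer parse as numbers; in Spark, a cast of one would typically come back as null. `F.round(df['Open'].cast('float'), 2)` keeps the column numeric instead. The Python sketch below uses `format(x, ',.2f')` as a stand-in for `format_number`. The helper `format_like_format_number` is hypothetical, and treating it as equivalent to Spark's formatting is an assumption.

```python
def format_like_format_number(value, digits):
    """Hypothetical stand-in for Spark's format_number: fixed decimals plus thousands separators."""
    return format(value, f",.{digits}f")


price = format_like_format_number(59.970001, 2)
volume = format_like_format_number(12668800, 2)
print(price, volume)  # 59.97 12,668,800.00

# The price string still parses as a number; the volume string does not
print(float(price))
try:
    float(volume)
except ValueError as err:
    print("cannot parse:", err)

# Rounding keeps a numeric value, which is the behaviour F.round(col, 2) gives in Spark
print(round(59.970001, 2))
```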

In [16]:
# Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

df_2 = df_rounded.withColumn("HV Ratio",df_rounded['High']/df_rounded['Volume'])
df_2.show(10)

+----------+-----+-----+-----+-----+--------+---------+--------------------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|            HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+--------------------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|4.819714574387472E-6|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|6.290848821573389...|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|4.669413073103491E-6|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46|7.367338339901356E-6|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|8.915604928660188E-6|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300|    51.49|8.644477581688938E-6|
|2012-01-11|59.06|59.53|59.04|59.40| 6365600|    51.81| 9.35182857861003E-6|
|2012-01-12|59.79|60.00|59.40|59.50| 7236400|    51.90| 8.29141562102703E-6|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|    51.93|7.712211972623653E-6|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000|    52.20|7.071764705882352...|

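Each `HV Ratio` is that day's `High` divided by its `Volume`. Below is a worked check of the first row in plain Python, using the rounded High of 61.06 and the volume of 12668800 shown above. Also, `show()` truncates values longer than 20 characters by default, which is why a few ratios end in `...`. `df_2.show(10, truncate=False)` prints them in full.

```python
# First row of df_2 above: rounded High price and shares traded that day
high = 61.06
volume = 12668800

# HV Ratio = High / Volume
hv_ratio = high / volume
print(hv_ratio)
```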
## Data Analysis

In [17]:
# What day had the Peak High in Price?

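# Cast High to a number so the sort is numeric, then print the Date of the top row
print(df_2.orderBy(df_2['High'].cast('double').desc()).first()['Date'])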


2015-01-13


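Ordering by `High` descending and keeping the first row is an arg-max. `High` in `df_2` is a formatted string, so sorting it without a cast compares text. That matches numeric order here only because every value has two digits before the decimal point. Below is a stdlib sketch of the same arg-max with an explicit numeric key. It runs on the ten preview rows shown earlier, not the full file.

```python
# (Date, High) pairs from the df_rounded preview above, kept as strings like the column in df_2
preview = [
    ("2012-01-03", "61.06"), ("2012-01-04", "60.35"), ("2012-01-05", "59.62"),
    ("2012-01-06", "59.45"), ("2012-01-09", "59.55"), ("2012-01-10", "59.71"),
    ("2012-01-11", "59.53"), ("2012-01-12", "60.00"), ("2012-01-13", "59.61"),
    ("2012-01-17", "60.11"),
]

# Arg-max with an explicit numeric key: the row whose High, as a float, is largest
peak_date, peak_high = max(preview, key=lambda row: float(row[1]))
print(peak_date, peak_high)  # 2012-01-03 61.06
```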
In [22]:
# What is the mean of the Close column?

# Note: importing max and min here shadows Python's built-in max() and min() in later cells
from pyspark.sql.functions import mean, max, min
mean_close = df_2.select(mean('Close'))
mean_close.show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+


In [28]:
# What is the max and min of the Volume column?

maxmin = df_2.select(max('Volume'), min('Volume'))
maxmin.show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+


In [30]:
# How many days was the Close lower than 60 dollars?

lower_60 = df_2.filter(df_2['Close'] < 60).count()
print(lower_60)


81

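`df_2['Close'] < 60` compares a string column with a number and relies on Spark's implicit type coercion. Writing `df_2['Close'].cast('double') < 60` makes the intent explicit. The counting logic is sketched below in Python on the ten `Close` values from the preview, not the full dataset:

```python
# Close values from the df_rounded preview above, as strings
closes = ["60.33", "59.71", "59.42", "59.00", "59.18",
          "59.04", "59.40", "59.50", "59.54", "59.85"]

# Convert explicitly, then count the days that closed below 60
days_below_60 = sum(1 for c in closes if float(c) < 60)
print(days_below_60)  # 9
```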

In [31]:
# What percentage of the time was the High greater than 80 dollars?

greater_80 = (df_2.filter(df_2['High']>80).count()/df.count()) * 100
print(greater_80)

8.426073131955485

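Turning the printed percentage back into a day count is a quick sanity check. With 1258 rows in total (the `count` row of `describe()`), 8.426...% corresponds to 106 trading days with a High above 80. For reporting, it rounds to 8.43%.

```python
TOTAL_DAYS = 1258          # row count reported by describe() above
printed_pct = 8.426073131955485

# Recover the number of days behind the percentage
days_above_80 = round(printed_pct / 100 * TOTAL_DAYS)
print(days_above_80)  # 106

# Recompute the percentage and round it to two decimals for reporting
pct = days_above_80 / TOTAL_DAYS * 100
print(f"{pct:.2f}%")  # 8.43%
```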

In [33]:
# What is the Pearson correlation between High and Volume?

from pyspark.sql.functions import corr

pcorr = df_2.select(corr('High', 'Volume'))
pcorr.show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.33843260582148915|
+--------------------+

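`corr` returns the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. Below is a stdlib sketch of that formula, for illustration only. `pearson` is a hypothetical helper, and the `highs` and `volumes` lists are the ten preview rows shown earlier, so the printed value is not expected to match the full-column result of -0.338.

```python
import math


def pearson(xs, ys):
    """Pearson r = sum((x - mx)(y - my)) / sqrt(sum((x - mx)^2) * sum((y - my)^2))."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two equal-length sequences with at least two values")
    mx = sum(xs) / len(xs)
    my = sum(ys) / len(ys)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    var_x = sum((x - mx) ** 2 for x in xs)
    var_y = sum((y - my) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# High and Volume for the ten preview rows shown earlier (not the full dataset)
highs = [61.06, 60.35, 59.62, 59.45, 59.55, 59.71, 59.53, 60.00, 59.61, 60.11]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300,
           6907300, 6365600, 7236400, 7729300, 8500000]
print(round(pearson(highs, volumes), 3))
```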

In [46]:
# What is the max High per year?
from pyspark.sql.functions import year

df_2a=df_2.select(df_2['Date'],
              df_2['High'].cast('float').alias('High'),
              df_2['Close'].cast('float').alias('Close')
              )

# Extract the year from each Date and take the highest High per year
df_2b = df_2a.withColumn("Year", year(df_2a['Date']))
max_high = df_2b.groupBy('Year').max('High')
max_high.select('Year', 'max(High)').orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|     77.6|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+

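Grouping by `Year` and taking the maximum `High` is a group-by with a max aggregate. Below is a stdlib sketch of the same idea that keys each row by the year parsed from its ISO date. `max_by_year` is a hypothetical helper, and the sample rows come from the preview, where only 2012 dates appear, so the printed result is illustrative only.

```python
from datetime import date


def max_by_year(rows):
    """Return {year: highest High} for (iso_date, high) pairs."""
    result = {}
    for iso_date, high in rows:
        year = date.fromisoformat(iso_date).year
        if year not in result or high > result[year]:
            result[year] = high
    return result


rows = [("2012-01-03", 61.06), ("2012-01-04", 60.35), ("2012-01-17", 60.11)]
print(max_by_year(rows))  # {2012: 61.06}
```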

In [43]:
# What is the average Close for each Calendar Month?

from pyspark.sql.functions import month

df_month = df_2b.withColumn('Month',month('Date'))
avg_month = df_month.select(['Month', 'Close']).groupBy('Month').mean('Close')
avg_month.select('Month', 'avg(Close)').orderBy('Month').show(10)

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1| 71.4480196131338|
|    2|71.30680438169499|
|    3|71.77794376266337|
|    4|72.97361900692894|
|    5|72.30971685445533|
|    6| 72.4953774506191|
|    7|74.43971944078106|
|    8| 73.0298185521906|
|    9|72.18411782208611|
|   10| 71.5785454489968|
+-----+-----------------+
only showing top 10 rows

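The monthly figures pool every year together: all January closes from 2012 to 2016 are averaged as one group. Because the cell calls `show(10)`, months 11 and 12 are computed but not displayed; `show(12)` would print every month. Below is a stdlib sketch of the same pooled average. `mean_close_by_month` is a hypothetical helper, and the sample rows are from the preview, all in January.

```python
from collections import defaultdict
from datetime import date


def mean_close_by_month(rows):
    """Return {month: mean Close} for (iso_date, close) pairs, pooling all years."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for iso_date, close in rows:
        month = date.fromisoformat(iso_date).month
        totals[month] += close
        counts[month] += 1
    return {m: totals[m] / counts[m] for m in sorted(totals)}


# A few (Date, Close) pairs from the preview above; every one falls in January
rows = [("2012-01-03", 60.33), ("2012-01-04", 59.71), ("2012-01-05", 59.42)]
print(mean_close_by_month(rows))
```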

## Summary Findings

● What day had the Peak High in Price? 2015-01-13

● What is the mean of the Close column? 72.39

● What is the max and min of the Volume column? Max 80,898,100; Min 2,094,900

● How many days was the Close lower than 60 dollars? 81

● What percentage of the time was the High greater than 80 dollars? 8.43%

● What is the Pearson correlation between High and Volume? -0.34

● What is the max High per year? 2012: 77.60, 2013: 81.37, 2014: 88.09, 2015: 90.97, 2016: 75.19
