## Apache Spark DataFrames Project
## Instructions
As a Data professional, you need to perform an analysis by answering questions about
some stock market data on Safaricom from the years 2012-2017.

You will need to perform the following:

Data Importation and Exploration

● Start a spark session and load the stock file while inferring the data types.
● Determine the column names
● Make observations about the schema.
● Show the first 5 rows
● Use the describe method to learn about the data frame

Data Preparation

● Format all the data to 2 decimal places i.e. format_number()
● Create a new data frame with a column called HV Ratio that is the ratio of the
High Price versus volume of stock traded for a day

Data Analysis

● What day had the Peak High in Price?
● What is the mean of the Close column?
● What is the max and min of the Volume column?
● How many days was the Close lower than 60 dollars?
● What percentage of the time was the High greater than 80 dollars?
● What is the Pearson correlation between High and Volume?
● What is the max High per year?
● What is the average Close for each Calendar Month?

## Project Deliverable

In [14]:
# Installing pyspark
# ---
#
!pip install pyspark
import pandas as pd



In [2]:
# Run a local spark session
# ---
#
from pyspark import SparkFiles
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## 1. 2. Load data from url and save in csv file

In [5]:
!wget -O saf_stock.csv https://bit.ly/3pmchka
f = open('saf_stock.csv')

for i in range(0,5):
    print(f.readline())

--2023-07-12 17:21:29--  https://bit.ly/3pmchka
Resolving bit.ly (bit.ly)... 67.199.248.10, 67.199.248.11
Connecting to bit.ly (bit.ly)|67.199.248.10|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://archive.org/download/saf_stock/saf_stock.csv [following]
--2023-07-12 17:21:29--  https://archive.org/download/saf_stock/saf_stock.csv
Resolving archive.org (archive.org)... 207.241.224.2
Connecting to archive.org (archive.org)|207.241.224.2|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://ia804605.us.archive.org/18/items/saf_stock/saf_stock.csv [following]
--2023-07-12 17:21:29--  https://ia804605.us.archive.org/18/items/saf_stock/saf_stock.csv
Resolving ia804605.us.archive.org (ia804605.us.archive.org)... 207.241.235.94
Connecting to ia804605.us.archive.org (ia804605.us.archive.org)|207.241.235.94|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 90266 (88K) [text/csv]
Saving to

#Determine the column names

In [23]:
df = pd.read_csv('saf_stock.csv')
df.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close'], dtype='object')

#Read json data from csv file and print its schema

In [24]:
#Reading in Data
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read JSON data into a DataFrame object `df`
df = sqlCtx.read.csv("saf_stock.csv")

df.registerTempTable('saf_stock')
tables = sqlCtx.tableNames()
print(tables)
# Print the type
print(type(df))

#Schema
#Call the printSchema() method on the Spark DataFrame df to display the schema that Spark inferred.
df = sqlCtx.read.csv("saf_stock.csv", header=True, inferSchema=True)
df.printSchema()




['saf_stock']
<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#Show the first 5 rows

In [25]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



#Use the describe method to learn about the data frame

In [26]:
query = 'select * from saf_stock'
sqlCtx.sql(query).describe().show(5)


+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|       _c0|               _c1|              _c2|              _c3|              _c4|              _c5|              _c6|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1259|              1259|             1259|             1259|             1259|             1259|             1259|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|      Date|              Open|             High|              Low|            Clo

In [44]:
import pyspark.sql.functions as F
from pyspark.sql import SQLContext

from pyspark.sql.functions import lit,when,col,expr,round
df_prep= (
 df.withColumn('Open', F.format_number('Open', 2))
 .withColumn('High', F.format_number('High', 2))
 .withColumn('Low', F.format_number('Low', 2))
 .withColumn('Close', F.format_number('Close', 2))
 .withColumn('Volume', F.format_number('Volume', 2))
 .withColumn('Adj Close', F.format_number('Adj Close', 2)))
df_prep.show(5)

+----------+-----+-----+-----+-----+-------------+---------+
|      Date| Open| High|  Low|Close|       Volume|Adj Close|
+----------+-----+-----+-----+-----+-------------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
+----------+-----+-----+-----+-----+-------------+---------+
only showing top 5 rows



#Create a new data frame with a column called HV Ratio that is the ratio of the
#High Price versus volume of stock traded for a day

In [46]:
from pyspark.sql.functions import lit,when,col,expr,round
df_prep= (
 df.withColumn('HV',expr("High/Volume")))

df_prep.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|                  HV|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|8.915604778943901E-6|
+----------+------------------+---------

#Data Analysis

In [49]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)


#Register a table in SQL
table = df_prep.registerTempTable("saf_stock1")

table = sqlCtx.tableNames()
print(table)

['saf_stock', 'saf_stock1']




What day had the Peak High in Price?

In [53]:
query = "SELECT \
         Date,max(High) AS Peak_Price \
     FROM saf_stock1 GROUP BY Date \
    ORDER BY Peak_Price DESC LIMIT 1 "
sqlCtx.sql(query).show()

+----------+----------+
|      Date|Peak_Price|
+----------+----------+
|2015-01-13| 90.970001|
+----------+----------+



What is the mean of the Close column?

In [54]:
query = "\
SELECT\
    MEAN(Close) AS MEAN\
        FROM saf_stock1"

sqlCtx.sql(query).show()

+-----------------+
|             MEAN|
+-----------------+
|72.38844998012726|
+-----------------+



What is the max and min of the Volume column?

In [55]:
query = "SELECT\
     MIN(Volume) Min_Volume, MAX(Volume) Max_Volume\
         FROM saf_stock1 "

sqlCtx.sql(query).show()

+----------+----------+
|Min_Volume|Max_Volume|
+----------+----------+
|   2094900|  80898100|
+----------+----------+



How many days was the Close lower than 60 dollars?

In [57]:
query = "SELECT  COUNT(Date) FROM saf_stock1 WHERE Close <= 60 "
sqlCtx.sql(query).show()

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



What percentage of the time was the High greater than 80 dollars?

In [61]:
query = "SELECT ROUND((COUNT(High)/1258*100),2) higher_than_80  FROM saf_stock1  WHERE High >= 80"

sqlCtx.sql(query).show()

+--------------+
|higher_than_80|
+--------------+
|          9.14|
+--------------+



What is the Pearson correlation between High and Volume?


In [62]:
query = "SELECT ROUND(corr(High,Volume),2) Pearson_Correlation\
          FROM saf_stock1"

sqlCtx.sql(query).show()

+-------------------+
|Pearson_Correlation|
+-------------------+
|              -0.34|
+-------------------+



What is the max High per year?

In [63]:
query = "SELECT  EXTRACT(YEAR FROM Date) Year, MAX(High) Max_High FROM saf_stock1\
     GROUP BY Year ORDER BY Max_High DESC"
sqlCtx.sql(query).show()

+----+---------+
|Year| Max_High|
+----+---------+
|2015|90.970001|
|2014|88.089996|
|2013|81.370003|
|2012|77.599998|
|2016|75.190002|
+----+---------+



What is the average Close for each Calendar Month?

In [64]:
query = "SELECT EXTRACT(MONTH FROM Date) Month,ROUND(AVG(Close),2) Avg_Close FROM saf_stock1\
    GROUP BY Month ORDER BY Month ASC"

sqlCtx.sql(query).show()

+-----+---------+
|Month|Avg_Close|
+-----+---------+
|    1|    71.45|
|    2|    71.31|
|    3|    71.78|
|    4|    72.97|
|    5|    72.31|
|    6|     72.5|
|    7|    74.44|
|    8|    73.03|
|    9|    72.18|
|   10|    71.58|
|   11|    72.11|
|   12|    72.85|
+-----+---------+

