# **Apache Spark DataFrames Project**

**Instructions**
As a Data professional, you need to perform an analysis by answering questions about some stock market data on Safaricom from the years 2012-2017.

**Data Importation and Exploration**


In [1]:
# Installing pyspark
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=66a904a4f27a549f2ed87ebceb85cb0989c3bfe9af814fda4810beb7b79ef498
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# We run a local spark session
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [15]:
#Downloaded the stock csv file from https://bit.ly/3pmchka
#-----
#reading file and printing the column names

saf_stock = open('saf_stock.csv')
saf_stock.readline()

'Date,Open,High,Low,Close,Volume,Adj Close\n'

In [34]:
#Load the stocks file and infer the data types
#Making Observations about the schema

from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)

# Read csv data into a DataFrame object `saf_df`

saf_df = spark.read.option("header","true").option("inferSchema","true").csv("saf_stock.csv")

#Making Observations about the schema
saf_df.printSchema()
saf_df.show()

# Print the type
#print(type(saf_df))



root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|1276

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

Using the desscribe method to learn about the data frame

In [37]:
print(saf_df.describe())

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]


In [36]:
from pyspark.sql.functions import format_number

result = saf_df.describe()
print(result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'), 2).alias('High'),
              format_number(result['Low'].cast('float'), 2).alias('Low'),
              format_number(result['Close'].cast('float'), 2).alias('Close'),
              result['Volume'].cast('int').alias('Volume'),
              format_number(result['Adj Close'].cast('float'), 2).alias('Adj Close')
              ).show())


+-------+--------+--------+--------+--------+--------+---------+
|summary|    Open|    High|     Low|   Close|  Volume|Adj Close|
+-------+--------+--------+--------+--------+--------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|    84.91|
+-------+--------+--------+--------+--------+--------+---------+

None


In [38]:
saf_df2 = saf_df.withColumn("HV Ratio",saf_df['High']/saf_df['Volume'])
print(saf_df2.select('HV Ratio').show())

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows

None


What day had the Peak High in Price?

In [42]:
print(saf_df.orderBy(saf_df['High'].desc()).head(1)[0][0])

2015-01-13 00:00:00


● What is the mean of the Close column?

In [43]:
from pyspark.sql.functions import mean

print(saf_df.select(mean('Close')).show())

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+

None


What is the max and min of the Volume column?

In [44]:
from pyspark.sql.functions import max, min

print(saf_df.select(max('Volume'),min('Volume')).show())

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+

None


How many days was the Close lower than 60 dollars?

In [45]:
print(saf_df.filter(saf_df['Close'] < 60).count())

81


What percentage of the time was the High greater than 80 dollars?


In [46]:
print((saf_df.filter(saf_df['High']>80).count()/saf_df.count()) * 100)

9.141494435612083


What is the Pearson correlation between High and Volume?

In [47]:
from pyspark.sql.functions import corr

print(saf_df.select(corr('High','Volume')).show())

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+

None


What is the max High per year?

In [48]:
from pyspark.sql.functions import year

saf_df.withColumn("Year",year(saf_df['Date'])).groupBy('Year').max().select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



What is the average Close for each Calendar Month?

In [49]:
from pyspark.sql.functions import month

saf_df.withColumn('Month',month('Date')).select(['Month','Close']).groupBy('Month').mean().select('Month','avg(Close)').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

