#Apache Spark DataFrames Project


Project Deliverable

You will be required to submit:

● A GitHub repository with your project written in PySpark.


##Instructions

As a data professional, you need to analyse Safaricom stock market data for the years 2012-2017 by answering a set of questions.
You will need to perform the following:


**Data Importation and Exploration**

● Start a spark session and load the stock file while inferring the data types.

● Determine the column names

● Make observations about the schema.

● Show the first 5 rows

● Use the describe method to learn about the data frame

**Data Preparation**

● Format all the data to 2 decimal places using format_number()

● Create a new data frame with a column called HV Ratio that is the ratio of the High price to the volume of stock traded for a day

**Data Analysis**

● What day had the Peak High in Price?

● What is the mean of the Close column?

● What are the max and min of the Volume column?

● On how many days was the Close lower than 60 dollars?

● What percentage of the time was the High greater than 80 dollars?

● What is the Pearson correlation between High and Volume?

● What is the max High per year?

● What is the average Close for each Calendar Month?

**Data description**

● Dataset URL (CSV File): https://bit.ly/3pmchka


##Pre-requisites

In [1]:
# Installing pyspark
# ---

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 5.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 19.0 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=4201b46d1f0cf2adbe6f36beafe2f1b09824d8deb3b9d1e454fc7942fe751b00
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Run a local spark session
# ---

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

##Data Importation and Exploration

In [3]:
# Start a spark session and load the stock file while inferring the data types.

stock_file_df = spark.read.csv("saf_stock.csv", header=True, inferSchema=True)

In [4]:
# Determine the column names
print("Column Names:")
print(stock_file_df.columns)

Column Names:
['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [5]:
# Make observations about the schema
print("Schema:")
stock_file_df.printSchema()


Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
# Show the first 5 rows
stock_file_df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [7]:
# Use the describe method to learn about the data frame

stock_file_df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

##Data Preparation

In [8]:
# Format all numeric columns to 2 decimal places with format_number().
# Note: format_number() returns strings (with thousands separators), not numbers.

from pyspark.sql.functions import format_number
numeric_columns = [name for name, dtype in stock_file_df.dtypes if dtype in ['double', 'int']]
stock_file_df = stock_file_df.select(stock_file_df["Date"], *(format_number(name, 2).alias(name) for name in numeric_columns))
stock_file_df.show(5)


+-------------------+-----+-----+-----+-----+-------------+---------+
|               Date| Open| High|  Low|Close|       Volume|Adj Close|
+-------------------+-----+-----+-----+-----+-------------+---------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
+-------------------+-----+-----+-----+-----+-------------+---------+
only showing top 5 rows
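
Formatting turns every numeric column into text, and values of a thousand or more also gain comma separators. The sketch below is a plain-Python analogue of that behaviour, not Spark itself; the helper name `to_number` is made up for illustration. It shows why the commas have to be stripped before the text can be treated as a number again.

```python
# Plain-Python analogue of formatting a number to 2 decimal places
# with thousands separators (illustration only, not the Spark API).
volume = 12668800
formatted = format(volume, ",.2f")  # the result is a string, not a number
print(formatted)

def to_number(text):
    """Strip thousands separators and parse the text back into a float."""
    return float(text.replace(",", ""))

# Parsing the formatted text directly fails because of the commas.
try:
    float(formatted)
    parsed_directly = True
except ValueError:
    parsed_directly = False

print(parsed_directly)
print(to_number(formatted))
```

In the notebook, the equivalent cleanup is the `regexp_replace` call in the next cell.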



In [9]:
# Create a new column "HV Ratio": the ratio of the High price to the volume of stock traded for a day

from pyspark.sql.functions import regexp_replace, col
stock_file_df = stock_file_df.withColumn("Volume", regexp_replace(col("Volume"), ",", ""))
stock_file_df = stock_file_df.withColumn("HV Ratio", stock_file_df.High/ stock_file_df.Volume)
stock_file_df.show(5)

+-------------------+-----+-----+-----+-----+-----------+---------+--------------------+
|               Date| Open| High|  Low|Close|     Volume|Adj Close|            HV Ratio|
+-------------------+-----+-----+-----+-----+-----------+---------+--------------------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800.00|    52.62|4.819714574387472E-6|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300.00|    52.08|6.290848821573389...|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200.00|    51.83|4.669413073103491E-6|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8069400.00|    51.46|7.367338339901356E-6|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300.00|    51.62|8.915604928660188E-6|
+-------------------+-----+-----+-----+-----+-----------+---------+--------------------+
only showing top 5 rows
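
Both High and Volume are text columns at this point, so the division only works because the strings can be read as numbers. As a quick sanity check, the first row's ratio can be reproduced in plain Python. This is a sketch; `hv_ratio` is a hypothetical helper, not part of the notebook.

```python
import math

def hv_ratio(high_text, volume_text):
    """Parse two formatted strings as floats and return high / volume."""
    volume = float(volume_text)
    if volume == 0:
        raise ValueError("volume must be non-zero")
    return float(high_text) / volume

# First row of the table: High = 61.06, Volume = 12668800.00
ratio = hv_ratio("61.06", "12668800.00")
print(ratio)  # roughly 4.82e-06, in line with the first HV Ratio shown
```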



##Data Analysis

In [10]:
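from pyspark.sql import SQLContext

# SQLContext expects the SparkContext first and the SparkSession second.
# It is deprecated since Spark 3.0; spark.sql() is the current entry point.
sqlCtx = SQLContext(sc, spark)
stock_file_df.createOrReplaceTempView("stock_data")

print("Tables in SQLContext:", sqlCtx.tableNames())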


Tables in SQLContext: ['stock_data']




In [11]:
# What day had the Peak High in Price?
day_peak_high_price = sqlCtx.sql("SELECT Date, High FROM stock_data ORDER BY High DESC LIMIT 1")
day_peak_high_price.show()

+-------------------+-----+
|               Date| High|
+-------------------+-----+
|2015-01-13 00:00:00|90.97|
+-------------------+-----+



In [15]:
# What is the mean of the Close column?
mean_query = """
select mean(Close)
from stock_data
"""
sqlCtx.sql(mean_query).show()


+-----------------+
|      mean(Close)|
+-----------------+
|72.38844992050863|
+-----------------+



In [16]:
# What are the max and min of the Volume column?
# Volume is a string after format_number(), so cast it to a number first;
# otherwise min/max compare the values as text.
max_min_query = """
select min(cast(Volume as double)) as min_Volume, max(cast(Volume as double)) as max_Volume
from stock_data
"""
sqlCtx.sql(max_min_query).show()


+----------+----------+
|min_Volume|max_Volume|
+----------+----------+
| 2094900.0| 8.08981E7|
+----------+----------+
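
Because format_number() turned Volume into a text column, min and max on it compare characters from left to right unless the values are cast to a number first. The plain-Python sketch below shows the pitfall on four sample volumes written in the same two-decimal text form. The sample list is chosen for illustration and is not the full dataset.

```python
# Four sample volumes stored as text, as they are after format_number()
# and comma removal (illustrative sample, not the full dataset).
volumes_as_text = ["10010500.00", "9994400.00", "2094900.00", "80898100.00"]

# Text comparison looks at characters left to right: "1..." sorts first, "9..." last.
text_min = min(volumes_as_text)
text_max = max(volumes_as_text)

# Converting to numbers first gives the true extremes.
numeric = [float(v) for v in volumes_as_text]
num_min, num_max = min(numeric), max(numeric)

print(text_min, text_max)  # 10010500.00 9994400.00
print(num_min, num_max)    # 2094900.0 80898100.0
```

The numeric extremes agree with the min and max that describe() reported for Volume before formatting.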



In [17]:
# How many days was the Close lower than 60 dollars?
days_result = sqlCtx.sql("SELECT COUNT(Date) FROM stock_data WHERE Close < 60")

days_result.show()


+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



In [18]:
# What percentage of the time was the High greater than 80 dollars?

percent_result = sqlCtx.sql("SELECT COUNT(*) FROM stock_data WHERE High > 80")
high_count = percent_result.collect()[0][0]

percent_result = sqlCtx.sql("SELECT COUNT(*) FROM stock_data")
total_count = percent_result.collect()[0][0]

percentage = (high_count / total_count) * 100

print("Percentage of days with High price > 80: {:.2f}%".format(percentage))


Percentage of days with High price > 80: 8.43%
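
The same percentage can also be computed in a single pass, as the share of rows that pass the test. A minimal plain-Python sketch, using the five High values from the first rows shown earlier and a threshold of 60 picked only so the small sample gives a non-trivial answer; `percent_above` is a hypothetical helper:

```python
def percent_above(values, threshold):
    """Return the percentage of values strictly greater than threshold."""
    if not values:
        raise ValueError("values must not be empty")
    hits = sum(1 for v in values if v > threshold)
    return 100.0 * hits / len(values)

# High prices from the first five rows of the dataset.
highs = [61.06, 60.35, 59.62, 59.45, 59.55]
sample_percentage = percent_above(highs, 60)
print("{:.2f}%".format(sample_percentage))  # 40.00%
```

In SQL, the analogous single query would average a 0/1 indicator, for example `AVG(CASE WHEN High > 80 THEN 1 ELSE 0 END) * 100`.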


In [19]:
# What is the Pearson correlation between High and Volume?

from pyspark.sql.functions import corr
pearson_result = stock_file_df.select(corr("High", "Volume").alias("correlation")).collect()[0][0]
print("Pearson correlation between High and Volume:", pearson_result)


Pearson correlation between High and Volume: -0.33843260582148915
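
For reference, the Pearson coefficient is the covariance of the two series divided by the product of their standard deviations. The sketch below writes that formula out in plain Python and applies it to the first five rows shown earlier. Five rows are far too few to reproduce the full-dataset figure, so the sample result is illustrative only; `pearson` is a hypothetical helper.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance over the product of standard deviations."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two equal-length series with at least 2 points")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of centred products; the 1/n factors cancel in the ratio.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant series")
    return cov / math.sqrt(var_x * var_y)

# High and Volume from the first five rows of the dataset.
highs = [61.06, 60.35, 59.62, 59.45, 59.55]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]
r = pearson(highs, volumes)
print(round(r, 3))
```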


In [20]:
# What is the max High per year?
max_result = sqlCtx.sql("SELECT year(Date) as year, max(High) as max_high FROM stock_data GROUP BY year(Date) ORDER BY max_high DESC")

# Show the result
max_result.show()

+----+--------+
|year|max_high|
+----+--------+
|2015|   90.97|
|2014|   88.09|
|2013|   81.37|
|2012|   77.60|
|2016|   75.19|
+----+--------+



In [21]:
# What is the average Close for each Calendar Month?
avg_query = """
select mean(Close) as mean_per_month, MONTH(Date) as month_no
from stock_data
group by month_no
order by month_no
"""
sqlCtx.sql(avg_query).show()

+-----------------+--------+
|   mean_per_month|month_no|
+-----------------+--------+
|71.44801980198022|       1|
|71.30680412371134|       2|
|71.77794392523363|       3|
|72.97361904761907|       4|
|72.30971698113206|       5|
|72.49537735849057|       6|
|74.43971962616824|       7|
|73.02981818181819|       8|
|72.18411764705883|       9|
|71.57854545454546|      10|
|72.11108910891085|      11|
|72.84792452830189|      12|
+-----------------+--------+

