**Apache Spark DataFrames Assignment Project**

Problem statement
As a data professional, you need to analyse stock market data for Safaricom covering the years 2012-2017 by answering the questions below.

Required actions
Data Importation and Exploration:

● Start a spark session and load the stock file while inferring the data types.

● Determine the column names

● Make observations about the schema.

● Show the first 5 rows

● Use the describe method to learn about the data frame

Data Preparation

● Format all the data to 2 decimal places, e.g. with format_number()

● Create a new data frame with a column called HV Ratio, the ratio of the day's High price to the volume of stock traded that day

Data Analysis

● What day had the Peak High in Price?

● What is the mean of the Close column?

● What is the max and min of the Volume column?

● How many days was the Close lower than 60 dollars?

● What percentage of the time was the High greater than 80 dollars?

● What is the Pearson correlation between High and Volume?

● What is the max High per year?

● What is the average Close for each Calendar Month?

Data Source: Dataset URL (CSV file): https://bit.ly/3pmchka

In [1]:
# PySpark installation

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=e03c359db2670bb62c81dba3f3569ef1f092c62f9e9567339530cfca2ddb0b93
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
# Start the Spark session (SQLContext is imported for the table listing later on)
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("stock_market_data").getOrCreate()




In [3]:
# Start a Spark session and read a file with inferred data types
stock_saf_df = spark.read.csv("saf_stock.csv", header=True, inferSchema=True)

In [4]:
# Print the column names of the DataFrame
print(stock_saf_df.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [5]:
# Print the schema of the DataFrame
stock_saf_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
# Show the first 5 rows of a DataFrame
stock_saf_df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [7]:
# Describe the DataFrame
stock_saf_df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

**Data Cleaning and Transformation**

In [8]:
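# Format numeric columns to 2 decimal places
from pyspark.sql.functions import format_number

# dtypes yields (column name, type string) pairs; keep the numeric columns
numeric_cols = [name for name, dtype in stock_saf_df.dtypes if dtype in ("double", "int")]
# format_number returns strings, so the result is kept in a separate DataFrame used only for display
df = stock_saf_df.select("Date", *[format_number(name, 2).alias(name) for name in numeric_cols])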

# Show the first 5 rows of the formatted DataFrame
df.show(5)


+----------+-----+-----+-----+-----+-------------+---------+
|      Date| Open| High|  Low|Close|       Volume|Adj Close|
+----------+-----+-----+-----+-----+-------------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
+----------+-----+-----+-----+-----+-------------+---------+
only showing top 5 rows
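
Note that format_number returns its result as a string, which is why Volume now appears as 12,668,800.00. The plain-Python sketch below (no Spark involved) mimics that appearance for two values from the first row; the helper name format_2dp is hypothetical, and Python's handling of exact rounding ties may differ from Spark's.

```python
def format_2dp(value):
    """Mimic the look of format_number(value, 2): thousands separators and two decimals, as text."""
    return format(value, ",.2f")


print(format_2dp(59.970001))  # 59.97
print(format_2dp(12668800))   # 12,668,800.00

# The result is text, not a number, so it should not be used for arithmetic
print(type(format_2dp(12668800)).__name__)  # str
```

Because the formatted values are text, the numeric analysis in the later cells is better run on the unformatted DataFrame.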



In [9]:
# Volume was inferred as an integer, so it needs no comma stripping here;
# the commas exist only in the display-formatted DataFrame above.
# Running regexp_replace on it would silently turn Volume into a string column.

# Calculate HV ratio and create new column
stock_saf_df = stock_saf_df.withColumn("HV Ratio", stock_saf_df["High"] / stock_saf_df["Volume"])

# Show the first 5 rows of the resulting dataframe
stock_saf_df.show(5)


+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|8.915604778943901E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
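
As a quick sanity check on the HV Ratio column, the first two rows can be recomputed by hand. This is a plain-Python sketch using the High and Volume values copied from the table above; the variable names are illustrative only.

```python
# (Date, High, Volume) copied from the first two rows of the output above
rows = [
    ("2012-01-03", 61.060001, 12668800),
    ("2012-01-04", 60.349998, 9593300),
]

# HV Ratio is simply High divided by Volume for the same day
hv_ratios = {date: high / volume for date, high, volume in rows}

for date, ratio in hv_ratios.items():
    print(date, ratio)
```

The printed ratios should agree with the HV Ratio column to within floating-point precision, on the order of 5e-6 and 6e-6 respectively.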

**Data Analysis: Steps and Overview.**

In [10]:
# Create temporary view of DataFrame
stock_saf_df.createOrReplaceTempView("stock_data")

# List the tables and views registered in the session catalog
# (spark.catalog replaces the deprecated SQLContext)
print("Tables in catalog:", [t.name for t in spark.catalog.listTables()])


Tables in catalog: ['stock_data']


In [11]:
# Query DataFrame to get date and high price of day with highest high price
peak_high_price = stock_saf_df.select("Date", "High").orderBy("High", ascending=False).limit(1)

# Display the resulting data
peak_high_price.show()


+----------+---------+
|      Date|     High|
+----------+---------+
|2015-01-13|90.970001|
+----------+---------+



In [12]:
# Calculate mean of the close column
close_mean = stock_saf_df.selectExpr("mean(Close)").collect()[0][0]

# Display the resulting mean
print("Mean of close column:", close_mean)


Mean of close column: 72.38844998012726


In [13]:
# Get minimum and maximum values of the Volume column
# Cast to bigint so the comparison is numeric even if Volume is held as a string
volume_stats = stock_saf_df.selectExpr("min(cast(Volume as bigint))", "max(cast(Volume as bigint))").first()

# Display the resulting statistics
print("Minimum Volume:", volume_stats[0])
print("Maximum Volume:", volume_stats[1])


Minimum Volume: 2094900
Maximum Volume: 80898100
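
A caution on min and max: string columns are compared lexicographically, character by character. If Volume has been converted to text (for example by regexp_replace or format_number), the reported extremes can differ from the numeric ones that describe() printed earlier (2094900 and 80898100). The plain-Python sketch below, which uses no Spark, reproduces the effect; the first two values come from the describe() output and the other two are illustrative.

```python
# Two values from describe() plus two illustrative volumes
volumes = [2094900, 80898100, 10010500, 9994400]

# Numeric comparison gives the true extremes
numeric_min, numeric_max = min(volumes), max(volumes)

# Comparing the same values as text orders them by leading character instead
as_text = [str(v) for v in volumes]
text_min, text_max = min(as_text), max(as_text)

print("numeric:", numeric_min, numeric_max)  # numeric: 2094900 80898100
print("as text:", text_min, text_max)        # as text: 10010500 9994400
```

Casting the column back to a numeric type before aggregating avoids the problem.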


In [14]:
# Count the number of days with close price < 60
days_60_count = stock_saf_df.filter("Close < 60").count()

# Display the resulting count
print("Number of days with close price < 60:", days_60_count)


Number of days with close price < 60: 81


In [15]:
# Calculate percentage of time where High is >80$
high_count = stock_saf_df.filter("High > 80").count()
total_count = stock_saf_df.count()
percentage = (high_count / total_count) * 100

# Display the resulting percentage
print("% time where High is >80$: {:.2f}%".format(percentage))


% time where High is >80$: 9.14%


In [16]:
# Import corr function from PySpark
from pyspark.sql.functions import corr

# Calculate Pearson correlation coefficient between High and Volume
correlation = stock_saf_df.select(corr("High", "Volume")).collect()[0][0]

# Display the resulting correlation coefficient
print("Pearson correlation between High and Volume is:", correlation)


Pearson correlation between High and Volume is: -0.3384326061737161
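
For reference, the Pearson coefficient is the covariance of the two series divided by the product of their standard deviations, so it always lies between -1 and 1. The sketch below is a plain-Python illustration of that formula, not Spark's implementation; the helper name pearson is hypothetical, and it is applied only to the five rows displayed by show(5), so its result will not match the full-dataset figure above.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length, at least 2 items")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    # Sums of co-deviations and squared deviations; the 1/n factors cancel out
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    # Note: a constant series has zero variance, which makes the ratio undefined
    return cov / math.sqrt(var_x * var_y)


# High and Volume from the five rows shown by show(5)
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]
print(pearson(highs, volumes))
```

A negative coefficient such as the one reported for the full dataset indicates that days with higher prices tended to have lower traded volume, though the relationship is weak.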


In [17]:
# Import the functions module under an alias so Spark's max does not shadow Python's built-in max
from pyspark.sql import functions as F

# Group by year and calculate the maximum High for each year, highest first
high_max = stock_saf_df.groupBy(F.year("Date").alias("year")).agg(F.max("High").alias("max_high")).orderBy("max_high", ascending=False)

# Show the result
high_max.show()




+----+---------+
|year| max_high|
+----+---------+
|2015|90.970001|
|2014|88.089996|
|2013|81.370003|
|2012|77.599998|
|2016|75.190002|
+----+---------+


