In [81]:
# Installing pyspark
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [82]:
# Starting a spark session and loading the saf_stock file.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [22]:
stock_df = spark.read.csv("saf_stock.csv", header=True, inferSchema=True)


In [79]:
# Fetching the column names then print them.
columns = stock_df.columns
bold = '\033[1m'

print(bold +"SAFARICOM STOCK FILE NAMES:")

for col in columns:
    print(col)

[1mSAFARICOM STOCK FILE NAMES:
Date
Open
High
Low
Close
Volume
Adj Close


In [32]:
#  Make observations about the schema.
stock_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [33]:
# Showing first 5 rows.
stock_df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [83]:
# Using the describe method to learn about the data frame.
stock_df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [43]:
# Formating all the data to 2 decimal places i.e. format_number()

from pyspark.sql.functions import format_number, col

numeric_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']
for col in numeric_cols:
    df = stock_df.withColumn(col, format_number(stock_df[col], 2)).show()

+-------------------+-----+------------------+------------------+------------------+--------+------------------+
|               Date| Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+-----+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|59.97|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.21|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|59.35|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|59.42|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|59.03|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10 00:00:00|59.43|59.709998999999996|             58.98|59.040001000000004| 6907300|   

In [49]:
from pyspark.sql.functions import col

# Create a new column called HV Ratio
df_hv = stock_df.withColumn('HV Ratio', format_number(col('High')/col('Volume'), 2))

df_hv.show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|HV Ratio|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|    0.00|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|    0.00|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|    0.00|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|    0.00|
|2012-01-09 00:00:00|         59.029999|         59.549

Data Analysis

● What day had the Peak High in Price?

● What is the mean of the Close column?

● What is the max and min of the Volume column?

● How many days was the Close lower than 60 dollars?

● What percentage of the time was the High greater than 80 dollars?

● What is the Pearson correlation between High and Volume?

● What is the max High per year?

● What is the average Close for each Calendar Month?

In [59]:
# What day had the Peak High in Price?
from pyspark.sql.functions import desc

High_peak_price = stock_df.orderBy(desc("High")).select("Date").first()[0]
bold = '\033[1m'
print(bold + "THE DAY WITH THE HIGHETS PEAK IN PRICE")
print(":", High_peak_price)

[1mTHE DAY WITH THE HIGHETS PEAK IN PRICE
: 2015-01-13 00:00:00


In [84]:
# What is the mean of the Close column?
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min

mean_close = stock_df.select(mean("Close")).first()[0]
print(bold +"MEAN OF THE CLOSE COLUMN:", mean_close)

[1mMEAN OF THE CLOSE COLUMN: 72.38844998012726


In [85]:
# What is the max and min of the Volume column?
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min

max_volume = stock_df.select(max("Volume")).first()[0]
min_volume = stock_df.select(min("Volume")).first()[0]
print(bold +"THE MAXIMUM VOLUME IS:", max_volume)
print("THE MINIMUM VOLUME IS:", min_volume)

[1mTHE MAXIMUM VOLUME IS: 80898100
THE MINIMUM VOLUME IS: 2094900


In [76]:
# How many days was the Close lower than 60 dollars?
days_lower_than_60 = stock_df.filter("Close < 60").count()
print(bold +"The number of days when the Close was lower than 60 dollars is:", days_lower_than_60, "Days" )


[1mThe number of days when the Close was lower than 60 dollars is: 81 Days


In [99]:
# What percentage of the time was the High greater than 80 dollars?
from pyspark.sql.functions import count

# Find the percentage of the time when the High was greater than 80 dollars
high_greater_than_80 = stock_df.filter("High > 80")
percentage = (high_greater_than_80.count() / stock_df.count()) * 100

print(bold + "The percentage of the time when the High was greater than 80 dollars is: {:.2f}%".format(percentage))



[1mThe percentage of the time when the High was greater than 80 dollars is: 9.14%


In [98]:
# What is the Pearson correlation between High and Volume?
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min

correlation = stock_df.select(corr("High", "Volume")).first()[0]

print(bold + "The Pearson correlation between High and Volume is: {:.2f}".format(correlation))


[1mThe Pearson correlation between High and Volume is: -0.34


In [100]:
# What is the max High per year?
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min

max_high_per_year = stock_df.groupBy(year("Date")).agg(max("High"))
max_high_per_year.show()

+----------+---------+
|year(Date)|max(High)|
+----------+---------+
|      2015|90.970001|
|      2013|81.370003|
|      2014|88.089996|
|      2012|77.599998|
|      2016|75.190002|
+----------+---------+



In [107]:
# What is the average Close for each Calendar Month?
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min, avg, desc

avg_close_per_month = stock_df.groupBy(month("Date")).agg(avg("Close"))
avg_close_per_month.show()

+-----------+-----------------+
|month(Date)|       avg(Close)|
+-----------+-----------------+
|         12|72.84792478301885|
|          1|71.44801958415842|
|          6| 72.4953774245283|
|          3|71.77794377570092|
|          5|72.30971688679247|
|          9|72.18411785294116|
|          4|72.97361900952382|
|          8|73.02981855454546|
|          7|74.43971943925233|
|         10|71.57854545454543|
|         11| 72.1110893069307|
|          2|  71.306804443299|
+-----------+-----------------+

