<a href="https://colab.research.google.com/github/WKhisa/Apache-Spark-DataFrames-Project/blob/main/Apache_Spark_DataFrames_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=6b46214c959323d73d063bd715ddf1a913a0b3599f945667b091f1ef34c38c8d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
#Execute Pyspark session
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkFiles
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Import and Preview Data

In [4]:
#Import Data
from pyspark import SparkFiles

sqlCtx = SQLContext(sc)
spark.sparkContext.addFile("saf_stock.csv")
saf_stock_df = spark.read.options(header=True, inferSchema='True', delimiter=',', dateFormat='yyyy-mm-dd').csv("file://" + SparkFiles.get("saf_stock.csv"))

saf_stock_df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()
print(tables)



['saf_stock']


In [5]:
#check columns
saf_stock_df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [7]:
saf_stock_df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [8]:
saf_stock_df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

# Data Preparation

In [9]:
# Format data to 2 decimal places
from pyspark.sql.functions import format_number

saf_stock_df = saf_stock_df.select("Date",
               format_number("Open", 2).alias("Open"),
                format_number("High", 2).alias("High"),
                format_number("Low", 2).alias("Low"),
                format_number("Close", 2).alias("Close"),
                format_number("Volume", 2).alias("Volume"),
                format_number("Adj Close", 2).alias("Adj Close")
                )
saf_stock_df.show(3)

+----------+-----+-----+-----+-----+-------------+---------+
|      Date| Open| High|  Low|Close|       Volume|Adj Close|
+----------+-----+-----+-----+-----+-------------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
+----------+-----+-----+-----+-----+-------------+---------+
only showing top 3 rows



In [13]:
#Create new dataframe
newsaf_stock_df = saf_stock_df.withColumn("HV Ratio",saf_stock_df['High']/saf_stock_df['Volume'])
saf_stock_df.show()

+----------+-----+-----+-----+-----+-------------+---------+
|      Date| Open| High|  Low|Close|       Volume|Adj Close|
+----------+-----+-----+-----+-----+-------------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
|2012-01-10|59.43|59.71|58.98|59.04| 6,907,300.00|    51.49|
|2012-01-11|59.06|59.53|59.04|59.40| 6,365,600.00|    51.81|
|2012-01-12|59.79|60.00|59.40|59.50| 7,236,400.00|    51.90|
|2012-01-13|59.18|59.61|59.01|59.54| 7,729,300.00|    51.93|
|2012-01-17|59.87|60.11|59.52|59.85| 8,500,000.00|    52.20|
|2012-01-18|59.79|60.03|59.65|60.01| 5,911,400.00|    52.34|
|2012-01-19|59.93|60.73|59.75|60.61| 9,234,600.00|    52.86|
|2012-01-20|60.75|61.25|60.67|61.01|10,378,800.00|    53.21|
|2012-01-23|60.81|60.98|

# Analysis

What day had the Peak High in Price?


In [14]:
peak_high_day = newsaf_stock_df.orderBy(newsaf_stock_df['High'].desc()).head(1)
print(peak_high_day[0][0])

2015-01-13


What is the mean of the Close column?


In [17]:
from pyspark.sql.functions import mean
print(newsaf_stock_df.select(mean('Close')).show())

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+

None


 What is the max and min of the Volume column?


In [19]:
from pyspark.sql.functions import max, min
print(newsaf_stock_df.select(max("Volume"), min("Volume")).show())

+------------+-------------+
| max(Volume)|  min(Volume)|
+------------+-------------+
|9,994,400.00|10,010,500.00|
+------------+-------------+

None


How many days was the Close lower than 60 dollars?

In [20]:
print(newsaf_stock_df.filter(newsaf_stock_df['Close'] < 60).count())


81


What percentage of the time was the High greater than 80 dollars?

In [21]:
print((newsaf_stock_df.filter(newsaf_stock_df['High']>80).count()/newsaf_stock_df.count()) * 100)


8.426073131955485


What is the Pearson correlation between High and Volume?

In [22]:
from pyspark.sql.functions import corr

newsaf_stock_df.select(corr('High','Volume')).show()

+------------------+
|corr(High, Volume)|
+------------------+
|              null|
+------------------+



What is the max High per year?

In [26]:
from pyspark.sql.functions import year
yeardf = newsaf_stock_df.withColumn("Year",year(newsaf_stock_df['Date']))
max_df = yeardf.groupBy('Year').max()
newsaf_stock_df.withColumn("Year", year(newsaf_stock_df["Date"])).groupBy('Year').agg(max("High").alias("High")).show()


+----+-----+
|Year| High|
+----+-----+
|2012|77.60|
|2013|81.37|
|2014|88.09|
|2015|90.97|
|2016|75.19|
+----+-----+



What is the average Close for each Calendar Month?

In [23]:
from pyspark.sql.functions import month, avg

newsaf_stock_df.withColumn("Month", month(newsaf_stock_df["Date"])).groupBy('Month').agg(avg("Close").alias("Close")).show()


+-----+-----------------+
|Month|            Close|
+-----+-----------------+
|    1|72.38844992050863|
+-----+-----------------+

