# PySpark Warm-Up

## Authors
* **Alireza Arbabi**
* **Hadi Babalou**
* **Ali Padyav**

## Table of Contents

### Firing Up Essential Libraries

In [155]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev, col
from pyspark.sql.types import FloatType

In [156]:
from IPython.display import display, HTML
display(HTML('<style>pre { white-space: pre !important; }</style>'))

Building up the spark session:

In [157]:
spark = SparkSession.builder.getOrCreate()

Step 1: Read CSV

In [158]:
data_frame = spark.read.csv("stocks.csv")

Step 2: Check the schema of data

In [159]:
data_frame.show(1)
data_frame.printSchema()

+----+----+----+---+-----+------+---------+
| _c0| _c1| _c2|_c3|  _c4|   _c5|      _c6|
+----+----+----+---+-----+------+---------+
|Date|Open|High|Low|Close|Volume|Adj Close|
+----+----+----+---+-----+------+---------+
only showing top 1 row

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



Let's rename the columns of the dataframe

In [160]:
data_frame = data_frame \
    .withColumnRenamed("_c0", "Date") \
    .withColumnRenamed("_c1", "Open") \
    .withColumnRenamed("_c2", "High") \
    .withColumnRenamed("_c3", "Low") \
    .withColumnRenamed("_c4", "Close") \
    .withColumnRenamed("_c5", "Volume") \
    .withColumnRenamed("_c6", "Adj Close") 

To make our data more clean, let's remove the first row which includes duplicate name of columns

In [161]:
data_frame = data_frame.filter(data_frame['Open'] != 'Open')

Step 3: For those records with closing price less than 500, select opening, closing and volume
 and show them.

In [162]:
filtered_data = data_frame.filter(data_frame['Close'] < 500).select('Open', 'Close', 'Volume')
filtered_data.show(10)

+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        214.379993|150476200|
|        214.379993|        210.969995|138040000|
|            211.75|            210.58|119282800|
|        210.299994|211.98000499999998|111902700|
|212.79999700000002|210.11000299999998|115557400|
|209.18999499999998|        207.720001|148614900|
|        207.870005|        210.650002|151473000|
|210.11000299999998|            209.43|108223500|
|210.92999500000002|            205.93|148516900|
+------------------+------------------+---------+
only showing top 10 rows



Step 4: Find out records with opening price more than 200 and closing price less than 200.

In [163]:
filtered_data = data_frame.filter((data_frame['Close'] < 200) & (data_frame['Open'] < 200)).select('Open', 'Close', 'Volume')
filtered_data.show(10)

+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|192.36999699999998|        194.729998|187469100|
|        195.909998|        195.859997|174585600|
|        195.169994|        199.229994|153832000|
|        196.730003|        192.050003|189413000|
|192.63000300000002|        195.460001|212576700|
|        195.690006|194.11999699999998|119567700|
|        196.419996|196.19000400000002|158221700|
|        195.889997|195.12000700000002| 92590400|
|        194.880001|        198.669994|137586400|
|        199.999998|        197.059998|143773700|
+------------------+------------------+---------+
only showing top 10 rows



Step 5: Extract the year from the date and save it in a new column.

In [164]:
data_frame.show(3)

+----------+----------+----------+------------------+----------+---------+------------------+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|
+----------+----------+----------+------------------+----------+---------+------------------+
only showing top 3 rows



In [165]:
from pyspark.sql.functions import col, split

data_frame = data_frame.withColumn("Year", split(col("date"), "-")[0])

data_frame.show(3)

+----------+----------+----------+------------------+----------+---------+------------------+----+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+----------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|2010|
+----------+----------+----------+------------------+----------+---------+------------------+----+
only showing top 3 rows



Step 6: Now, for each year, show the minimum volumes traded, shown in a column named ‘minVolume’.

In [166]:
data_frame.createOrReplaceTempView("Stock")
result = spark.sql("SELECT Year, Min(Volume) as MinVolume FROM Stock GROUP BY Year")
data_frame = data_frame.join(result, "Year", "inner")
data_frame.show(20)

+----+----------+------------------+------------------+------------------+------------------+---------+------------------+---------+
|Year|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|MinVolume|
+----+----------+------------------+------------------+------------------+------------------+---------+------------------+---------+
|2010|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|100901500|
|2010|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|100901500|
|2010|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|100901500|
|2010|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|100901500|
|2010|2010-01-08|        210.299994|        212.000006|209.0600050000

Step 7: 

In [167]:
data_frame = data_frame.withColumn("YearMonth", col("date").substr(1, 7))
data_frame.show(3)

+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
|Year|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|MinVolume|YearMonth|
+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
|2010|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|100901500|  2010-01|
|2010|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|100901500|  2010-01|
|2010|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|100901500|  2010-01|
+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
only showing top 3 rows



In [168]:
data_frame.createOrReplaceTempView("Stock")
result = spark.sql("SELECT YearMonth, Max(Volume) as MaxVolume FROM Stock GROUP BY YearMonth")
data_frame = data_frame.join(result, "YearMonth", "inner")
data_frame.show(3)

+---------+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
|YearMonth|Year|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|MinVolume|MaxVolume|
+---------+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
|  2010-01|2010|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|100901500|466777500|
|  2010-01|2010|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|100901500|466777500|
|  2010-01|2010|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|100901500|466777500|
+---------+----+----------+----------+----------+------------------+----------+---------+------------------+---------+---------+
only showing top 3 rows



Step 8: For the last step, calculate mean and standard deviation of high price over the whole data frame and show them in two decimal places.

In [182]:
result = spark.sql("SELECT Avg(High) as highAvg, STD(High) as highStd FROM Stock")
highAvg = result.collect()[0][0]
highSTD = result.collect()[0][1]

# Let's round the results in two decimal places

print("The average of high price is: ", round(highAvg, 2))
print("The std of high price is: ", round(highSTD, 2))

The average of high price is:  315.91
The std of high price is:  186.9
