In [1]:
#initiate spark
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
c = pyspark.SparkConf().setAppName("test_app").setMaster("local")
sc = pyspark.SparkContext(conf = c)
spark = SparkSession(sc)

In [2]:
#help(sc)

# Load Data 

## RDD

In [3]:
# Define a variable for datasets
Tesla_data = 'TSLA.csv'
Amazon_data = 'AMZN.csv'
Google_data = 'GOOG.csv'

In [4]:
def CreateRDD(file: str):
    
    '''Function to load file into RDD'''
    
    file_rdd = sc.textFile(file)
    return file_rdd
    

In [5]:
Tesla_rdd = CreateRDD('TSLA.csv')
Amazon_rdd = CreateRDD('AMZN.csv')
Google_rdd = CreateRDD('GooG.csv')

## DataFrame

In [6]:
def CreateDF(file: str):
    
    '''Function to load file into RDD'''
    
    file_df = spark.read.csv(file,header=True,inferSchema=True)
    return file_df

In [7]:
Tesla_df = CreateDF('TSLA.csv')
Amazon_df = CreateDF('AMZN.csv')
Google_df = CreateDF('GooG.csv')

#### Check Schema of Each File

In [8]:
Tesla_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [9]:
Amazon_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [10]:
Google_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



#### Checking Number of Rows

In [11]:
Tesla_df.count(),Google_df.count(),Amazon_df.count()

(253, 253, 253)

#### You can also make you dataframe more Presentatable like Pandas Dataframe

In [12]:
# Without Pandas DataFrame
Tesla_df.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2019-07-15|     248.0|254.419998|244.860001|     253.5|     253.5|11000100|
|2019-07-16|249.300003|253.529999|247.929993|252.380005|252.380005| 8149000|
|2019-07-17|255.669998|258.309998|253.350006|254.860001|254.860001| 9764700|
|2019-07-18|255.050003|    255.75|251.889999|253.539993|253.539993| 4764500|
|2019-07-19|255.690002|259.959991|254.619995|258.179993|258.179993| 7048400|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



In [13]:
# With Pandas dataFrame
import pandas as pd
Tesla_df.toPandas().head(5)

Unnamed: 0,Date,Open,High,Low,Close,AdjClose,Volume
0,2019-07-15,248.0,254.419998,244.860001,253.5,253.5,11000100
1,2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000
2,2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700
3,2019-07-18,255.050003,255.75,251.889999,253.539993,253.539993,4764500
4,2019-07-19,255.690002,259.959991,254.619995,258.179993,258.179993,7048400


## Explore and Query Data Using DataFrameAPI 

In [14]:
# 1 . Find the average closing  per year and per month for all stock dataset
#import pyspark.sql.function
from pyspark.sql.functions import *

In [23]:
avg_year_tesla = Tesla_df.select(year("Date").alias("Year_"),"Close").groupby("Year_").avg("Close").sort("Year_")
avg_month_tesla = Tesla_df.select(month("Date").alias("Month_"),"Close").groupby("Month_").avg("Close").sort("Month_")
avg_year_Google = Google_df.select(year("Date").alias("Year_"),"Close").groupby("Year_").avg("Close").sort("Year_")
avg_month_Google = Google_df.select(month("Date").alias("Month_"),"Close").groupby("Month_").avg("Close").sort("Month_")
avg_year_Amazon = Amazon_df.select(year("Date").alias("Year_"),"Close").groupby("Year_").avg("Close").sort("Year_")
avg_month_Amazon = Amazon_df.select(month("Date").alias("Month_"),"Close").groupby("Month_").avg("Close").sort("Month_")


In [25]:
avg_year_tesla.show()

+-----+------------------+
|Year_|        avg(Close)|
+-----+------------------+
| 2019|283.62126070588243|
| 2020| 761.8206739179104|
+-----+------------------+



In [27]:
avg_month_tesla.show(5)

+------+-----------------+
|Month_|       avg(Close)|
+------+-----------------+
|     1|528.6590503809524|
|     2|797.4468415263159|
|     3|559.1013613181818|
|     4|663.5985761428572|
|     5|     799.42549745|
+------+-----------------+
only showing top 5 rows



In [28]:
avg_year_Google.show()

+-----+------------------+
|Year_|        avg(Close)|
+-----+------------------+
| 2019|1245.3833654621849|
| 2020|1362.8286906865671|
+-----+------------------+



In [29]:
avg_month_Google.show()

+------+------------------+
|Month_|        avg(Close)|
+------+------------------+
|     1|1436.6537968571424|
|     2|1464.1105184736841|
|     3|1188.3940984545457|
|     4|1234.1404797142854|
|     5|1381.1137511999998|
|     6|1431.0477184545452|
|     7|1303.4918102272727|
|     8|1180.6868120454546|
|     9|     1220.83952035|
|    10|1232.7117442608696|
|    11|     1304.27899165|
|    12|1340.8676351904762|
+------+------------------+



In [30]:
avg_year_Amazon.show()

+-----+------------------+
|Year_|        avg(Close)|
+-----+------------------+
| 2019|1800.6161329411755|
| 2020| 2236.414478798507|
+-----+------------------+



In [31]:
avg_month_Amazon.show(5)

+------+------------------+
|Month_|        avg(Close)|
+------+------------------+
|     1|1884.2376128571425|
|     2|2066.1752672631574|
|     3|1872.3104358636365|
|     4|2228.7052408571426|
|     5|2394.1840209499996|
+------+------------------+
only showing top 5 rows



In [None]:
# 2 . Find the average Low  per year and per month for all stock dataset

In [33]:
avg_year_tesla_low = Tesla_df.select(year("Date").alias("Year_"),"Low").groupby("Year_").avg("Low").sort("Year_")
avg_month_tesla_low = Tesla_df.select(month("Date").alias("Month_"),"Low").groupby("Month_").avg("Low").sort("Month_")
avg_year_Google_low = Google_df.select(year("Date").alias("Year_"),"Low").groupby("Year_").avg("Low").sort("Year_")
avg_month_Google_low = Google_df.select(month("Date").alias("Month_"),"Low").groupby("Month_").avg("Low").sort("Month_")
avg_year_Amazon_low = Amazon_df.select(year("Date").alias("Year_"),"Low").groupby("Year_").avg("Low").sort("Year_")
avg_month_Amazon_low = Amazon_df.select(month("Date").alias("Month_"),"Low").groupby("Month_").avg("Low").sort("Month_")


In [36]:
avg_year_tesla_low.show()

+-----+-----------------+
|Year_|         avg(Low)|
+-----+-----------------+
| 2019|278.6834451428571|
| 2020|736.3253731791043|
+-----+-----------------+



In [37]:
avg_month_tesla_low.show(5)

+------+-----------------+
|Month_|         avg(Low)|
+------+-----------------+
|     1| 514.690000952381|
|     2|766.4410560526314|
|     3|534.8704542272727|
|     4|641.0638093333333|
|     5|      780.3454987|
+------+-----------------+
only showing top 5 rows



## Using Spark SQL to perform Statistical Analysis on the data 

In [38]:
Tesla_df.createOrReplaceTempView("Tesla_Tv")

In [43]:
Amazon_df.createOrReplaceTempView("Amazon_Tv")

In [44]:
Google_df.createOrReplaceTempView("Google_Tv")

In [None]:
# 1. Lets calculate previous day Volume 

In [60]:
Previous_day_Tesla = spark.sql("""
    SELECT *, LAG(Volume, 1, 0) OVER (PARTITION BY MONTH(Date) ORDER BY Date) as Previous_Day_Volume
    FROM Tesla_Tv
""")
# display Result
Previous_day_Tesla.show(5)

+----------+----------+----------+----------+----------+----------+--------+-------------------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|Previous_Day_Volume|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
|2020-01-02|     424.5|430.700012|421.709991| 430.26001| 430.26001| 9532100|                  0|
|2020-01-03|     440.5|     454.0|436.920013| 443.01001| 443.01001|17778500|            9532100|
|2020-01-06|440.470001|451.559998|     440.0|451.540009|451.540009|10133000|           17778500|
|2020-01-07|461.399994|471.630005|453.359985|469.059998|469.059998|17882100|           10133000|
|2020-01-08|473.700012| 498.48999|468.230011|492.140015|492.140015|31144300|           17882100|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
only showing top 5 rows



In [61]:
Previous_day_Amazon = spark.sql("""
    SELECT *, LAG(Volume, 1, 0) OVER (PARTITION BY MONTH(Date) ORDER BY Date) as Previous_Day_Volume
    FROM Amazon_Tv
""")
# display Result
Previous_day_Tesla.show(5)

+----------+----------+----------+----------+----------+----------+--------+-------------------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|Previous_Day_Volume|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
|2020-01-02|     424.5|430.700012|421.709991| 430.26001| 430.26001| 9532100|                  0|
|2020-01-03|     440.5|     454.0|436.920013| 443.01001| 443.01001|17778500|            9532100|
|2020-01-06|440.470001|451.559998|     440.0|451.540009|451.540009|10133000|           17778500|
|2020-01-07|461.399994|471.630005|453.359985|469.059998|469.059998|17882100|           10133000|
|2020-01-08|473.700012| 498.48999|468.230011|492.140015|492.140015|31144300|           17882100|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
only showing top 5 rows



In [62]:
Previous_day_Google = spark.sql("""
    SELECT *, LAG(Volume, 1, 0) OVER (PARTITION BY MONTH(Date) ORDER BY Date) as Previous_Day_Volume
    FROM Google_Tv
""")
# display Result
Previous_day_Tesla.show(5)

+----------+----------+----------+----------+----------+----------+--------+-------------------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|Previous_Day_Volume|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
|2020-01-02|     424.5|430.700012|421.709991| 430.26001| 430.26001| 9532100|                  0|
|2020-01-03|     440.5|     454.0|436.920013| 443.01001| 443.01001|17778500|            9532100|
|2020-01-06|440.470001|451.559998|     440.0|451.540009|451.540009|10133000|           17778500|
|2020-01-07|461.399994|471.630005|453.359985|469.059998|469.059998|17882100|           10133000|
|2020-01-08|473.700012| 498.48999|468.230011|492.140015|492.140015|31144300|           17882100|
+----------+----------+----------+----------+----------+----------+--------+-------------------+
only showing top 5 rows



#### Perform a Join Operation Using Spark SQL

In [67]:
# Compare Different Daily Close for each Stock Company 

Final_Table = spark.sql("""
    SELECT T.Date AS Date, T.Close AS Close_Tesla , A.Close AS Close_Amazon, G.Close AS Close_Google
    FROM Tesla_Tv AS T 
    JOIN Amazon_Tv AS A ON T.Date = A.Date
    JOIN Google_Tv AS G ON A.Date = G.Date
""")

In [68]:
Final_Table.show(5)

+----------+-----------+------------+------------+
|      Date|Close_Tesla|Close_Amazon|Close_Google|
+----------+-----------+------------+------------+
|2019-07-15|      253.5|  2020.98999| 1150.339966|
|2019-07-16| 252.380005| 2009.900024| 1153.579956|
|2019-07-17| 254.860001| 1992.030029| 1146.349976|
|2019-07-18| 253.539993| 1977.900024| 1146.329956|
|2019-07-19| 258.179993|  1964.52002| 1130.099976|
+----------+-----------+------------+------------+
only showing top 5 rows



In [70]:
type(Final_Table)

pyspark.sql.dataframe.DataFrame

## Save Final Table In a Parquet Format