In [49]:
import pyspark as ps
from pyspark.sql.functions import year, month
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

"Libraries imported"

'Libraries imported'

# Loading Spark Pointer

In [4]:
sc = ps.SparkContext.getOrCreate()
type(sc)

pyspark.context.SparkContext

# Parallelizing data : Example

In [5]:
myList = [i for i in range(20) if i%2 == 0]

parallelized_myList = sc.parallelize(myList)
parallelized_myList

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

# Reading .CSV file

In [17]:
file_name = "AMZN.csv"

amzn_rdd = sc.textFile(file_name)
type(amzn_rdd)

pyspark.rdd.RDD

In [18]:
amzn_rdd.take(4)

['Date,Open,High,Low,Close,AdjClose,Volume',
 '2019-07-15,2021.400024,2022.900024,2001.550049,2020.989990,2020.989990,2981300',
 '2019-07-16,2010.579956,2026.319946,2001.219971,2009.900024,2009.900024,2618200',
 '2019-07-17,2007.050049,2012.000000,1992.030029,1992.030029,1992.030029,2558800']

In [19]:
#to make them into columns
amzn_rdd_CSV = amzn_rdd.map(lambda r:r.split(","))
amzn_rdd_CSV

PythonRDD[14] at RDD at PythonRDD.scala:53

In [20]:
#displays in list format
amzn_rdd_CSV.collect()

[['Date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume'],
 ['2019-07-15',
  '2021.400024',
  '2022.900024',
  '2001.550049',
  '2020.989990',
  '2020.989990',
  '2981300'],
 ['2019-07-16',
  '2010.579956',
  '2026.319946',
  '2001.219971',
  '2009.900024',
  '2009.900024',
  '2618200'],
 ['2019-07-17',
  '2007.050049',
  '2012.000000',
  '1992.030029',
  '1992.030029',
  '1992.030029',
  '2558800'],
 ['2019-07-18',
  '1980.010010',
  '1987.500000',
  '1951.550049',
  '1977.900024',
  '1977.900024',
  '3504300'],
 ['2019-07-19',
  '1991.209961',
  '1996.000000',
  '1962.229980',
  '1964.520020',
  '1964.520020',
  '3185600'],
 ['2019-07-22',
  '1971.140015',
  '1989.000000',
  '1958.260010',
  '1985.630005',
  '1985.630005',
  '2900000'],
 ['2019-07-23',
  '1995.989990',
  '1997.790039',
  '1973.130005',
  '1994.489990',
  '1994.489990',
  '2703500'],
 ['2019-07-24',
  '1969.300049',
  '2001.300049',
  '1965.869995',
  '2000.810059',
  '2000.810059',
  '2631300'],
 ['2019-07-25',

# Another way to load csv into pandas df 

In [26]:
#create an entry point for SQLContext
sqlContext = ps.sql.SQLContext(sc)
type(sqlContext)

pyspark.sql.context.SQLContext

In [28]:
#load csv -syntax
sqlContext.read.load(file_name, 
                              format = 'com.databricks.spark.csv',
                              header = 'true',
                              inferSchema = 'true')

DataFrame[Date: string, Open: double, High: double, Low: double, Close: double, AdjClose: double, Volume: int]

In [45]:
#now let's pipeline this process
files = ["AMZN.csv","GOOG.csv","TSLA.csv"]

def load_file(file_name):
    format_str = 'com.databricks.spark.csv'
    return sqlContext.read.load( file_name, 
                               format = format_str,
                               header = 'true',
                               inferSchema = 'true')

amazon_df = load_file(files[0])
google_df = load_file(files[1])
tesla_df = load_file(files[2])
google_df.printSchema() #woaah, that's um.. okay

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [36]:
#print a few rows
amazon_df.take(2)

[Row(Date='2019-07-15', Open=2021.400024, High=2022.900024, Low=2001.550049, Close=2020.98999, AdjClose=2020.98999, Volume=2981300),
 Row(Date='2019-07-16', Open=2010.579956, High=2026.319946, Low=2001.219971, Close=2009.900024, AdjClose=2009.900024, Volume=2618200)]

In [46]:
#print number of rows
google_df.count()

201

In [38]:
#printing a cleaner version of take
amazon_df.show(2)

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 2 rows



In [55]:
#and to print a more fimiliar format
google_df.toPandas().head(4)

Unnamed: 0,Date,Open,High,Low,Close,AdjClose,Volume
0,2019-09-26,1241.959961,1245.0,1232.267944,1241.390015,1241.390015,1538000
1,2019-09-27,1243.01001,1244.02002,1214.449951,1225.089966,1225.089966,1353900
2,2019-09-30,1220.969971,1226.0,1212.300049,1219.0,1219.0,1404100
3,2019-10-01,1219.0,1231.22998,1203.579956,1205.099976,1205.099976,1273500


In [57]:
#selecting stuff for viewing. NOTE: no grouping is done here, so chill
google_df.select(year("Date").alias("Year"),"High","Close").show()

+----+-----------+-----------+
|Year|       High|      Close|
+----+-----------+-----------+
|2019|     1245.0|1241.390015|
|2019| 1244.02002|1225.089966|
|2019|     1226.0|     1219.0|
|2019| 1231.22998|1205.099976|
|2019| 1196.97998|1176.630005|
|2019|1189.060059|1187.829956|
|2019|1211.439941|     1209.0|
|2019|1218.203979|1207.680054|
|2019|1206.079956|1189.130005|
|2019|1208.349976|1202.310059|
|2019|     1215.0|1208.670044|
|2019|1228.390015|1215.449951|
|2019|1226.329956|1217.140015|
|2019|1247.329956| 1243.01001|
|2019| 1254.73999|1243.640015|
|2019|1263.324951|1253.069946|
|2019|1258.890015| 1245.48999|
|2019|1254.629028|1246.150024|
|2019|1250.599976|1242.800049|
|2019|1259.890015|1259.130005|
+----+-----------+-----------+
only showing top 20 rows



In [67]:
#if you want to group'em by year
google_df.select(year("Date").alias("Year"),"High","Close").groupby("Year").avg("Close").show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2019|1287.7858249402982|
|2020|1360.5085199242424|
+----+------------------+



# Using SparkSQL

In [68]:
#register dataframes into tables
amazon_df.registerTempTable("amazon_stocks")
google_df.registerTempTable("google_stocks")
tesla_df.registerTempTable("tesla_stocks")
"Temporary Tables Registered"

'Temporary Tables Registered'

In [69]:
#using sql with Spark
query="SELECT * FROM tesla_stocks"
sqlContext.sql(query).show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2019-07-15|     248.0|254.419998|244.860001|     253.5|     253.5|11000100|
|2019-07-16|249.300003|253.529999|247.929993|252.380005|252.380005| 8149000|
|2019-07-17|255.669998|258.309998|253.350006|254.860001|254.860001| 9764700|
|2019-07-18|255.050003|    255.75|251.889999|253.539993|253.539993| 4764500|
|2019-07-19|255.690002|259.959991|254.619995|258.179993|258.179993| 7048400|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



# (if-and-when-you-want-to) load ur dataframe into Parquet files

In [None]:
#google_df.write.format("parquet").save(name+".parquet")

In [70]:
query = "SELECT * from amazon_stocks limit 3"
sqlContext.sql(query).show()

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
+----------+-----------+-----------+-----------+-----------+-----------+-------+



# 1 - Find avg lowest price for amazon per month

In [74]:
query = "select year(Date) as Year, month(Date) as Month, \
                avg(Low) from amazon_stocks \
                group by Year, Month \
                order by Year, Month"
sqlContext.sql(query).show()

+----+-----+------------------+
|Year|Month|          avg(Low)|
+----+-----+------------------+
|2019|    7|1948.1946176153847|
|2019|    8|1778.2340920454546|
|2019|    9|     1784.68599235|
|2019|   10|1736.6047840869564|
|2019|   11|1764.3005004499998|
|2019|   12|1773.8466622380954|
|2020|    1| 1870.354288809524|
|2020|    2|2045.0563194210524|
|2020|    3|1825.1590964090908|
|2020|    4|2185.9347679523808|
|2020|    5|     2364.48549805|
|2020|    6| 2579.099553909091|
|2020|    7|2976.1799859999996|
+----+-----+------------------+



# 2 - When did the closing price of Google go either up or down more than $20 in a day?

In [88]:
query = "Select Date, Open, Close, abs(Close - Open) as Difference \
                from google_stocks \
                where abs(Close - Open) > 20 order by Difference"
sqlContext.sql(query).show()

+----------+-----------+-----------+------------------+
|      Date|       Open|      Close|        Difference|
+----------+-----------+-----------+------------------+
|2020-04-27|     1296.0|1275.880005|20.119995000000017|
|2020-05-26| 1437.27002| 1417.02002|             20.25|
|2019-10-02| 1196.97998|1176.630005|20.349975000000086|
|2020-03-10|     1260.0|1280.390015| 20.39001499999995|
|2020-05-14| 1335.02002|1356.130005| 21.10998500000005|
|2020-04-03|1119.015015|1097.880005|21.135009999999966|
|2020-03-06|1277.060059|1298.410034|21.349975000000086|
|2020-03-30|1125.040039|1146.819946|21.779907000000094|
|2020-05-18|    1361.75|1383.939941| 22.18994100000009|
|2020-03-19|1093.050049|1115.290039|22.239990000000034|
|2020-04-02| 1098.26001|1120.839966| 22.57995600000004|
|2019-10-15|1220.400024| 1243.01001|22.609985999999935|
|2020-06-22|     1429.0|1451.859985| 22.85998500000005|
|2020-02-21|1508.030029|1485.109985| 22.92004399999996|
|2020-06-30|1390.439941|1413.609985| 23.17004399

# 3 - Min and Max adjusted price per year for Tesla

In [89]:
query = "Select min(AdjClose) as Min, \
                max(AdjClose) as Max, Year(Date) as Year \
                from tesla_stocks \
                group by Year"
sqlContext.sql(query).show()

+----------+-----------+----+
|       Min|        Max|Year|
+----------+-----------+----+
|211.399994| 430.940002|2019|
|361.220001|1544.650024|2020|
+----------+-----------+----+



# 4 - Which of the companies had the highest Closing Price on December 2019 ?


In [137]:
query = "select year(tesla_stocks.Date) as Year, \
                month(tesla_stocks.Date) as Month, \
                max(tesla_stocks.Close) as TClose, \
                max(g.Close) as GClose,\
                max(a.Close) as AClose \
                from tesla_stocks \
                join google_stocks g on g.Date = tesla_stocks.Date\
                join amazon_stocks a on a.Date = tesla_stocks.Date\
                group by Year, Month having Year = '2019' \
                and Month= 12"
sqlContext.sql(query).show()

+----+-----+----------+-----------+-----------+
|Year|Month|    TClose|     GClose|     AClose|
+----+-----+----------+-----------+-----------+
|2019|   12|430.940002|1361.170044|1869.800049|
+----+-----+----------+-----------+-----------+



I guess from this table we can concur that Amazon had the highest Close Price in December 2019. I haven't yet figured out how to print the max combining these three columns 😖

# 5 - What's the difference between the highest and lowest Adjusted Closing price in the Stock Market in November 2019?

In [144]:
query = "select year(t.Date) as Year, \
                month(t.Date) as Month, \
                round(abs(max(t.AdjClose) - min(t.AdjClose))) as TDiff, \
                round(abs(max(g.AdjClose) - min(g.AdjClose))) as GDiff, \
                round(abs(max(a.AdjClose) - min(a.AdjClose))) as ADiff \
                from tesla_stocks t \
                join google_stocks g on g.Date = t.Date \
                join amazon_stocks a on a.Date = t.Date \
                group by Year, Month having Year = '2019'\
                and Month = 11"
sqlContext.sql(query).show()

+----+-----+-----+-----+-----+
|Year|Month|TDiff|GDiff|ADiff|
+----+-----+-----+-----+-----+
|2019|   11| 46.0| 61.0| 84.0|
+----+-----+-----+-----+-----+



Again, from the table we can see that Amazon has the highest difference in Adjusted CLose price in November 2019.

# 6 - For the months January 2020 to April 2020, what's the average Close Price for all the three companies?

In [149]:
query = "select year(t.Date) as Year, \
                month(t.Date) as Month, \
                round(avg(t.Close)) as Tesla_Avg, \
                round(avg(g.Close)) as Google_Avg, \
                round(avg(a.Close)) as Amazon_Avg \
                from tesla_stocks t \
                join google_stocks g on g.Date = t.Date \
                join amazon_stocks a on a.Date = t.Date \
                group by Year, Month having Year ='2020' and \
                Month in (1,2,3,4) \
                order by Month"
sqlContext.sql(query).show()

+----+-----+---------+----------+----------+
|Year|Month|Tesla_Avg|Google_Avg|Amazon_Avg|
+----+-----+---------+----------+----------+
|2020|    1|    529.0|    1437.0|    1884.0|
|2020|    2|    797.0|    1464.0|    2066.0|
|2020|    3|    559.0|    1188.0|    1872.0|
|2020|    4|    664.0|    1234.0|    2229.0|
+----+-----+---------+----------+----------+



# 7 - What's the average Open Price in the first quarter of 2020 for Tesla?

In [163]:
query = "  select Year, round(avg(Avg_Open)) as First_Quarter \
                from (select year(Date) as Year,\
                month(Date) as Month, \
                avg(Open) as Avg_Open \
                from tesla_stocks \
                group by Year, Month having Year ='2020' and \
                Month in (1,2,3,4) ) group by Year"
sqlContext.sql(query).show()

+----+-------------+
|Year|First_Quarter|
+----+-------------+
|2020|        637.0|
+----+-------------+



# 8 - What percentage of Google's average Close price did Tesla record in April 2020?

In [187]:
query = "select year(t.Date) as Year, \
                month(t.Date) as Month, \
                round(avg(t.Close)) as Tesla_avg, \
                round(avg(g.Close)) as Google_avg, \
                round((avg(t.Close) / avg(g.Close))*100.0) as Diff_percentage\
                from tesla_stocks t \
                join google_stocks g on g.Date = t.Date \
                group by Year, Month having Year = '2020' and \
                Month = 4"
sqlContext.sql(query).show()

+----+-----+---------+----------+---------------+
|Year|Month|Tesla_avg|Google_avg|Diff_percentage|
+----+-----+---------+----------+---------------+
|2020|    4|    664.0|    1234.0|           54.0|
+----+-----+---------+----------+---------------+

