## Explore Stock Prices using Spark SQL

In [1]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = 'pyspark-shell'

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import pandas 

In [3]:
#initializing SparkContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

### Load data into Spark

In [4]:
tesla_file = 'TSLA.csv'
tesla_rdd = sc.textFile(tesla_file)

amazon_file = 'AMZN.csv'
google_file = 'GOOG.csv'

### RDD

In [5]:
tesla_rdd.take(5)

['Date,Open,High,Low,Close,Adj Close,Volume',
 '2020-02-10,160.000000,163.998001,150.479996,154.255997,154.255997,123446000',
 '2020-02-11,153.757996,156.701996,151.600006,154.876007,154.876007,58487500',
 '2020-02-12,155.574005,157.949997,152.673996,153.457993,153.457993,60112500',
 '2020-02-13,148.367996,163.600006,147.000000,160.800003,160.800003,131446500']

In [6]:
csv_rdd = tesla_rdd.map(lambda row: row.split(","))

In [7]:
csv_rdd.take(2)

[['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume'],
 ['2020-02-10',
  '160.000000',
  '163.998001',
  '150.479996',
  '154.255997',
  '154.255997',
  '123446000']]

### DataFrame

In [8]:
header = csv_rdd.first()
tesla_df = csv_rdd.filter(lambda row : row != header).toDF(header)

In [9]:
tesla_df.show(5)

+----------+----------+----------+----------+----------+----------+---------+
|      Date|      Open|      High|       Low|     Close| Adj Close|   Volume|
+----------+----------+----------+----------+----------+----------+---------+
|2020-02-10|160.000000|163.998001|150.479996|154.255997|154.255997|123446000|
|2020-02-11|153.757996|156.701996|151.600006|154.876007|154.876007| 58487500|
|2020-02-12|155.574005|157.949997|152.673996|153.457993|153.457993| 60112500|
|2020-02-13|148.367996|163.600006|147.000000|160.800003|160.800003|131446500|
|2020-02-14|157.444000|162.593994|157.100006|160.005997|160.005997| 78468500|
+----------+----------+----------+----------+----------+----------+---------+
only showing top 5 rows



In [10]:
amazon_df = sqlContext.read.load(amazon_file,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')

In [11]:
amazon_df.show(5)

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|  Adj Close| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2020-02-10| 2085.01001|2135.600098|2084.959961|2133.909912|2133.909912|5056200|
|2020-02-11|2150.899902|2185.949951|     2136.0|2150.800049|2150.800049|5746000|
|2020-02-12|2163.199951|    2180.25|2155.290039|     2160.0|     2160.0|3334300|
|2020-02-13| 2144.98999|2170.280029|     2142.0|2149.870117|2149.870117|3031800|
|2020-02-14|2155.679932|2159.040039|2125.889893|2134.870117|2134.870117|2606200|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 5 rows



In [12]:
amazon_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [13]:
amazon_df.count()

253

In [14]:
amazon_df.toPandas().head(5)

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2020-02-10,2085.01001,2135.600098,2084.959961,2133.909912,2133.909912,5056200
1,2020-02-11,2150.899902,2185.949951,2136.0,2150.800049,2150.800049,5746000
2,2020-02-12,2163.199951,2180.25,2155.290039,2160.0,2160.0,3334300
3,2020-02-13,2144.98999,2170.280029,2142.0,2149.870117,2149.870117,3031800
4,2020-02-14,2155.679932,2159.040039,2125.889893,2134.870117,2134.870117,2606200


### Explore and Query Data

#### DataFrames operations

In [15]:
google_df = sqlContext.read.load(google_file,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')

In [16]:
google_df.select(F.year('Date').alias('yr'), 'Close').groupby('yr').avg('Close').sort('yr').show()

+----+------------------+
|  yr|        avg(Close)|
+----+------------------+
|2020|1485.8534564933911|
|2021|1869.0388558461536|
+----+------------------+



In [17]:
amazon_df.select(F.year('Date').alias('year'),
                F.month('Date').alias('month'),
                'Low',)\
         .groupby('year', 'month').avg('Low').sort('year', 'month')\
         .show()

+----+-----+------------------+
|year|month|          avg(Low)|
+----+-----+------------------+
|2020|    2| 2053.253574928571|
|2020|    3|1825.1590964090908|
|2020|    4|2185.9347679523808|
|2020|    5|     2364.48549805|
|2020|    6| 2579.099553909091|
|2020|    7|2991.4899903636356|
|2020|    8|3207.5033250476195|
|2020|    9|3109.7995023809526|
|2020|   10| 3185.981367681817|
|2020|   11|3102.2170043999995|
|2020|   12|3166.3018133636365|
|2021|    1|3171.6500051052626|
|2021|    2|3298.1542968571425|
+----+-----+------------------+



### SparkSQL

In [18]:
amazon_df.registerTempTable('amazon_stocks')
tesla_df.registerTempTable('tesla_stocks')
google_df.registerTempTable('google_stocks')

In [19]:
query_amazon = 'select year(Date) as year, month(Date) as month, round(avg(Close), 2) as avg_close from amazon_stocks group by year, month'

In [20]:
sqlContext.sql(query_amazon).show(5)

+----+-----+---------+
|year|month|avg_close|
+----+-----+---------+
|2020|    6|  2613.55|
|2020|   11|  3141.81|
|2020|    3|  1872.31|
|2020|    9|  3160.75|
|2020|   12|  3197.75|
+----+-----+---------+
only showing top 5 rows



In [21]:
query_google = 'select Date, Open, Close, round(abs(Open - Close),2) as diff from google_stocks where abs(Open - Close) > 4'

In [22]:
sqlContext.sql(query_google).show(5)

+----------+-----------+-----------+-----+
|      Date|       Open|      Close| diff|
+----------+-----------+-----------+-----+
|2020-02-10|1474.319946|1508.680054|34.36|
|2020-02-14|1515.599976| 1520.73999| 5.14|
|2020-02-18|     1515.0|1519.670044| 4.67|
|2020-02-21|1508.030029|1485.109985|22.92|
|2020-02-24|1426.109985|1421.589966| 4.52|
+----------+-----------+-----------+-----+
only showing top 5 rows



In [23]:
query_tesla = 'SELECT year(Date) year,\
                round(max(`Adj Close`), 2) max_adj_close,\
                round(min(`Adj Close`), 2) min_adj_close \
                FROM tesla_stocks\
                GROUP BY year(Date)'

In [24]:
sqlContext.sql(query_tesla).show()

+----+-------------+-------------+
|year|max_adj_close|min_adj_close|
+----+-------------+-------------+
|2020|        96.31|       100.43|
|2021|       883.09|       729.77|
+----+-------------+-------------+



In [25]:
query_join = 'SELECT t.Date, round(t.Close, 2) tesla_close , round(a.Close, 2) amazon_close, round(g.Close, 2) google_close\
              FROM tesla_stocks t \
              JOIN google_stocks g on t.Date = g.Date \
              JOIN amazon_stocks a on a.Date = t.Date'

In [26]:
join_close_df = sqlContext.sql(query_join)

In [27]:
join_close_df.show(5)

+----------+-----------+------------+------------+
|      Date|tesla_close|amazon_close|google_close|
+----------+-----------+------------+------------+
|2020-02-10|     154.26|     2133.91|     1508.68|
|2020-02-11|     154.88|      2150.8|     1508.79|
|2020-02-12|     153.46|      2160.0|     1518.27|
|2020-02-13|      160.8|     2149.87|     1514.66|
|2020-02-14|     160.01|     2134.87|     1520.74|
+----------+-----------+------------+------------+
only showing top 5 rows



### Save Spark DataFrames
#### Parquet - column oriented storage

In [28]:
# save parquet (overwrite if exists)
join_close_df.write.format('parquet').mode('overwrite').save('join_stock.parquet')

In [29]:
# load saved parquet back
final_df = sqlContext.read.parquet('join_stock.parquet')
final_df.show(5)

+----------+-----------+------------+------------+
|      Date|tesla_close|amazon_close|google_close|
+----------+-----------+------------+------------+
|2020-02-10|     154.26|     2133.91|     1508.68|
|2020-02-11|     154.88|      2150.8|     1508.79|
|2020-02-12|     153.46|      2160.0|     1518.27|
|2020-02-13|      160.8|     2149.87|     1514.66|
|2020-02-14|     160.01|     2134.87|     1520.74|
+----------+-----------+------------+------------+
only showing top 5 rows



In [30]:
# check schema
final_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- tesla_close: double (nullable = true)
 |-- amazon_close: double (nullable = true)
 |-- google_close: double (nullable = true)

