# Explore Stock Prices using Spark SQL

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 61.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=e94e1ccfd46c4f8587d5e85ad2e8593a3de8de6f9b5a11131914f8bff228c9b6
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## The Spark application and Spark Context

<img src="https://spark.apache.org/docs/latest/img/cluster-overview.png" width=500>

In [3]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [4]:
type(sc)

pyspark.context.SparkContext

In [5]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'bina

## Load data into Spark

### RDD

Resilient Distributed Dataset

- Immutable collection of elements
- Partitioned across cluster

<img src="https://miro.medium.com/max/1152/1*l2MUHFvWfcdiUbh7Y-fM5Q.png" width=500>

Interaction through:

- Transformations: functions executed on demand. Produces new RDD.
- Actions: Results of RDD computations and transformations.

<img src="https://miro.medium.com/max/1090/1*2uwvLC1HsWpOsmRw4ZOp2w.png" width=500>

In [6]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [15]:
tesla_file = '/content/drive/MyDrive/Colab Notebooks/TSLA.csv'
tesla_rdd = sc.textFile(tesla_file)
tesla_rdd.take(5)

['Date,Open,High,Low,Close,AdjClose,Volume',
 '2019-07-15,248.000000,254.419998,244.860001,253.500000,253.500000,11000100',
 '2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000',
 '2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700',
 '2019-07-18,255.050003,255.750000,251.889999,253.539993,253.539993,4764500']

In [16]:
csv_rdd = tesla_rdd.map(lambda row: row.split(","))

In [17]:
results = csv_rdd.collect()

In [18]:
results[:4]

[['Date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume'],
 ['2019-07-15',
  '248.000000',
  '254.419998',
  '244.860001',
  '253.500000',
  '253.500000',
  '11000100'],
 ['2019-07-16',
  '249.300003',
  '253.529999',
  '247.929993',
  '252.380005',
  '252.380005',
  '8149000'],
 ['2019-07-17',
  '255.669998',
  '258.309998',
  '253.350006',
  '254.860001',
  '254.860001',
  '9764700']]

## Spark DataFrames

- Immutable distributed collection of data
- Data organized into named columns

Operations:

- Filter
- Group by
- Compute aggregations
- SQL quries

In [20]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)



In [21]:
li = [('Alice', 1)]
df = sqlContext.createDataFrame(li, ['name', 'age'])
df.collect()

[Row(name='Alice', age=1)]

In [22]:
tesla_df = csv_rdd.toDF(['date', 'open', 'high', 'low', 'close', 'adjclose', 'volume'])
tesla_df.take(5)

[Row(date='Date', open='Open', high='High', low='Low', close='Close', adjclose='AdjClose', volume='Volume'),
 Row(date='2019-07-15', open='248.000000', high='254.419998', low='244.860001', close='253.500000', adjclose='253.500000', volume='11000100'),
 Row(date='2019-07-16', open='249.300003', high='253.529999', low='247.929993', close='252.380005', adjclose='252.380005', volume='8149000'),
 Row(date='2019-07-17', open='255.669998', high='258.309998', low='253.350006', close='254.860001', adjclose='254.860001', volume='9764700'),
 Row(date='2019-07-18', open='255.050003', high='255.750000', low='251.889999', close='253.539993', adjclose='253.539993', volume='4764500')]

In [23]:
amazon_file = '/content/drive/MyDrive/Colab Notebooks/AMZN.csv'
amazon_df = sqlContext.read.load(
    amazon_file,
    format='csv',
    header='true',
    inferSchema='true')

In [24]:
amazon_df.take(2)

[Row(Date='2019-07-15', Open=2021.400024, High=2022.900024, Low=2001.550049, Close=2020.98999, AdjClose=2020.98999, Volume=2981300),
 Row(Date='2019-07-16', Open=2010.579956, High=2026.319946, Low=2001.219971, Close=2009.900024, AdjClose=2009.900024, Volume=2618200)]

In [25]:
amazon_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [26]:
amazon_df.count()

253

In [27]:
amazon_df.show()

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
|2019-07-22|1971.140015|     1989.0| 1958.26001|1985.630005|1985.630005|2900000|
|2019-07-23| 1995.98999|1997.790039|1973.130005| 1994.48999| 1994.48999|2703500|
|2019-07-24|1969.300049|2001.300049|1965.869995|2000.810059|2000.810059|2631300|
|2019-07-25|     2001.0|2001.199951|1972.719971|1973.819946|1973.819946|4136500|
|2019-07-26|     1942.0|1950

In [29]:
import pandas
amazon_df.toPandas().head(5)

Unnamed: 0,Date,Open,High,Low,Close,AdjClose,Volume
0,2019-07-15,2021.400024,2022.900024,2001.550049,2020.98999,2020.98999,2981300
1,2019-07-16,2010.579956,2026.319946,2001.219971,2009.900024,2009.900024,2618200
2,2019-07-17,2007.050049,2012.0,1992.030029,1992.030029,1992.030029,2558800
3,2019-07-18,1980.01001,1987.5,1951.550049,1977.900024,1977.900024,3504300
4,2019-07-19,1991.209961,1996.0,1962.22998,1964.52002,1964.52002,3185600


## DataFrames Operations

In [30]:
google_file = '/content/drive/MyDrive/Colab Notebooks/GOOG.csv'
google_df = sqlContext.read.load(
    google_file,
    format='csv',
    header='true',
    inferSchema='true')

In [31]:
google_df.select("Date", "Close").show()

+----------+-----------+
|      Date|      Close|
+----------+-----------+
|2019-07-15|1150.339966|
|2019-07-16|1153.579956|
|2019-07-17|1146.349976|
|2019-07-18|1146.329956|
|2019-07-19|1130.099976|
|2019-07-22|1138.069946|
|2019-07-23|1146.209961|
|2019-07-24|1137.810059|
|2019-07-25|1132.119995|
|2019-07-26|1250.410034|
|2019-07-29|1239.410034|
|2019-07-30|1225.140015|
|2019-07-31|1216.680054|
|2019-08-01| 1209.01001|
|2019-08-02| 1193.98999|
|2019-08-05|1152.319946|
|2019-08-06|1169.949951|
|2019-08-07| 1173.98999|
|2019-08-08|1204.800049|
|2019-08-09| 1188.01001|
+----------+-----------+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import year, month, dayofmonth

In [35]:
google_df.select(year("Date").alias("yr"), "Close").show(5)

+----+-----------+
|  yr|      Close|
+----+-----------+
|2019|1150.339966|
|2019|1153.579956|
|2019|1146.349976|
|2019|1146.329956|
|2019|1130.099976|
+----+-----------+
only showing top 5 rows



In [36]:
google_df.select(year("Date").alias("year"), "AdjClose").groupby("year").avg("AdjClose").sort("year").show()

+----+------------------+
|year|     avg(AdjClose)|
+----+------------------+
|2019|1245.3833654621849|
|2020|1362.8286906865671|
+----+------------------+



In [39]:
amazon_df.select(year("Date").alias("year"), month("Date").alias("month"), "Low")\
    .groupby("year", "month")\
    .avg("Low")\
    .sort("year", "month")\
    .show()

+----+-----+------------------+
|year|month|          avg(Low)|
+----+-----+------------------+
|2019|    7|1948.1946176153847|
|2019|    8|1778.2340920454546|
|2019|    9|     1784.68599235|
|2019|   10|1736.6047840869564|
|2019|   11|1764.3005004499998|
|2019|   12|1773.8466622380954|
|2020|    1| 1870.354288809524|
|2020|    2|2045.0563194210524|
|2020|    3|1825.1590964090908|
|2020|    4|2185.9347679523808|
|2020|    5|     2364.48549805|
|2020|    6| 2579.099553909091|
|2020|    7|2976.1799859999996|
+----+-----+------------------+



### Spark Optimization and Physical Plan

<img src="https://databricks.com/wp-content/uploads/2015/04/Screen-Shot-2015-04-12-at-8.41.26-AM.png" width=600>

In [40]:
amazon_df.select(year("Date").alias("year"), month("Date").alias("month"), "Low")\
    .groupby("year", "month")\
    .avg("Low")\
    .sort("year", "month")\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [year#317 ASC NULLS FIRST, month#318 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(year#317 ASC NULLS FIRST, month#318 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#325]
      +- HashAggregate(keys=[year#317, month#318], functions=[avg(Low#37)])
         +- Exchange hashpartitioning(year#317, month#318, 200), ENSURE_REQUIREMENTS, [id=#322]
            +- HashAggregate(keys=[year#317, month#318], functions=[partial_avg(Low#37)])
               +- Project [year(cast(Date#34 as date)) AS year#317, month(cast(Date#34 as date)) AS month#318, Low#37]
                  +- FileScan csv [Date#34,Low#37] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/AMZN.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:string,Low:double>




## Spark SQL

In [43]:
amazon_df.createOrReplaceTempView("amazon_stocks")
google_df.createOrReplaceTempView("google_stocks")
tesla_df.createOrReplaceTempView("tesla_stocks")

In [44]:
sqlContext.sql("SELECT * FROM amazon_stocks").show(5)

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 5 rows



In [45]:
sqlContext.sql("""
SELECT
    YEAR(amazon_stocks.Date) as yr,
    MONTH(amazon_stocks.Date) as mo,
    AVG(amazon_stocks.AdjClose)
FROM amazon_stocks
GROUP BY
    YEAR(amazon_stocks.Date),
    MONTH(amazon_stocks.Date)
""").show()

+----+---+------------------+
|  yr| mo|     avg(AdjClose)|
+----+---+------------------+
|2019| 10|1752.3317498695653|
|2020|  6|      2613.5454545|
|2020|  3|1872.3104358636365|
|2019|  8|1793.6027220909093|
|2020|  4|2228.7052408571426|
|2020|  1|1884.2376128571425|
|2019|  9|     1799.12099615|
|2019| 12|1785.7728446190476|
|2020|  7| 3053.100016222222|
|2020|  2|2066.1752672631574|
|2019|  7|1964.6846265384618|
|2019| 11|      1774.2939941|
|2020|  5|2394.1840209499996|
+----+---+------------------+



In [46]:
sqlContext.sql("""
SELECT
    google_stocks.Date,
    google_stocks.Open,
    google_stocks.Close,
    ABS(google_stocks.Close - google_stocks.Open) as spydif
FROM google_stocks
WHERE
    ABS(google_stocks.Close - google_stocks.Open) > 4
""").show()

+----------+-----------+-----------+------------------+
|      Date|       Open|      Close|            spydif|
+----------+-----------+-----------+------------------+
|2019-07-16|     1146.0|1153.579956| 7.579956000000038|
|2019-07-17|1150.969971|1146.349976| 4.619995000000017|
|2019-07-18| 1141.73999|1146.329956| 4.589966000000004|
|2019-07-19|1148.189941|1130.099976| 18.08996500000012|
|2019-07-22|1133.449951|1138.069946| 4.619995000000017|
|2019-07-24|1131.900024|1137.810059|  5.91003499999988|
|2019-07-25|1137.819946|1132.119995|5.6999510000000555|
|2019-07-26|1224.040039|1250.410034|26.369995000000017|
|2019-07-31|     1223.0|1216.680054| 6.319946000000073|
|2019-08-01|1214.030029| 1209.01001|5.0200190000000475|
|2019-08-02| 1200.73999| 1193.98999|              6.75|
|2019-08-05|1170.040039|1152.319946|17.720092999999906|
|2019-08-06|1163.310059|1169.949951| 6.639892000000145|
|2019-08-07|     1156.0| 1173.98999|17.989990000000034|
|2019-08-08|1182.829956|1204.800049|21.970092999

In [52]:
sqlContext.sql("""
SELECT
    YEAR(google_stocks.Date) as year,
    MAX(google_stocks.AdjClose),
    MIN(google_stocks.AdjClose)
FROM google_stocks
GROUP BY
    YEAR(google_stocks.Date)
""").show()


+----+-------------+-------------+
|year|max(AdjClose)|min(AdjClose)|
+----+-------------+-------------+
|2019|  1361.170044|  1130.099976|
|2020|   1541.73999|  1056.619995|
+----+-------------+-------------+



In [54]:
joinclose=sqlContext.sql("""
SELECT
    tesla_stocks.Date,
    tesla_stocks.AdjClose as teslaclose,
    amazon_stocks.AdjClose as amazonclose,
    google_stocks.AdjClose as googleclose
FROM tesla_stocks
INNER JOIN google_stocks
    ON tesla_stocks.Date = google_stocks.Date
INNER JOIN amazon_stocks
    ON tesla_stocks.Date = amazon_stocks.Date
""").cache()

joinclose.show()

+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|253.500000| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|230.750000|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173

In [56]:
joinclose.createOrReplaceTempView("joinclose")

In [57]:
sqlContext.sql("""
SELECT
    YEAR(joinclose.Date) as year,
    AVG(joinclose.teslaclose) as avgteslaclose,
    AVG(joinclose.amazonclose) as avgamazonclose,
    AVG(joinclose.googleclose) as avggoogleclose
FROM joinclose
GROUP BY YEAR(joinclose.Date)
ORDER BY YEAR(joinclose.Date)
""").show()

+----+------------------+------------------+------------------+
|year|     avgteslaclose|    avgamazonclose|    avggoogleclose|
+----+------------------+------------------+------------------+
|2019|283.62126070588243|1800.6161329411755|1245.3833654621849|
|2020| 761.8206739179104|2236.4144787985074| 1362.828690686567|
+----+------------------+------------------+------------------+



## Save Spark DataFrames

### Parquet Files

<img src="https://i.ytimg.com/vi/1j8SdS7s_NY/maxresdefault.jpg" width=500>

In [58]:
joinclose.write.format("parquet").save("joinstocks.parquet")

In [59]:
final_df = sqlContext.read.parquet("joinstocks.parquet")
final_df.show()

+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2020-01-15|518.500000| 1862.02002|1439.199951|
|2020-01-16|513.489990|1877.939941|1451.699951|
|2020-01-17|510.500000|1864.719971|1480.390015|
|2020-01-21|547.200012|     1892.0|1484.400024|
|2020-01-22|569.559998|1887.459961|1485.949951|
|2020-01-23|572.200012|1884.579956|1486.650024|
|2020-01-24|564.820007|1861.640015|1466.709961|
|2020-01-27|558.020020|1828.339966|1433.900024|
|2020-01-28|566.900024|    1853.25|1452.560059|
|2020-01-29|580.989990|     1858.0|1458.630005|
|2020-01-30|640.809998|1870.680054|1455.839966|
|2020-01-31|650.570007|2008.719971| 1434.22998|
|2020-02-03|780.000000|2004.199951|1485.939941|
|2020-02-04|887.059998|2049.669922|1447.069946|
|2020-02-05|734.700012|2039.869995| 1448.22998|
|2020-02-06|748.960022| 2050.22998| 1476.22998|
|2020-02-07|748.070007|2079.280029| 1479.22998|
|2020-02-10|771.280029|2133.909912|1508.