In [1]:
import findspark

findspark.add_jars('/app/postgresql-42.1.4.jar')
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Stocks:ETL").getOrCreate()

In [3]:
spark.version

'2.4.5'

In [4]:
stocks_dir = '/dataset/stocks-small'

In [5]:
import sys

from pyspark.sql import SparkSession

# UDF
from pyspark.sql.types import StringType
#
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [6]:
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(stocks_dir)

In [7]:
df.count()
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- OpenInt: integer (nullable = true)



In [8]:
df.show()

+-------------------+------+------+------+------+------+-------+
|               Date|  Open|  High|   Low| Close|Volume|OpenInt|
+-------------------+------+------+------+------+------+-------+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378|467056|      0|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|      0|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|      0|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|      0|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087|655676|      0|
|1962-01-09 00:00:00|6.1208|6.2376|6.1208|6.1621|592806|      0|
|1962-01-10 00:00:00|6.1707|6.2041|6.1707|6.1707|359274|      0|
|1962-01-11 00:00:00|6.1875|6.2376|6.1875|6.2376|386220|      0|
|1962-01-12 00:00:00|6.2543|6.2962|6.2543|6.2543|529933|      0|
|1962-01-15 00:00:00|6.2708|6.2962|6.2708|6.2792|305383|      0|
|1962-01-16 00:00:00|6.2708|6.2708|6.2128|6.2128|305383|      0|
|1962-01-17 00:00:00|6.1875|6.1875|6.0956|6.1125|502984|      0|
|1962-01-18 00:00:00|6.12

In [9]:
df = df.withColumn('filename', F.input_file_name())

In [10]:
df.show(truncate=False)

+-------------------+------+------+------+------+------+-------+---------------------------------------+
|Date               |Open  |High  |Low   |Close |Volume|OpenInt|filename                               |
+-------------------+------+------+------+------+------+-------+---------------------------------------+
|1962-01-02 00:00:00|6.413 |6.413 |6.3378|6.3378|467056|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373|6.087 |655676|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-09 00:00:00|6.1208|6.2376|6.1208|6.1621|592806|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-10 00:00:00|6.1707|6.2041|6.1707|6.1707|359274

In [11]:
df_lookup = spark.read.csv('/dataset/yahoo-symbols-201709.csv')

In [12]:
df_lookup.show()

+------+--------------------+--------+--------------------+-------+
|   _c0|                 _c1|     _c2|                 _c3|    _c4|
+------+--------------------+--------+--------------------+-------+
|Ticker|                Name|Exchange|       Category Name|Country|
|  OEDV|Osage Exploration...|     PNK|                null|    USA|
|  AAPL|          Apple Inc.|     NMS|Electronic Equipment|    USA|
|   BAC|Bank of America C...|     NYQ|  Money Center Banks|    USA|
|  AMZN|    Amazon.com, Inc.|     NMS|Catalog & Mail Or...|    USA|
|     T|           AT&T Inc.|     NYQ|Telecom Services ...|    USA|
|  GOOG|       Alphabet Inc.|     NMS|Internet Informat...|    USA|
|    MO|  Altria Group, Inc.|     NYQ|          Cigarettes|    USA|
|   DAL|Delta Air Lines, ...|     NYQ|      Major Airlines|    USA|
|    AA|   Alcoa Corporation|     NYQ|            Aluminum|    USA|
|   AXP|American Express ...|     NYQ|     Credit Services|    USA|
|    DD|E. I. du Pont de ...|     NYQ|Agricultur

In [13]:
def extract_symbol_from(filename):
    return filename.split('/')[-1].split('.')[0].upper()

In [14]:
# filename = 'file:///dataset/stocks-small/ibm.us.txt' # => IBM
extract_symbol_from('file:///dataset/stocks-small/ibm.us.txt')

'IBM'

In [15]:
extract_symbol = F.udf(lambda filename: extract_symbol_from(filename), StringType())

In [16]:
stocks_folder = stocks_dir
df = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .csv(stocks_folder) \
        .withColumn("name", extract_symbol(F.input_file_name()))

In [17]:
df.show(5)

+-------------------+------+------+------+------+------+-------+----+
|               Date|  Open|  High|   Low| Close|Volume|OpenInt|name|
+-------------------+------+------+------+------+------+-------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378|467056|      0| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|      0| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|      0| IBM|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|      0| IBM|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087|655676|      0| IBM|
+-------------------+------+------+------+------+------+-------+----+
only showing top 5 rows



In [18]:
df = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .csv(stocks_folder) \
        .withColumn("name", extract_symbol(F.input_file_name())) \
        .withColumnRenamed("Date", "dateTime") \
        .withColumnRenamed("Open", "open") \
        .withColumnRenamed("High", "high") \
        .withColumnRenamed("Low", "low") \
        .withColumnRenamed("Close", "close") \
        .drop("Volume", "OpenInt")

In [19]:
df_stocks = df

In [20]:
df_stocks.show(5)

+-------------------+------+------+------+------+----+
|           dateTime|  open|  high|   low| close|name|
+-------------------+------+------+------+------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295| IBM|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041| IBM|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087| IBM|
+-------------------+------+------+------+------+----+
only showing top 5 rows



In [21]:
lookup_file = '/dataset/yahoo-symbols-201709.csv'

In [22]:
symbols_lookup = spark.read. \
        option("header", True). \
        option("inferSchema", True). \
        csv(lookup_file). \
        select("Ticker", "Category Name"). \
        withColumnRenamed("Ticker", "symbol"). \
        withColumnRenamed("Category Name", "category")

In [23]:
df_stocks.show(3)
symbols_lookup.show(3)

+-------------------+------+------+------+------+----+
|           dateTime|  open|  high|   low| close|name|
+-------------------+------+------+------+------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295| IBM|
+-------------------+------+------+------+------+----+
only showing top 3 rows

+------+--------------------+
|symbol|            category|
+------+--------------------+
|  OEDV|                null|
|  AAPL|Electronic Equipment|
|   BAC|  Money Center Banks|
+------+--------------------+
only showing top 3 rows



In [24]:
joined_df = df_stocks \
    .withColumnRenamed('dateTime', "full_date") \
    .filter("full_date >= \"2017-09-01\"") \
    .withColumn("year", F.year("full_date")) \
    .withColumn("month", F.month("full_date")) \
    .withColumn("day", F.dayofmonth("full_date")) \
    .withColumnRenamed("name", "symbol") \
    .join(symbols_lookup, ["symbol"])

In [25]:
joined_df.show(3)

+------+-------------------+------+------+------+------+----+-----+---+--------------------+
|symbol|          full_date|  open|  high|   low| close|year|month|day|            category|
+------+-------------------+------+------+------+------+----+-----+---+--------------------+
|   IBM|2017-01-03 00:00:00|160.76| 161.6|159.81|160.95|2017|    1|  3|Information Techn...|
|   IBM|2017-01-04 00:00:00|161.51|163.53|161.11|162.94|2017|    1|  4|Information Techn...|
|   IBM|2017-01-05 00:00:00|162.93|163.06|161.01|162.41|2017|    1|  5|Information Techn...|
+------+-------------------+------+------+------+------+----+-----+---+--------------------+
only showing top 3 rows



In [26]:
window20 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-20, 0))
window50 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-50, 0))
window100 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-100, 0))

In [27]:
stocks_moving_avg_df = joined_df \
    .withColumn("ma20", F.avg("close").over(window20)) \
    .withColumn("ma50", F.avg("close").over(window50)) \
    .withColumn("ma100", F.avg("close").over(window100))

In [28]:
# Moving Average
stocks_moving_avg_df.select('symbol', 'close', 'ma20').show(25)

+------+------+------------------+
|symbol| close|              ma20|
+------+------+------------------+
|  AAPL|114.31|            114.31|
|  AAPL|114.19|            114.25|
|  AAPL|114.77|114.42333333333333|
|  AAPL|116.04|          114.8275|
|  AAPL|117.11|115.28399999999999|
|  AAPL|117.23|115.60833333333333|
|  AAPL|117.86|115.92999999999999|
|  AAPL|117.37|            116.11|
|  AAPL|117.16|116.22666666666666|
|  AAPL| 118.1|116.41399999999999|
|  AAPL|118.09|116.56636363636362|
|  AAPL|117.89|116.67666666666666|
|  AAPL| 118.1|116.78615384615384|
|  AAPL|118.19|116.88642857142857|
|  AAPL|118.07|116.96533333333332|
|  AAPL|119.95|117.15187499999999|
|  AAPL|120.01|            117.32|
|  AAPL|120.02|            117.47|
|  AAPL| 119.7|117.58736842105263|
|  AAPL|119.43|117.67949999999999|
|  AAPL| 126.7| 118.1090476190476|
|  AAPL| 126.5|118.68952380952379|
|  AAPL|127.03| 119.3009523809524|
|  AAPL|128.23|119.94190476190477|
|  AAPL|129.44|120.58000000000004|
+------+------+-----

In [29]:
output_dir = '/dataset/output.parquet'

In [30]:
stocks_moving_avg_df \
    .write \
    .mode('overwrite') \
    .partitionBy("year", "month", "day") \
    .parquet(output_dir)

In [31]:
df_parquet = spark.read.parquet(output_dir)

In [32]:
df_parquet.count()

4142

In [33]:
df_parquet.createOrReplaceTempView("stocks")

In [34]:
badHighestClosingPrice = spark.sql("SELECT symbol, MAX(close) AS price FROM stocks WHERE full_date >= '2017-09-01' AND full_date < '2017-10-01' GROUP BY symbol")
badHighestClosingPrice.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[symbol#559], functions=[max(close#564)])
+- Exchange hashpartitioning(symbol#559, 200)
   +- *(1) HashAggregate(keys=[symbol#559], functions=[partial_max(close#564)])
      +- *(1) Project [symbol#559, close#564]
         +- *(1) Filter ((isnotnull(full_date#560) && (cast(full_date#560 as string) >= 2017-09-01)) && (cast(full_date#560 as string) < 2017-10-01))
            +- *(1) FileScan parquet [symbol#559,full_date#560,close#564,year#569,month#570,day#571] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/dataset/output.parquet], PartitionCount: 218, PartitionFilters: [], PushedFilters: [IsNotNull(full_date)], ReadSchema: struct<symbol:string,full_date:timestamp,close:double>


In [35]:
highestClosingPrice = spark.sql("SELECT symbol, MAX(close) AS price FROM stocks WHERE year=2017 AND month=9 GROUP BY symbol")
highestClosingPrice.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[symbol#559], functions=[max(close#564)])
+- Exchange hashpartitioning(symbol#559, 200)
   +- *(1) HashAggregate(keys=[symbol#559], functions=[partial_max(close#564)])
      +- *(1) Project [symbol#559, close#564]
         +- *(1) FileScan parquet [symbol#559,close#564,year#569,month#570,day#571] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/dataset/output.parquet], PartitionCount: 20, PartitionFilters: [isnotnull(year#569), isnotnull(month#570), (year#569 = 2017), (month#570 = 9)], PushedFilters: [], ReadSchema: struct<symbol:string,close:double>


In [36]:
# Write to Postgres
stocks_moving_avg_df \
    .drop("year", "month", "day") \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.stocks") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .mode('append') \
    .save()