In [1]:
import findspark

# Register the PostgreSQL JDBC driver jar with findspark, then initialise
# findspark.
# NOTE(review): presumably both calls have to run before the first pyspark
# import / SparkSession creation for the jar to be picked up — confirm.
findspark.add_jars('/app/postgresql-42.1.4.jar')
findspark.init()

In [2]:
import os

import pandas as pd
from pyspark.sql import functions as F, SparkSession

In [3]:
from pyspark.sql import SparkSession
# Spark settings for this notebook: small driver/executor footprint and only
# two shuffle partitions.
spark_settings = {
    "spark.driver.memory": "512m",
    "spark.driver.cores": "1",
    "spark.executor.memory": "512m",
    "spark.executor.cores": "1",
    "spark.sql.shuffle.partitions": "2",
}

# Build (or reuse) the session, applying each setting in turn.
session_builder = SparkSession.builder.appName("pyspark-postgres")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

In [4]:
# Display the version reported by the active Spark session.
spark.version

'2.4.5'

In [5]:
# Path of the stock prices CSV that is read into `df_stocks`.
file_path = '/data/stock/stock_prices.csv'

In [6]:
# Read the CSV at `file_path`, treating the first line as the header and
# letting Spark infer the column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_stocks = csv_reader.csv(file_path)

In [7]:
# Number of rows in the stock prices DataFrame.
df_stocks.count()

497472

In [8]:
# Print the column names and the types inferred while reading the CSV.
df_stocks.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



In [9]:
# Preview the first rows of the stock prices.
df_stocks.show()

+------+-------------------+-------+-------+--------+-------+--------+
|symbol|               date|   open|   high|     low|  close|  volume|
+------+-------------------+-------+-------+--------+-------+--------+
|   AAL|2014-01-02 00:00:00|  25.07|  25.82|   25.06|  25.36| 8998943|
|  AAPL|2014-01-02 00:00:00|79.3828|79.5756| 78.8601|79.0185|58791957|
|   AAP|2014-01-02 00:00:00| 110.36| 111.88|  109.29| 109.74|  542711|
|  ABBV|2014-01-02 00:00:00|  52.12|  52.33|   51.52|  51.98| 4569061|
|   ABC|2014-01-02 00:00:00|  70.11|  70.23|   69.48|  69.89| 1148391|
|   ABT|2014-01-02 00:00:00|  38.09|   38.4|    38.0|  38.23| 4967472|
|   ACN|2014-01-02 00:00:00|   81.5|  81.92|   81.09|  81.13| 2405384|
|  ADBE|2014-01-02 00:00:00|  59.06|  59.53|   58.94|  59.29| 2746370|
|   ADI|2014-01-02 00:00:00|  49.52|  49.75|   49.04|  49.28| 2799092|
|   ADM|2014-01-02 00:00:00|  43.22|  43.29|   42.79|  42.99| 2753765|
|   ADP|2014-01-02 00:00:00|  80.17|  80.45|   79.38|  79.86| 1965869|
|  ADS

In [10]:
# Path of the ticker-symbol lookup CSV.
# NOTE(review): this rebinds `file_path`, which previously pointed at the
# stock prices CSV; re-running the cell that builds `df_stocks` after this one
# would read the lookup file instead.
file_path = '/data/stock/yahoo-symbols-201709.csv'

In [11]:
# Source column name -> name used in the rest of the notebook.
lookup_column_names = {
    'Ticker': 'symbol',
    'Name': 'name',
    'Exchange': 'exchange',
    'Category Name': 'category',
    'Country': 'country',
}

# Read the lookup CSV (header row, inferred types) and apply the renames.
df_lookup = spark.read.csv(path=file_path, inferSchema=True, header=True)
for source_name, target_name in lookup_column_names.items():
    df_lookup = df_lookup.withColumnRenamed(source_name, target_name)

In [12]:
# Number of rows in the symbol lookup DataFrame.
df_lookup.count()

106328

In [13]:
# Print the lookup schema to confirm the renamed columns.
df_lookup.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- exchange: string (nullable = true)
 |-- category: string (nullable = true)
 |-- country: string (nullable = true)



In [14]:
# Preview the first rows of the symbol lookup.
df_lookup.show()

+------+--------------------+--------+--------------------+-------+
|symbol|                name|exchange|            category|country|
+------+--------------------+--------+--------------------+-------+
|  OEDV|Osage Exploration...|     PNK|                null|    USA|
|  AAPL|          Apple Inc.|     NMS|Electronic Equipment|    USA|
|   BAC|Bank of America C...|     NYQ|  Money Center Banks|    USA|
|  AMZN|    Amazon.com, Inc.|     NMS|Catalog & Mail Or...|    USA|
|     T|           AT&T Inc.|     NYQ|Telecom Services ...|    USA|
|  GOOG|       Alphabet Inc.|     NMS|Internet Informat...|    USA|
|    MO|  Altria Group, Inc.|     NYQ|          Cigarettes|    USA|
|   DAL|Delta Air Lines, ...|     NYQ|      Major Airlines|    USA|
|    AA|   Alcoa Corporation|     NYQ|            Aluminum|    USA|
|   AXP|American Express ...|     NYQ|     Credit Services|    USA|
|    DD|E. I. du Pont de ...|     NYQ|Agricultural Chem...|    USA|
|  BABA|Alibaba Group Hol...|     NYQ|Specialty 

In [15]:
# Count lookup entries per country, sort by that count in descending order,
# and discard rows that contain a null.
df_countries = (
    df_lookup
    .groupBy('country')
    .count()
    .orderBy(F.col('count').desc())
    .dropna()
)
df_countries.show()

+--------------+-----+
|       country|count|
+--------------+-----+
|           USA|22169|
|       Germany|21365|
|        France|11176|
|         India| 8984|
|        Canada| 4145|
|United Kingdom| 4102|
|        Taiwan| 2700|
|     Australia| 2146|
|        Brazil| 2109|
|     Hong Kong| 1949|
|   South Korea| 1720|
|      Malaysia| 1654|
|      Thailand| 1349|
|         China| 1236|
|     Singapore|  978|
|        Sweden|  898|
|   Switzerland|  845|
|        Mexico|  767|
|     Indonesia|  538|
|         Italy|  521|
+--------------+-----+
only showing top 20 rows



In [16]:
# Count lookup entries per exchange, sort by that count in descending order,
# discard rows that contain a null, and leave out the 'PNK' exchange.
df_exchange = (
    df_lookup
    .groupBy('exchange')
    .count()
    .orderBy(F.col('count').desc())
    .dropna()
    .where(F.col('exchange') != 'PNK')
)
df_exchange.show()

+--------+-----+
|exchange|count|
+--------+-----+
|     FRA|10087|
|     BER| 6968|
|     BSE| 6087|
|     STU| 5444|
|     MUN| 4213|
|     LSE| 3944|
|     NYQ| 3384|
|     NSI| 2897|
|     DUS| 2254|
|     VAN| 2226|
|     ASX| 2146|
|     SAO| 2109|
|     TWO| 2031|
|     HKG| 1949|
|     NMS| 1783|
|     TOR| 1663|
|     KLS| 1654|
|     SET| 1349|
|     SHH| 1231|
|     HAM| 1032|
+--------+-----+
only showing top 20 rows



In [17]:
# Derive year / month / day-of-month columns from the `date` timestamp.
df_stocks_with_date_parts = (
    df_stocks
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
)

# Join with the symbol lookup on `symbol` (default join type) and drop the
# lookup columns that are not needed downstream.
df_joined = (
    df_stocks_with_date_parts
    .join(df_lookup, on=["symbol"])
    .drop('category', 'country')
)

In [18]:
# Preview the joined prices with the derived date parts and lookup columns.
df_joined.show()

+------+-------------------+-------+-------+--------+-------+--------+----+-----+---+--------------------+--------+
|symbol|               date|   open|   high|     low|  close|  volume|year|month|day|                name|exchange|
+------+-------------------+-------+-------+--------+-------+--------+----+-----+---+--------------------+--------+
|   AAL|2014-01-02 00:00:00|  25.07|  25.82|   25.06|  25.36| 8998943|2014|    1|  2|American Airlines...|     NMS|
|  AAPL|2014-01-02 00:00:00|79.3828|79.5756| 78.8601|79.0185|58791957|2014|    1|  2|          Apple Inc.|     NMS|
|   AAP|2014-01-02 00:00:00| 110.36| 111.88|  109.29| 109.74|  542711|2014|    1|  2|Advance Auto Part...|     NYQ|
|  ABBV|2014-01-02 00:00:00|  52.12|  52.33|   51.52|  51.98| 4569061|2014|    1|  2|         AbbVie Inc.|     NYQ|
|   ABC|2014-01-02 00:00:00|  70.11|  70.23|   69.48|  69.89| 1148391|2014|    1|  2|AmerisourceBergen...|     NYQ|
|   ABT|2014-01-02 00:00:00|  38.09|   38.4|    38.0|  38.23| 4967472|20

In [34]:
# Keep only the rows for a fixed set of ticker symbols.
# NOTE(review): despite the `_nms` suffix this is a filter on symbol, not on
# exchange — the displayed rows include PFE, whose exchange is NYQ.
selected_symbols = ['AAPL', 'GOOGL', 'PFE']
df_joined_nms = df_joined.where(F.col('symbol').isin(selected_symbols))
df_joined_nms.show()

+------+-------------------+--------+--------+--------+--------+---------+----+-----+---+-------------+--------+
|symbol|               date|    open|    high|     low|   close|   volume|year|month|day|         name|exchange|
+------+-------------------+--------+--------+--------+--------+---------+----+-----+---+-------------+--------+
|  AAPL|2014-01-02 00:00:00| 79.3828| 79.5756| 78.8601| 79.0185| 58791957|2014|    1|  2|   Apple Inc.|     NMS|
| GOOGL|2014-01-02 00:00:00|558.2877|559.4339|554.6842|557.1166|  3641796|2014|    1|  2|Alphabet Inc.|     NMS|
|   PFE|2014-01-02 00:00:00|   30.47|    30.6|   30.33|   30.46| 17438787|2014|    1|  2|  Pfizer Inc.|     NYQ|
|  AAPL|2014-01-03 00:00:00| 78.9799| 79.0999| 77.2042| 77.2828| 98303870|2014|    1|  3|   Apple Inc.|     NMS|
| GOOGL|2014-01-03 00:00:00|558.0575|559.0235|553.0175|553.0525|  3335123|2014|    1|  3|Alphabet Inc.|     NMS|
|   PFE|2014-01-03 00:00:00|   30.39|   30.83|   30.38|   30.52| 15036749|2014|    1|  3|  Pfize

In [30]:
# Remove rows containing nulls, then remove exact duplicate rows.
df_joined_nms = df_joined_nms.dropna().dropDuplicates()

In [31]:
# Destination path for the partitioned Parquet output.
output_dir = '/data/output.parquet'

In [32]:
# Write the selected rows as Parquet under `output_dir`, partitioned by the
# derived date parts and replacing any existing output.
partition_columns = ["year", "month", "day"]
df_joined_nms.write.parquet(
    output_dir,
    mode='overwrite',
    partitionBy=partition_columns,
)

In [25]:
# Print the schema of the filtered, de-duplicated DataFrame.
df_joined_nms.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- exchange: string (nullable = true)



In [33]:
# Append the selected stock rows to the PostgreSQL table `seminario.stocks`
# over JDBC, without the date-part columns and the exchange column.
#
# Connection settings come from the environment so that credentials can be
# supplied without editing (or committing) the notebook:
#   STOCKS_JDBC_URL, STOCKS_JDBC_USER, STOCKS_JDBC_PASSWORD
# NOTE(review): the fallbacks below are the values that were previously
# hardcoded, kept so the notebook still runs unchanged; drop the password
# fallback once the variable is provisioned.
jdbc_url = os.environ.get("STOCKS_JDBC_URL", "jdbc:postgresql://postgres/seminario")
jdbc_user = os.environ.get("STOCKS_JDBC_USER", "seminario")
jdbc_password = os.environ.get("STOCKS_JDBC_PASSWORD", "sem1nar10")

# NOTE(review): the write uses 'append' mode, so re-running this cell adds the
# same rows to the table again.
(
    df_joined_nms
    .drop("year", "month", "day", 'exchange')
    .write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "seminario.stocks")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("driver", "org.postgresql.Driver")
    .mode('append')
    .save()
)