In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import input_file_name, substring_index, to_timestamp, year, month, dayofweek, weekofyear, date_format, lit
from pyspark.sql.types import IntegerType, DoubleType
spark = SparkSession.builder.config("spark.jars", "./drivers/postgresql-42.2.18.jar").getOrCreate()

In [20]:
df = spark.read.json('./data/nyse-listed_json.json')
df2 = spark.read.json('./data/other-listed_json.json')


In [21]:
df = df.withColumn('Exchange', lit('N'))
df.show()

+----------+--------------------+--------+
|ACT Symbol|        Company Name|Exchange|
+----------+--------------------+--------+
|         A|Agilent Technolog...|       N|
|        AA|Alcoa Inc. Common...|       N|
|      AA$B|Alcoa Inc. Deposi...|       N|
|       AAC|AAC Holdings, Inc...|       N|
|       AAN|Aaron's, Inc. Com...|       N|
|       AAP|Advance Auto Part...|       N|
|       AAT|American Assets T...|       N|
|       AAV|Advantage Oil & G...|       N|
|        AB|Allianceberstein ...|       N|
|       ABB|ABB Ltd Common Stock|       N|
|      ABBV|AbbVie Inc. Commo...|       N|
|       ABC|AmerisourceBergen...|       N|
|      ABEV|Ambev S.A. Americ...|       N|
|       ABG|Asbury Automotive...|       N|
|       ABM|ABM Industries In...|       N|
|       ABR|Arbor Realty Trus...|       N|
|     ABR$A|Arbor Realty Trus...|       N|
|     ABR$B|Arbor Realty Trus...|       N|
|     ABR$C|Arbor Realty Trus...|       N|
|      ABRN|Arbor Realty Trus...|       N|
+----------

In [22]:
df2.show()

+----------+----------+--------------------+---+--------+-------------+--------------+--------------------+----------+
|ACT Symbol|CQS Symbol|        Company Name|ETF|Exchange|NASDAQ Symbol|Round Lot Size|       Security Name|Test Issue|
+----------+----------+--------------------+---+--------+-------------+--------------+--------------------+----------+
|         A|         A|Agilent Technolog...|  N|       N|            A|         100.0|Agilent Technolog...|         N|
|        AA|        AA|Alcoa Inc. Common...|  N|       N|           AA|         100.0|Alcoa Inc. Common...|         N|
|       AA$|       AAp|Alcoa Inc. $3.75 ...|  N|       A|          AA-|         100.0|Alcoa Inc. $3.75 ...|         N|
|      AA$B|      AApB|Alcoa Inc. Deposi...|  N|       N|         AA-B|         100.0|Alcoa Inc. Deposi...|         N|
|       AAC|       AAC|AAC Holdings, Inc...|  N|       N|          AAC|         100.0|AAC Holdings, Inc...|         N|
|      AADR|      AADR|WCM BNY Mellon Fo...|  Y|

In [23]:
df3 = spark.read.csv('./data/ETFs/*.txt', sep=',', header=True).withColumn("filename", input_file_name())
df3 = df3.withColumn("act_symbol", substring_index(substring_index(df3.filename, "/", -1), ".", 1)) \
    .withColumn('date_formatted', to_timestamp(df3.Date, 'yyyy-MM-dd')) \
    .withColumn('full_date', date_format(df3.Date, "yyyyMMdd").cast(IntegerType()))

df3 = df3.drop("filename", "OpenInt", "Date")

+------+------+------+------+--------+----------+-------------------+---------+
|Open  |High  |Low   |Close |Volume  |act_symbol|date_formatted     |full_date|
+------+------+------+------+--------+----------+-------------------+---------+
|45.722|45.75 |44.967|45.665|11700414|qqq       |1999-03-10 00:00:00|19990310 |
|45.994|46.26 |44.988|45.88 |21670048|qqq       |1999-03-11 00:00:00|19990311 |
|45.721|45.749|44.406|44.77 |19553768|qqq       |1999-03-12 00:00:00|19990312 |
|45.101|46.103|44.625|46.052|14245348|qqq       |1999-03-15 00:00:00|19990315 |
|46.253|46.643|45.749|46.447|10971066|qqq       |1999-03-16 00:00:00|19990316 |
|46.443|46.5  |45.969|46.106|8867842 |qqq       |1999-03-17 00:00:00|19990317 |
|46.055|47.033|46.042|47.003|10843416|qqq       |1999-03-18 00:00:00|19990318 |
|47.623|47.623|45.775|45.806|16013553|qqq       |1999-03-19 00:00:00|19990319 |
|45.992|46.102|45.156|45.24 |11239060|qqq       |1999-03-22 00:00:00|19990322 |
|44.991|45.16 |43.567|43.65 |24517597|qq

In [24]:
df4 = spark.read.csv('./data/Stocks/*.txt', sep=',', header=True).withColumn("filename", input_file_name())
df4 = df4.withColumn("act_symbol", substring_index(substring_index(df4.filename, "/", -1), ".", 1)) \
    .withColumn('date_formatted', to_timestamp(df4.Date, 'yyyy-MM-dd')) \
    .withColumn('full_date', date_format(df4.Date, "yyyyMMdd").cast(IntegerType())) 
        
df4 = df4.drop("filename", "OpenInt", "Date")

+------+------+------+------+-------+----------+-------------------+---------+
|Open  |High  |Low   |Close |Volume |act_symbol|date_formatted     |full_date|
+------+------+------+------+-------+----------+-------------------+---------+
|0.6277|0.6362|0.6201|0.6201|2575579|ge        |1962-01-02 00:00:00|19620102 |
|0.6201|0.6201|0.6122|0.6201|1764749|ge        |1962-01-03 00:00:00|19620103 |
|0.6201|0.6201|0.6037|0.6122|2194010|ge        |1962-01-04 00:00:00|19620104 |
|0.6122|0.6122|0.5798|0.5957|3255244|ge        |1962-01-05 00:00:00|19620105 |
|0.5957|0.5957|0.5716|0.5957|3696430|ge        |1962-01-08 00:00:00|19620108 |
|0.5957|0.6037|0.5878|0.5957|2778285|ge        |1962-01-09 00:00:00|19620109 |
|0.5957|0.6037|0.5957|0.5957|2337096|ge        |1962-01-10 00:00:00|19620110 |
|0.5957|0.5957|0.5878|0.5957|1943605|ge        |1962-01-11 00:00:00|19620111 |
|0.5957|0.6037|0.5878|0.5878|2015151|ge        |1962-01-12 00:00:00|19620112 |
|0.5957|0.5957|0.5957|0.5957|2527879|ge        |1962

In [25]:
datedf = df4.select("date_formatted", "full_date").distinct() \
    .withColumn("day_of_week", dayofweek("date_formatted")) \
    .withColumn("week_of_year", weekofyear("date_formatted")) \
    .withColumn("month", month("date_formatted")) \
    .withColumn("year", year("date_formatted")) \
    .drop("date_formatted")

In [27]:
def create_table(df: DataFrame, table_name: str):
    user = "postgres"
    password = "25March1994"
    url = "jdbc:postgresql://localhost:5432/capstone"
    properties = {
        "user": user,
        "password": password
    }
    df.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)

In [28]:
create_table(datedf, "trade_day")

In [30]:
stock_daily_df = df3.unionByName(df4).dropDuplicates()
stock_daily_df = stock_daily_df \
    .withColumnRenamed("Open", "open") \
    .withColumnRenamed("High", "high") \
    .withColumnRenamed("Low", "low") \
    .withColumnRenamed("Close", "close") \
    .withColumnRenamed("Volume", "volume") \
    .drop("date_formatted") 

In [32]:
stock_daily_df = stock_daily_df \
    .withColumn("open", stock_daily_df.open.cast(DoubleType())) \
    .withColumn("high", stock_daily_df.high.cast(DoubleType())) \
    .withColumn("low", stock_daily_df.low.cast(DoubleType())) \
    .withColumn("close", stock_daily_df.close.cast(DoubleType())) \
    .withColumn("volume", stock_daily_df.volume.cast(IntegerType()))

root
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- act_symbol: string (nullable = false)
 |-- full_date: integer (nullable = true)



In [42]:
create_table(stock_daily_df, "daily_stock")

In [43]:
company_df = df.union(df2.select("ACT Symbol", "Company Name", "Exchange"))

In [44]:
company_df = company_df.dropDuplicates() \
    .withColumnRenamed("ACT Symbol", "act_symbol") \
    .withColumnRenamed("Company Name", "company_name") \
    .withColumnRenamed("Exchange", "exchange_code")


In [45]:
create_table(company_df, "company_details")

In [46]:
exhange_df = spark.read.json('./data/exchange_list.json')
country_df = spark.read.json('./data/country_list.json')

In [47]:
create_table(exhange_df, "exchange_details")

In [48]:
create_table(country_df, "country_details")

In [11]:
def read_table(table_name: str, spark: SparkSession):
    user = "postgres"
    password = "25March1994"
    url = "jdbc:postgresql://localhost:5432/capstone"
    properties = {
        "user": user,
        "password": password
    }
    df = spark.read.jdbc(url=url, table=table_name, properties=properties)
    return df

In [12]:
df = read_table("company_details", spark)

In [None]:
spark