In [1]:
# Spark environment setup: tell PySpark where the Java runtime and the Spark
# distribution live, then create the session used by the rest of the notebook.
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/project/spark-3.2.1-bin-hadoop3.2",
    }
)

from pyspark.sql import SparkSession

# getOrCreate() reuses an already-running session if there is one; the
# PostgreSQL JDBC driver jar is registered through "spark.jars".
spark = (
    SparkSession.builder
    .appName("PySpark App")
    .config("spark.jars", "/project/postgresql-42.3.2.jar")
    .getOrCreate()
)

# Access data

In [2]:
# Load the S&P ticker list from parquet and preview the first 20 rows.
# NOTE(review): parquet is a binary format, so the `encoding` option is
# probably ignored by the reader — confirm whether it can be dropped.
s_p_companies = spark.read.parquet(
    '/project/parquet_files/s_p_companies.parquet',
    encoding='UTF-8',
)
s_p_companies.show(n=20)

+------+
|ticker|
+------+
|   MMM|
|   AOS|
|   ABT|
|  ABBV|
|  ABMD|
|   ACN|
|  ATVI|
|   ADM|
|  ADBE|
|   ADP|
|   AAP|
|   AES|
|   AFL|
|     A|
|   AIG|
|   APD|
|  AKAM|
|   ALK|
|   ALB|
|   ARE|
+------+
only showing top 20 rows



In [3]:
# Load the price history from parquet and preview the first 20 rows.
# NOTE(review): parquet is a binary format, so the `encoding` option is
# probably ignored by the reader — confirm whether it can be dropped.
history = spark.read.parquet(
    '/project/parquet_files/history.parquet',
    encoding='UTF-8',
)
history.show(n=20)

+----------+------+-------------------+-------------------+-------------------+-------------------+-------------------+---------+---------+------------+
|history_id|ticker|               date|               open|               high|                low|              close|   volume|dividents|stock_splits|
+----------+------+-------------------+-------------------+-------------------+-------------------+-------------------+---------+---------+------------+
|         0|  AAPL|1980-12-12 00:00:00|0.10032562166452408|0.10076179581247681|0.10032562166452408|0.10032562166452408|469033600|      0.0|         0.0|
|         1|  AAPL|1980-12-15 00:00:00| 0.0955277555324095| 0.0955277555324095|0.09509158134460449|0.09509158134460449|175884800|      0.0|         0.0|
|         2|  AAPL|1980-12-16 00:00:00|0.08854826203540919|0.08854826203540919|0.08811209350824356|0.08811209350824356|105728000|      0.0|         0.0|
|         3|  AAPL|1980-12-17 00:00:00|0.09029291570186615|0.09072908989790515|0.0

In [4]:
# Load the recommendations table from parquet and preview the first 20 rows.
recommendations = spark.read.parquet(
    '/project/parquet_files/recommendations.parquet'
)
recommendations.show(n=20)

+-----------------+------+-----------------+---------------+
|recommendation_id|ticker|            grade|number_of_firms|
+-----------------+------+-----------------+---------------+
|                0|  AAPL|              Buy|            317|
|                1|  AAPL|       Outperform|            164|
|                2|  AAPL|       Overweight|            136|
|                3|  AAPL|          Neutral|             83|
|                4|  AAPL|             Hold|             40|
|                5|  AAPL|     Equal-Weight|             23|
|                6|  AAPL|   Market Perform|             23|
|                7|  AAPL|       Strong Buy|              9|
|                8|  AAPL|             Sell|              8|
|                9|  AAPL|   Sector Perform|              4|
|               10|  AAPL|         Positive|              4|
|               11|  AAPL|                 |              3|
|               12|  AAPL|Market Outperform|              3|
|               13|  AAP

In [5]:
# Load the collected tweets from parquet and preview the first 20 rows.
tweets = spark.read.parquet(
    '/project/parquet_files/tweets.parquet'
)
tweets.show(n=20)

+--------+------+------------------------------+
|tweet_id|ticker|                          text|
+--------+------+------------------------------+
|       0|  AAPL|          RT @EnzadonCapita...|
|       1|  AAPL|          I just know $AAPL...|
|       2|  AAPL|          RT @RealJuicyTrad...|
|       3|  AAPL|          RT @TechNewsClub:...|
|       4|  AAPL|          What is the Feder...|
|       5|  AAPL|          Poll: Could this ...|
|       6|  AAPL|私のお気に入りは「AAPL」\nh...|
|       7|  AAPL|          RT @emrahvsibel: ...|
|       8|  AAPL|          $AAPL  Looks like...|
|       9|  AAPL|          #BREAKINGNEWS #Ne...|
|      10|  AAPL|私のお気に入りは「AAPL」\nh...|
|      11|  AAPL|          RT @stageanalysis...|
|      12|  AAPL|          $AAPL 1hr view fr...|
|      13|  AAPL|          RT @emrahvsibel: ...|
|      14|  AAPL|          @SteveWagsInvest ...|
|      15|  AAPL|          Top Trending S&am...|
|      16|  AAPL|          5 Advanced Secret...|
|      17|  AAPL|          Unionization 

# Upload data

In [6]:
# Configure database connection details.
#
# Every value can be overridden through an environment variable so that real
# credentials never have to be typed into (and saved with) the notebook:
#   POSTGRES_URI      - JDBC URL of the target database
#   POSTGRES_USER     - database user
#   POSTGRES_PASSWORD - database password
# When a variable is unset, the value previously hard-coded here is used
# (the RDS endpoint for the URL, empty strings for the credentials).
# `os` is imported in the Spark setup cell at the top of the notebook.
postgres_uri = os.environ.get(
    "POSTGRES_URI",
    "jdbc:postgresql://database-1.cfde4bhe6lbx.eu-west-2.rds.amazonaws.com:5432/dbtest",
)
user = os.environ.get("POSTGRES_USER", "")
password = os.environ.get("POSTGRES_PASSWORD", "")

In [7]:
# Upload S&P companies: append the ticker column to
# stocks_schema.s_p_companies through the PostgreSQL JDBC driver.
(
    s_p_companies.select("ticker")
    .write
    .format("jdbc")
    .mode("append")
    .options(
        url=postgres_uri,
        driver="org.postgresql.Driver",
        dbtable="stocks_schema.s_p_companies",
        user=user,
        password=password,
    )
    .save()
)

In [8]:
# Upload history: append the price history to stocks_schema.history through
# the PostgreSQL JDBC driver.
#
# The whole DataFrame is written. The previous `select("ticker")` (copied from
# the S&P companies upload) discarded every other column, so no dates, prices
# or volumes ever reached the database.
# NOTE(review): in append mode the DataFrame's column names must exist in the
# target table — confirm stocks_schema.history matches the parquet schema.
history.write.format("jdbc") \
    .option("url", postgres_uri) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", 'stocks_schema.history') \
    .option("user", user) \
    .option("password", password) \
    .mode("append") \
    .save()

In [9]:
# Upload recommendations: append the analyst recommendations to
# stocks_schema.recommendations through the PostgreSQL JDBC driver.
#
# The whole DataFrame is written. The previous `select("ticker")` (copied from
# the S&P companies upload) discarded the grade and number_of_firms columns,
# leaving only repeated tickers in the table.
# NOTE(review): in append mode the DataFrame's column names must exist in the
# target table — confirm stocks_schema.recommendations matches the parquet
# schema.
recommendations.write.format("jdbc") \
    .option("url", postgres_uri) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", 'stocks_schema.recommendations') \
    .option("user", user) \
    .option("password", password) \
    .mode("append") \
    .save()

In [10]:
# Upload tweets: append the collected tweets to stocks_schema.tweets through
# the PostgreSQL JDBC driver.
#
# The whole DataFrame is written. The previous `select("ticker")` (copied from
# the S&P companies upload) discarded the tweet text, leaving only repeated
# tickers in the table.
# NOTE(review): in append mode the DataFrame's column names must exist in the
# target table — confirm stocks_schema.tweets matches the parquet schema.
tweets.write.format("jdbc") \
    .option("url", postgres_uri) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", 'stocks_schema.tweets') \
    .option("user", user) \
    .option("password", password) \
    .mode("append") \
    .save()