In [1]:
#using pyspark 

import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
0% [1 InRelease gpgv 3,626 B] [Connecting to ar

In [2]:
#Postgres driver that will allow Spark to interact with Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-10-28 17:19:46--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.6’


2021-10-28 17:19:47 (6.07 MB/s) - ‘postgresql-42.2.16.jar.6’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
#get data from S3 Buckets
from pyspark import SparkFiles
url = "https://team5-stock-index-bucket.s3.us-east-2.amazonaws.com/final+project+data/GDP.csv"
spark.sparkContext.addFile(url)
gdp_df = spark.read.csv(SparkFiles.get("GDP.csv"), sep=",", header=True, inferSchema=True)

In [5]:
gdp_df.show(truncate = False)

+---------------------------+----------------------------+-----------------+
|Data Source                |World Development Indicators|_c2              |
+---------------------------+----------------------------+-----------------+
|Last Updated Date          |2021-09-15                  |null             |
|Country Name               |Country Code                |Indicator Name   |
|Aruba                      |ABW                         |GDP (current US$)|
|Africa Eastern and Southern|AFE                         |GDP (current US$)|
|Afghanistan                |AFG                         |GDP (current US$)|
|Africa Western and Central |AFW                         |GDP (current US$)|
|Angola                     |AGO                         |GDP (current US$)|
|Albania                    |ALB                         |GDP (current US$)|
|Andorra                    |AND                         |GDP (current US$)|
|Arab World                 |ARB                         |GDP (current US$)|

In [6]:
url = "https://team5-stock-index-bucket.s3.us-east-2.amazonaws.com/final+project+data/indexProcessed.csv"
spark.sparkContext.addFile(url)
indexProcessed_df = spark.read.csv(SparkFiles.get("indexProcessed.csv"), sep=",", header=True, inferSchema=True)

In [7]:
indexProcessed_df.describe

<bound method DataFrame.describe of DataFrame[Index: string, Date: string, Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: double, CloseUSD: double]>

In [8]:
indexProcessed_df = indexProcessed_df.withColumnRenamed("Adj Close", "Adj_Close")

In [9]:
# change Date column data type from string to date 
#from pyspark.sql.functions import col
#from pyspark.sql.functions import to_date
#indexProcessed_df = indexProcessed_df.select(["Date", to_date("Date", 'yyyy-MM-dd').alias("Date")])

In [10]:
indexProcessed_df.show()

+-----+----------+-----------+-----------+-----------+-----------+-----------+------+------------------+
|Index|      Date|       Open|       High|        Low|      Close|  Adj_Close|Volume|          CloseUSD|
+-----+----------+-----------+-----------+-----------+-----------+-----------+------+------------------+
|  HSI|1986-12-31|2568.300049|2568.300049|2568.300049|2568.300049|2568.300049|   0.0|      333.87900637|
|  HSI|1987-01-02|2540.100098|2540.100098|2540.100098|2540.100098|2540.100098|   0.0|      330.21301274|
|  HSI|1987-01-05|2552.399902|2552.399902|2552.399902|2552.399902|2552.399902|   0.0|      331.81198726|
|  HSI|1987-01-06|2583.899902|2583.899902|2583.899902|2583.899902|2583.899902|   0.0|335.90698726000005|
|  HSI|1987-01-07|2607.100098|2607.100098|2607.100098|2607.100098|2607.100098|   0.0|      338.92301274|
|  HSI|1987-01-08|2603.300049|2603.300049|2603.300049|2603.300049|2603.300049|   0.0|      338.42900637|
|  HSI|1987-01-09|2561.699951|2561.699951|2561.699951|2

In [11]:
url = "https://team5-stock-index-bucket.s3.us-east-2.amazonaws.com/final+project+data/indexInfo.csv"
spark.sparkContext.addFile(url)
indexInfo_df = spark.read.csv(SparkFiles.get("indexInfo.csv"), sep=",", header=True, inferSchema=True)

In [12]:
indexInfo_df.show()

+-------------+--------------------+---------+--------+
|       Region|            Exchange|    Index|Currency|
+-------------+--------------------+---------+--------+
|United States|New York Stock Ex...|      NYA|     USD|
|United States|              NASDAQ|     IXIC|     USD|
|    Hong Kong|Hong Kong Stock E...|      HSI|     HKD|
|        China|Shanghai Stock Ex...|000001.SS|     CNY|
|        Japan|Tokyo Stock Exchange|     N225|     JPY|
|       Europe|            Euronext|     N100|     EUR|
|        China|Shenzhen Stock Ex...|399001.SZ|     CNY|
|       Canada|Toronto Stock Exc...|   GSPTSE|     CAD|
|        India|National Stock Ex...|     NSEI|     INR|
|      Germany|Frankfurt Stock E...|    GDAXI|     EUR|
|        Korea|      Korea Exchange|     KS11|     KRW|
|  Switzerland|  SIX Swiss Exchange|     SSMI|     CHF|
|       Taiwan|Taiwan Stock Exch...|     TWII|     TWD|
| South Africa|Johannesburg Stoc...|  J203.JO|     ZAR|
+-------------+--------------------+---------+--

In [13]:
# Store environmental variable
from getpass import getpass
password = getpass('finalproject99')
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://team5-stock-index-analysis.c5eoj6us86ug.us-east-2.rds.amazonaws.com:5432/postgres"
config = {"user":"postgres",
          "password": password,
          "driver":"org.postgresql.Driver"}

finalproject99··········


In [16]:
#write indexInfo_df to indexInfo table in RDS
indexInfo_df.write.jdbc(url=jdbc_url, table='indexInfo', mode=mode, properties=config)

In [17]:
#write indexProecessed_df to indexProcessed table in RDS
indexProcessed_df.write.jdbc(url=jdbc_url, table='indexProcessed', mode=mode, properties=config)