In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz
!tar xf spark-3.1.3-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"
# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to cloud.r-project.org (108.150% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com] [Connected to                                                                                Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.91.39)]                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Conn                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Conn                           

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-09-28 23:00:12--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-09-28 23:00:12 (4.71 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Get or create the "CloudETL" session with the PostgreSQL JDBC jar on the
# driver classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Load output.csv from the S3 bucket, letting Spark infer the column types
from pyspark import SparkFiles

url = "https://iyj-finalprojectbucket.s3.amazonaws.com/output.csv"
spark.sparkContext.addFile(url)
output_df = spark.read.csv(
    SparkFiles.get("output.csv"),
    sep=",",
    header=True,
    inferSchema=True,
)
# Preview the loaded rows
output_df.show()

+----------+-----+-------------+-----------+---------+------------------+
|      Date|Month|Date of Month|Day of Week|Test Data|         Predicted|
+----------+-----+-------------+-----------+---------+------------------+
|1994-01-02|    1|            2|          7|     7772|11891.849166666667|
|1994-01-09|    1|            9|          7|     7910|12474.083214285714|
|1994-01-13|    1|           13|          4|    11212|         12191.577|
|1994-01-16|    1|           16|          7|     8123|12826.990821428572|
|1994-01-23|    1|           23|          7|     8310| 12410.07666666667|
|1994-01-31|    1|           31|          1|    10765|12575.542047619052|
|1994-02-06|    2|            6|          7|     8309|13603.952499999998|
|1994-02-10|    2|           10|          4|    11623| 7471.151964285715|
|1994-02-17|    2|           17|          4|    11674|12043.235333333338|
|1994-02-25|    2|           25|          5|    11905|7457.9288809523805|
|1994-02-27|    2|           27|      

In [5]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, IntegerType, DateType, StructType, DoubleType

In [6]:
# Describe each column as a (name, Spark type) pair, then turn the pairs into
# nullable StructFields in the same order.
column_types = [
    ("Date", DateType()),
    ("Month", IntegerType()),
    ("Date of Month", IntegerType()),
    ("Day of Week", IntegerType()),
    ("Test Data", IntegerType()),
    ("Predicted", DoubleType()),
]
schema = [StructField(name, dtype, True) for name, dtype in column_types]
schema

[StructField(Date,DateType,true),
 StructField(Month,IntegerType,true),
 StructField(Date of Month,IntegerType,true),
 StructField(Day of Week,IntegerType,true),
 StructField(Test Data,IntegerType,true),
 StructField(Predicted,DoubleType,true)]

In [7]:
# Wrap the list of fields in a StructType so it can be passed to the reader as a schema
final = StructType(schema)
final

StructType(List(StructField(Date,DateType,true),StructField(Month,IntegerType,true),StructField(Date of Month,IntegerType,true),StructField(Day of Week,IntegerType,true),StructField(Test Data,IntegerType,true),StructField(Predicted,DoubleType,true)))

In [14]:
# Read our data from S3 Buckets with our new schema
from pyspark import SparkFiles
url="https://iyj-finalprojectbucket.s3.amazonaws.com/output.csv"
spark.sparkContext.addFile(url)
# The column types come from the explicit `final` schema, so schema inference
# is not requested here (asking for both is contradictory).
output_df = spark.read.csv(SparkFiles.get("output.csv"), schema=final, sep=",", header=True)
# Show DataFrame
output_df.show()

+----------+-----+-------------+-----------+---------+------------------+
|      Date|Month|Date of Month|Day of Week|Test Data|         Predicted|
+----------+-----+-------------+-----------+---------+------------------+
|1994-01-02|    1|            2|          7|     7772|11891.849166666667|
|1994-01-09|    1|            9|          7|     7910|12474.083214285714|
|1994-01-13|    1|           13|          4|    11212|         12191.577|
|1994-01-16|    1|           16|          7|     8123|12826.990821428572|
|1994-01-23|    1|           23|          7|     8310| 12410.07666666667|
|1994-01-31|    1|           31|          1|    10765|12575.542047619052|
|1994-02-06|    2|            6|          7|     8309|13603.952499999998|
|1994-02-10|    2|           10|          4|    11623| 7471.151964285715|
|1994-02-17|    2|           17|          4|    11674|12043.235333333338|
|1994-02-25|    2|           25|          5|    11905|7457.9288809523805|
|1994-02-27|    2|           27|      

In [15]:
# Preview only the first five rows
output_df.show(n=5)

+----------+-----+-------------+-----------+---------+------------------+
|      Date|Month|Date of Month|Day of Week|Test Data|         Predicted|
+----------+-----+-------------+-----------+---------+------------------+
|1994-01-02|    1|            2|          7|     7772|11891.849166666667|
|1994-01-09|    1|            9|          7|     7910|12474.083214285714|
|1994-01-13|    1|           13|          4|    11212|         12191.577|
|1994-01-16|    1|           16|          7|     8123|12826.990821428572|
|1994-01-23|    1|           23|          7|     8310| 12410.07666666667|
+----------+-----+-------------+-----------+---------+------------------+
only showing top 5 rows



In [16]:
# Display the (column name, type) pairs of the DataFrame read with the explicit schema
output_df.dtypes

[('Date', 'date'),
 ('Month', 'int'),
 ('Date of Month', 'int'),
 ('Day of Week', 'int'),
 ('Test Data', 'int'),
 ('Predicted', 'double')]

In [17]:
# Configure settings for RDS
import os
from getpass import getpass

mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.cqfboxk2nwzu.us-east-2.rds.amazonaws.com:5432/postgres"
# Credentials are kept out of the notebook: the user name comes from RDS_USER
# (defaulting to "root") and the password from RDS_PASSWORD. When the password
# variable is unset or empty, it is requested interactively without being echoed.
config = {"user": os.environ.get("RDS_USER", "root"),
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

In [19]:
# Write DataFrame to the OutputTable table in RDS over JDBC, using the
# connection settings (jdbc_url, mode, config) defined for RDS.
# NOTE(review): mode is "append", so re-running this cell presumably inserts the same rows again — confirm this is intended.
output_df.write.jdbc(url=jdbc_url, table='OutputTable', mode=mode, properties=config)