In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.38)] [Connected to cloud.r-pro                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
                                                                               Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [2 InRelease 35.9 kB/88.7 kB 40%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 242 kB] [2 InRelease 35.9 kB/88.7 kB 40%] [Connecting to s                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 242 kB] [2 InRele

We'll use Spark to write directly to our Postgres database. To do so, Spark needs the PostgreSQL JDBC driver, which the next cell downloads.

In [None]:
#GRAB SPARK DRIVER FOR POSTGRES USAGE
!wget https://jdbc.postgresql.org/download/postgresql-42.4.1.jar


--2022-08-17 06:16:59--  https://jdbc.postgresql.org/download/postgresql-42.4.1.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1045138 (1021K) [application/java-archive]
Saving to: ‘postgresql-42.4.1.jar’


2022-08-17 06:17:00 (1.66 MB/s) - ‘postgresql-42.4.1.jar’ saved [1045138/1045138]



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the session for this ETL job. The Postgres JDBC jar is added
# to the driver classpath so Spark can load org.postgresql.Driver on write.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.4.1.jar")
    .getOrCreate()
)


## Extract
We connect to data storage (here, an S3 bucket) and extract the data into a DataFrame. We'll do this for two datasets; be sure to replace the bucket name in the URLs with one of your own.

We'll start by importing `SparkFiles` from PySpark. `spark.sparkContext.addFile()` downloads a file from a URL so Spark can use it, and `SparkFiles.get()` returns the local path of that downloaded copy.

Next, the file is read with the `read` method combined with the `csv()` method, which loads the CSV at the path returned by `SparkFiles.get()` and infers the schema. `sep=","` sets the comma as the separator, and `header=True` tells Spark to use the first row as the column names. Enter the following code:

In [None]:
from pyspark import SparkFiles

# Extract, dataset 1: user data from the S3 bucket.
# addFile downloads the object at `url`; SparkFiles.get returns the local
# path of that downloaded copy for spark.read.csv to load.
url = 'https://unixbear-bucket.s3.amazonaws.com/user_data.csv'
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(
    SparkFiles.get("user_data.csv"),
    sep=",",           # comma-delimited file
    header=True,       # first row holds the column names
    inferSchema=True,  # let Spark infer column types
)

In [None]:
user_data_df.show(n=5)  # preview the first five rows

+---+----------+---------+-----------+-------------------+--------------+---------+
| id|first_name|last_name|active_user|     street_address|         state| username|
+---+----------+---------+-----------+-------------------+--------------+---------+
|  1|    Cletus|  Lithcow|      false|78309 Riverside Way|      Virginia|ibearham0|
|  2|       Caz|   Felgat|      false|83 Hazelcrest Place|       Alabama| wwaller1|
|  3|     Kerri|  Crowson|      false|     112 Eliot Pass|North Carolina|ichesnut2|
|  4|   Freddie|    Caghy|      false|    15 Merchant Way|      New York|  tsnarr3|
|  5|   Sadella|    Deuss|      false|   079 Acker Avenue|     Tennessee|fwherrit4|
+---+----------+---------+-----------+-------------------+--------------+---------+
only showing top 5 rows



In [None]:
# Extract, dataset 2: user payment data from the same S3 bucket.
url = "https://unixbear-bucket.s3.amazonaws.com/user_payment.csv"
spark.sparkContext.addFile(url)
user_payment_df = spark.read.csv(
    SparkFiles.get("user_payment.csv"),
    sep=",",
    header=True,
    inferSchema=True,
)

In [None]:
user_payment_df.show(n=5)  # preview the first five rows

+----------+---------+--------------------+
|billing_id| username|        cc_encrypted|
+----------+---------+--------------------+
|         1|ibearham0|a799fcafe47d7fb19...|
|         2| wwaller1|a799fcafe47d7fb19...|
|         3|ichesnut2|a799fcafe47d7fb19...|
|         4|  tsnarr3|a799fcafe47d7fb19...|
|         5|fwherrit4|a799fcafe47d7fb19...|
+----------+---------+--------------------+
only showing top 5 rows



In [None]:
# Inner join on username: keep only users present in both the user and payment data.
joined_df = user_data_df.join(user_payment_df, "username", "inner")

In [None]:
joined_df.show(n=20)  # 20 is Spark's default preview size

+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   ibearham0|  1|    Cletus|  Lithcow|      false| 78309 Riverside Way|            Virginia|         1|a799fcafe47d7fb19...|
|    wwaller1|  2|       Caz|   Felgat|      false| 83 Hazelcrest Place|             Alabama|         2|a799fcafe47d7fb19...|
|   ichesnut2|  3|     Kerri|  Crowson|      false|      112 Eliot Pass|      North Carolina|         3|a799fcafe47d7fb19...|
|     tsnarr3|  4|   Freddie|    Caghy|      false|     15 Merchant Way|            New York|         4|a799fcafe47d7fb19...|
|   fwherrit4|  5|   Sadella|    Deuss|      false|    079 Acker Avenue|           Tennessee|         5|a799fcafe47d7f

In [None]:
# Drop every row that has a null in any column.
dropna_df = joined_df.dropna(how="any")
dropna_df.show()

+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   ibearham0|  1|    Cletus|  Lithcow|      false| 78309 Riverside Way|            Virginia|         1|a799fcafe47d7fb19...|
|    wwaller1|  2|       Caz|   Felgat|      false| 83 Hazelcrest Place|             Alabama|         2|a799fcafe47d7fb19...|
|   ichesnut2|  3|     Kerri|  Crowson|      false|      112 Eliot Pass|      North Carolina|         3|a799fcafe47d7fb19...|
|     tsnarr3|  4|   Freddie|    Caghy|      false|     15 Merchant Way|            New York|         4|a799fcafe47d7fb19...|
|   fwherrit4|  5|   Sadella|    Deuss|      false|    079 Acker Avenue|           Tennessee|         5|a799fcafe47d7f

In [None]:
# Transform: clean the user list by removing inactive users.

# col() lets us reference a column by name in the filter expression
from pyspark.sql.functions import col

# active_user holds true/false values read with inferSchema=True (a boolean column),
# so it serves directly as the filter predicate; comparing it to True is redundant.
# Rows whose active_user is null are excluded either way.
cleanedUserDF = dropna_df.filter(col("active_user"))
cleanedUserDF.show()

+-------------+---+----------+-----------+-----------+--------------------+--------------------+----------+--------------------+
|     username| id|first_name|  last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+-------------+---+----------+-----------+-----------+--------------------+--------------------+----------+--------------------+
|   fstappard5|  6|    Fraser|    Korneev|       true|  76084 Novick Court|           Minnesota|         6|a799fcafe47d7fb19...|
|   lhambling6|  7|    Demott|     Rapson|       true|    86320 Dahle Park|District of Columbia|         7|a799fcafe47d7fb19...|
|    wheinerte| 15|   Sadella|      Jaram|       true|7528 Waxwing Terrace|         Connecticut|        15|a799fcafe47d7fb19...|
| droughsedgeg| 17|    Hewitt|    Trammel|       true|    2455 Corry Alley|      North Carolina|        17|a799fcafe47d7fb19...|
|    ydudeniei| 19|       Ted|    Knowlys|       true|      31 South Drive|                Ohio| 

Next, select columns to create three DataFrames whose columns match the `active_user`, `billing_info`, and `payment_info` tables in the AWS RDS database.

In [None]:
# Columns for the active_user table.
cleanUserDF = cleanedUserDF.select("id", "first_name", "last_name", "username")
cleanUserDF.show()

+---+----------+-----------+-------------+
| id|first_name|  last_name|     username|
+---+----------+-----------+-------------+
|  6|    Fraser|    Korneev|   fstappard5|
|  7|    Demott|     Rapson|   lhambling6|
| 15|   Sadella|      Jaram|    wheinerte|
| 17|    Hewitt|    Trammel| droughsedgeg|
| 19|       Ted|    Knowlys|    ydudeniei|
| 23|  Annmarie|     Lafond|     fmyttonm|
| 28|      Toma|     Sokell|   bfletcherr|
| 30|       Ram|    Lefever|     gturleyt|
| 31|    Raddie|    Heindle|    calyukinu|
| 33|    Wallie|       Caws| ckleinlererw|
| 34|    Derril|Varfolomeev|  pshanklandx|
| 39|     Kelcy|     Wheway|    enelane12|
| 40|    Dorree|    Rookeby|    sfollet13|
| 41|    Martyn|       Tott|      mtesh14|
| 43|     Cally|      Thody|   tseyfart16|
| 45|       Ted|   Pittaway|   hfarrier18|
| 48|      Fifi|    Lidgley|     nabbie1b|
| 50|    Ashely|     O'Hern|  ystadding1d|
| 53|   Diannne|Osbaldeston|hhallgalley1g|
| 60|     Sonny|     Jeskin|   ageaveny1n|
+---+------

In [None]:
# Columns for the billing_info table.
cleanBillingDF = cleanedUserDF.select("billing_id", "street_address", "state", "username")
cleanBillingDF.show()

+----------+--------------------+--------------------+-------------+
|billing_id|      street_address|               state|     username|
+----------+--------------------+--------------------+-------------+
|         6|  76084 Novick Court|           Minnesota|   fstappard5|
|         7|    86320 Dahle Park|District of Columbia|   lhambling6|
|        15|7528 Waxwing Terrace|         Connecticut|    wheinerte|
|        17|    2455 Corry Alley|      North Carolina| droughsedgeg|
|        19|      31 South Drive|                Ohio|    ydudeniei|
|        23|     35 Oriole Place|             Georgia|     fmyttonm|
|        28|39641 Eggendart Hill|            Maryland|   bfletcherr|
|        30|   9969 Laurel Alley|               Texas|     gturleyt|
|        31|   811 Talmadge Road|                Ohio|    calyukinu|
|        33|   9999 Kenwood Pass|              Oregon| ckleinlererw|
|        34|     4 Jenifer Court|             Florida|  pshanklandx|
|        39|93207 Morningstar...| 

In [None]:
# Columns for the payment_info table.
cleanPaymentDF = cleanedUserDF.select("billing_id", "cc_encrypted")
cleanPaymentDF.show()

+----------+--------------------+
|billing_id|        cc_encrypted|
+----------+--------------------+
|         6|a799fcafe47d7fb19...|
|         7|a799fcafe47d7fb19...|
|        15|a799fcafe47d7fb19...|
|        17|a799fcafe47d7fb19...|
|        19|a799fcafe47d7fb19...|
|        23|a799fcafe47d7fb19...|
|        28|a799fcafe47d7fb19...|
|        30|a799fcafe47d7fb19...|
|        31|a799fcafe47d7fb19...|
|        33|a799fcafe47d7fb19...|
|        34|a799fcafe47d7fb19...|
|        39|a799fcafe47d7fb19...|
|        40|a799fcafe47d7fb19...|
|        41|a799fcafe47d7fb19...|
|        43|a799fcafe47d7fb19...|
|        45|a799fcafe47d7fb19...|
|        48|a799fcafe47d7fb19...|
|        50|a799fcafe47d7fb19...|
|        53|a799fcafe47d7fb19...|
|        60|a799fcafe47d7fb19...|
+----------+--------------------+
only showing top 20 rows



## Load
The final step is to load our transformed data into the database. PySpark can connect to a database over JDBC and load each DataFrame into its table. First, we'll configure the connection with the following code:

In [None]:
# Connection settings for the AWS RDS Postgres database.
import os
from getpass import getpass

# Use the DB_PASSWORD environment variable when it is set, so the notebook can run
# end-to-end without interaction; otherwise prompt for the password (input not echoed).
password = os.environ.get("DB_PASSWORD") or getpass('enter DB password')

# configure RDS settings
# 'append' adds rows to the existing tables; re-running the write cells may insert
# duplicate rows (or fail on primary-key constraints, if the tables define them).
mode = 'append'
jdbcURL = 'jdbc:postgresql://datavis.c4yohomp20p9.us-east-1.rds.amazonaws.com:5432/my_data_class'

config = {"user": "postgres",
          "password": password,
          "driver": "org.postgresql.Driver"}

enter DB password··········


The cleaned DataFrames can then be written directly to our database with the `.write.jdbc()` method, which takes the parameters we set:

- The connection string stored in `jdbcURL` is passed to the `url` argument.
- The name of the table we are writing the DataFrame to is passed to the `table` argument.
- The mode we're using, `"append"`, is passed to the `mode` argument.
- The connection configuration we set up is passed to the `properties` argument.


In [None]:
# Load: write the active users into the active_user table in RDS.
cleanUserDF.write.mode(mode).jdbc(url=jdbcURL, table='active_user', properties=config)

jdbc:postgresql://datavis.c4yohomp20p9.us-east-1.rds.amazonaws.com:5432/my_data_class


In [None]:
# Load: write the billing rows into the billing_info table in RDS.
cleanBillingDF.write.mode(mode).jdbc(url=jdbcURL, table='billing_info', properties=config)

In [None]:
# Load: write the payment rows into the payment_info table in RDS.
cleanPaymentDF.write.mode(mode).jdbc(url=jdbcURL, table='payment_info', properties=config)