In [None]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.<version number>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
# Download the PostgreSQL JDBC driver jar into the current directory; the
# SparkSession cell references it at /content/postgresql-42.2.9.jar.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-03-21 13:17:31--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-03-21 13:17:32 (6.02 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [4]:
from pyspark.sql import SparkSession

# Build (or reuse) the session for this ETL job, adding the PostgreSQL JDBC
# driver jar to the driver's classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [31]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

# Replace <bucket name> with your own bucket, or uncomment the alternative URL below.
url = "https://<bucket name>.s3.amazonaws.com/user_data.csv"
#url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1/22-big-data/day_3/user_data.csv"

# Register the remote file with Spark, then parse the local copy as a
# comma-separated CSV with a header row and inferred column types.
spark.sparkContext.addFile(url)
user_data_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("user_data.csv"))
)

# Show DataFrame
user_data_df.show()

+---+----------+---------+-----------+--------------------+--------------------+----------+
| id|first_name|last_name|active_user|      street_address|               state|  username|
+---+----------+---------+-----------+--------------------+--------------------+----------+
|  1|      Andy|    Tuvey|      false| 12376 Darwin Circle|            New York|   atuvey0|
|  2|   Bastian|  Francke|       true|14034 Summerview ...|          Washington| bfrancke1|
|  3|    Dallis|    Duffy|      false| 8 Autumn Leaf Court|                Ohio|   dduffy2|
|  4|      Zena|    Saker|       true|      3605 Gina Park|          California|   zsaker3|
|  5|      Jere|    Argue|       true|        8 Hauk Court|          New Jersey|   jargue4|
|  6|    Jethro| Creeghan|       true|29047 Internation...|               Texas|jcreeghan5|
|  7|  Meridith|   Rapley|      false|95258 Stuart Terrace|             Indiana|  mrapley6|
|  8| Anastasie|     Gant|       true|   4193 Dexter Drive|            Michigan|

In [35]:
# Replace <bucket name> with your own bucket, or uncomment the alternative URL below.
url = "https://<bucket name>.s3.amazonaws.com/user_payment.csv"
#url = 'https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1/22-big-data/day_3/user_payment.csv'
spark.sparkContext.addFile(url)

# Load the payment CSV and rename its "id" column to "billing_id" to match our schema.
user_payment_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("user_payment.csv"))
    .withColumnRenamed("id", "billing_id")
)

# Show DataFrame
user_payment_df.show()

+----------+----------+--------------------+
|billing_id|  username|        cc_encrypted|
+----------+----------+--------------------+
|         1|   atuvey0|e1a4f985f7607bbae...|
|         2| bfrancke1|e1a4f985f7607bbae...|
|         3|   dduffy2|e1a4f985f7607bbae...|
|         4|   zsaker3|e1a4f985f7607bbae...|
|         5|   jargue4|e1a4f985f7607bbae...|
|         6|jcreeghan5|e1a4f985f7607bbae...|
|         7|  mrapley6|e1a4f985f7607bbae...|
|         8|    agant7|e1a4f985f7607bbae...|
|         9|  bgummow8|e1a4f985f7607bbae...|
|        10| nhaughin9|e1a4f985f7607bbae...|
|        11| sjurczika|e1a4f985f7607bbae...|
|        12|  knuschab|e1a4f985f7607bbae...|
|        13|   rantonc|e1a4f985f7607bbae...|
|        14|     jtodd|e1a4f985f7607bbae...|
|        15| dswinneye|e1a4f985f7607bbae...|
|        16|  ufernanf|e1a4f985f7607bbae...|
|        17|  ajoshamg|e1a4f985f7607bbae...|
|        18|cstiddardh|e1a4f985f7607bbae...|
|        19| mblizardi|e1a4f985f7607bbae...|
|        2

In [36]:
# Inner-join the two DataFrames on their shared "username" column.
joined_df = user_data_df.join(
    user_payment_df,
    on="username",
    how="inner",
)
joined_df.show()

+----------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|  username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+----------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   atuvey0|  1|      Andy|    Tuvey|      false| 12376 Darwin Circle|            New York|         1|e1a4f985f7607bbae...|
| bfrancke1|  2|   Bastian|  Francke|       true|14034 Summerview ...|          Washington|         2|e1a4f985f7607bbae...|
|   dduffy2|  3|    Dallis|    Duffy|      false| 8 Autumn Leaf Court|                Ohio|         3|e1a4f985f7607bbae...|
|   zsaker3|  4|      Zena|    Saker|       true|      3605 Gina Park|          California|         4|e1a4f985f7607bbae...|
|   jargue4|  5|      Jere|    Argue|       true|        8 Hauk Court|          New Jersey|         5|e1a4f985f7607bbae...|
|jcreegh

In [37]:
# Drop null values: how="any" (the default) removes every row that contains at least one null.
dropna_df = joined_df.na.drop(how="any")
dropna_df.show()

+----------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|  username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+----------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   atuvey0|  1|      Andy|    Tuvey|      false| 12376 Darwin Circle|            New York|         1|e1a4f985f7607bbae...|
| bfrancke1|  2|   Bastian|  Francke|       true|14034 Summerview ...|          Washington|         2|e1a4f985f7607bbae...|
|   dduffy2|  3|    Dallis|    Duffy|      false| 8 Autumn Leaf Court|                Ohio|         3|e1a4f985f7607bbae...|
|   zsaker3|  4|      Zena|    Saker|       true|      3605 Gina Park|          California|         4|e1a4f985f7607bbae...|
|   jargue4|  5|      Jere|    Argue|       true|        8 Hauk Court|          New Jersey|         5|e1a4f985f7607bbae...|
|jcreegh

In [38]:
# Load in a SQL function for referring to columns by name
from pyspark.sql.functions import col

# Keep only the rows whose active_user flag is true.
cleaned_df = dropna_df.where(col("active_user") == True)
cleaned_df.show()

+------------+---+----------+----------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name| last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+----------+-----------+--------------------+--------------------+----------+--------------------+
|   bfrancke1|  2|   Bastian|   Francke|       true|14034 Summerview ...|          Washington|         2|e1a4f985f7607bbae...|
|     zsaker3|  4|      Zena|     Saker|       true|      3605 Gina Park|          California|         4|e1a4f985f7607bbae...|
|     jargue4|  5|      Jere|     Argue|       true|        8 Hauk Court|          New Jersey|         5|e1a4f985f7607bbae...|
|  jcreeghan5|  6|    Jethro|  Creeghan|       true|29047 Internation...|               Texas|         6|e1a4f985f7607bbae...|
|      agant7|  8| Anastasie|      Gant|       true|   4193 Dexter Drive|            Michigan|         8|e1a4f9

In [39]:
# Project the columns that make up the active_user table.
clean_user_df = cleaned_df.select(
    "billing_id",
    "first_name",
    "last_name",
    "username",
)
clean_user_df.show()

+----------+----------+----------+------------+
|billing_id|first_name| last_name|    username|
+----------+----------+----------+------------+
|         2|   Bastian|   Francke|   bfrancke1|
|         4|      Zena|     Saker|     zsaker3|
|         5|      Jere|     Argue|     jargue4|
|         6|    Jethro|  Creeghan|  jcreeghan5|
|         8| Anastasie|      Gant|      agant7|
|        13|      Roth|     Anton|     rantonc|
|        14|     Jareb|       Tod|       jtodd|
|        18|    Carney|  Stiddard|  cstiddardh|
|        19|    Milzie|   Blizard|   mblizardi|
|        22|     Chuck|     Davio|     cdaviol|
|        25|    Maisie|      Pack|      mpacko|
|        27|   Trueman|   Spering|   tsperingq|
|        28|   Rosalyn|  Gascoyen|  rgascoyenr|
|        29|       Lev|     Basey|     lbaseys|
|        32|      Jean|   McPhail|   jmcphailv|
|        33|    Robbyn|   Macvain|   rmacvainw|
|        35|    Binnie|   de Grey|    bdegreyy|
|        36|   Phineas|Hindenburg|phinde

In [40]:
# Project the columns that make up the billing_info table.
clean_billing_df = cleaned_df.select(
    "billing_id",
    "street_address",
    "state",
    "username",
)
clean_billing_df.show()

+----------+--------------------+--------------------+------------+
|billing_id|      street_address|               state|    username|
+----------+--------------------+--------------------+------------+
|         2|14034 Summerview ...|          Washington|   bfrancke1|
|         4|      3605 Gina Park|          California|     zsaker3|
|         5|        8 Hauk Court|          New Jersey|     jargue4|
|         6|29047 Internation...|               Texas|  jcreeghan5|
|         8|   4193 Dexter Drive|            Michigan|      agant7|
|        13|  143 Garrison Drive|District of Columbia|     rantonc|
|        14|      468 Vidon Lane|           Tennessee|       jtodd|
|        18|93776 Twin Pines ...|               Texas|  cstiddardh|
|        19|      4 Corscot Park|            Kentucky|   mblizardi|
|        22|       33 Elka Trail|              Nevada|     cdaviol|
|        25|0407 Northfield P...|           Minnesota|      mpacko|
|        27|98 Rockefeller Pa...|             Fl

In [41]:
# Project the columns that make up the payment_info table.
clean_payment_df = cleaned_df.select("billing_id", "cc_encrypted")
clean_payment_df.show()

+----------+--------------------+
|billing_id|        cc_encrypted|
+----------+--------------------+
|         2|e1a4f985f7607bbae...|
|         4|e1a4f985f7607bbae...|
|         5|e1a4f985f7607bbae...|
|         6|e1a4f985f7607bbae...|
|         8|e1a4f985f7607bbae...|
|        13|e1a4f985f7607bbae...|
|        14|e1a4f985f7607bbae...|
|        18|e1a4f985f7607bbae...|
|        19|e1a4f985f7607bbae...|
|        22|e1a4f985f7607bbae...|
|        25|e1a4f985f7607bbae...|
|        27|e1a4f985f7607bbae...|
|        28|e1a4f985f7607bbae...|
|        29|e1a4f985f7607bbae...|
|        32|e1a4f985f7607bbae...|
|        33|e1a4f985f7607bbae...|
|        35|e1a4f985f7607bbae...|
|        36|e1a4f985f7607bbae...|
|        37|e1a4f985f7607bbae...|
|        38|e1a4f985f7607bbae...|
+----------+--------------------+
only showing top 20 rows



Postgres Setup

In [None]:
# Configure settings for RDS
import os

# Save mode passed to every JDBC write below.
mode = "append"
# Replace the <...> placeholders with your RDS endpoint and database name.
jdbc_url="jdbc:postgresql://<connection string>:5432/<database-name>"
# Supply the password through the RDS_PASSWORD environment variable so the secret
# is never saved in the notebook; the placeholder is kept only as a fallback.
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD", "<password>"),
    "driver": "org.postgresql.Driver",
}


In [None]:
# Write the user DataFrame to the active_user table in RDS, using the configured save mode.
clean_user_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table="active_user",
    properties=config,
)

In [None]:
# Write the billing DataFrame to the billing_info table in RDS, using the configured save mode.
clean_billing_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table="billing_info",
    properties=config,
)

In [None]:
# Write the payment DataFrame to the payment_info table in RDS, using the configured save mode.
clean_payment_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table="payment_info",
    properties=config,
)