In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.39                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [2 InRelease gpgv 242 kB] [Waiting for headers] [Waiting for headers] [Waiti                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [2 InRelease gpgv 242 kB] [Waiting for headers] [Waiting for headers] [Waiti                                                                               Hit:4 http://s

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-10-20 22:02:04--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-10-20 22:02:04 (4.52 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

Extract

In [3]:
from pyspark import SparkFiles

# user data
user_data_url = "https://avq-dataviz.s3.amazonaws.com/user_data.csv"
spark.sparkContext.addFile(user_data_url)
user_data_df = spark.read.csv(SparkFiles.get("user_data.csv"), sep=",", header=True, inferSchema=True)
user_data_df.show(5)
# payment data
user_payment_url = "https://avq-dataviz.s3.amazonaws.com/user_payment.csv"
spark.sparkContext.addFile(user_payment_url)
user_payment_df = spark.read.csv(SparkFiles.get("user_payment.csv"), sep=",", header=True, inferSchema=True)
user_payment_df.show(5)


+---+----------+---------+-----------+-------------------+--------------+---------+
| id|first_name|last_name|active_user|     street_address|         state| username|
+---+----------+---------+-----------+-------------------+--------------+---------+
|  1|    Cletus|  Lithcow|      false|78309 Riverside Way|      Virginia|ibearham0|
|  2|       Caz|   Felgat|      false|83 Hazelcrest Place|       Alabama| wwaller1|
|  3|     Kerri|  Crowson|      false|     112 Eliot Pass|North Carolina|ichesnut2|
|  4|   Freddie|    Caghy|      false|    15 Merchant Way|      New York|  tsnarr3|
|  5|   Sadella|    Deuss|      false|   079 Acker Avenue|     Tennessee|fwherrit4|
+---+----------+---------+-----------+-------------------+--------------+---------+
only showing top 5 rows

+----------+---------+--------------------+
|billing_id| username|        cc_encrypted|
+----------+---------+--------------------+
|         1|ibearham0|a799fcafe47d7fb19...|
|         2| wwaller1|a799fcafe47d7fb19...|

In [4]:
# join data, drop null values and filter for active users
joined_df = user_data_df.join(user_payment_df, on="username", how="inner")
dropna_df = joined_df.dropna()

from pyspark.sql.functions import col
cleaned_df = dropna_df.filter(col("active_user") == True)
cleaned_df.show(5)

+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|  fstappard5|  6|    Fraser|  Korneev|       true|  76084 Novick Court|           Minnesota|         6|a799fcafe47d7fb19...|
|  lhambling6|  7|    Demott|   Rapson|       true|    86320 Dahle Park|District of Columbia|         7|a799fcafe47d7fb19...|
|   wheinerte| 15|   Sadella|    Jaram|       true|7528 Waxwing Terrace|         Connecticut|        15|a799fcafe47d7fb19...|
|droughsedgeg| 17|    Hewitt|  Trammel|       true|    2455 Corry Alley|      North Carolina|        17|a799fcafe47d7fb19...|
|   ydudeniei| 19|       Ted|  Knowlys|       true|      31 South Drive|                Ohio|        19|a799fcafe47d7f

Match Database Schema

In [5]:
clean_user_df = cleaned_df.select(
    ["id", "first_name", "last_name", "username"]
)
clean_user_df.show(5)
clean_billing_df = cleaned_df.select(
    ["billing_id", "street_address", "state", "username"]
)
clean_billing_df.show(5)
clean_payment_df = cleaned_df.select(
    ["billing_id", "cc_encrypted"]
)
clean_payment_df.show(5)

+---+----------+---------+------------+
| id|first_name|last_name|    username|
+---+----------+---------+------------+
|  6|    Fraser|  Korneev|  fstappard5|
|  7|    Demott|   Rapson|  lhambling6|
| 15|   Sadella|    Jaram|   wheinerte|
| 17|    Hewitt|  Trammel|droughsedgeg|
| 19|       Ted|  Knowlys|   ydudeniei|
+---+----------+---------+------------+
only showing top 5 rows

+----------+--------------------+--------------------+------------+
|billing_id|      street_address|               state|    username|
+----------+--------------------+--------------------+------------+
|         6|  76084 Novick Court|           Minnesota|  fstappard5|
|         7|    86320 Dahle Park|District of Columbia|  lhambling6|
|        15|7528 Waxwing Terrace|         Connecticut|   wheinerte|
|        17|    2455 Corry Alley|      North Carolina|droughsedgeg|
|        19|      31 South Drive|                Ohio|   ydudeniei|
+----------+--------------------+--------------------+------------+
onl

Connect to Databse

getpass prompts for the password when run

In [6]:
from getpass import getpass
password = getpass('Enter database password')
# Configure settings for RDS
mode = "append"
conn = "dataviz.cvj7xeynbmtt.us-east-1.rds.amazonaws.com"
dbname = "my_data_class_db"
jdbc_url = f"jdbc:postgresql://{conn}:5432/{dbname}"
config = {"user":"postgres",
          "password": password,
          "driver":"org.postgresql.Driver"}

Enter database password··········


In [7]:
clean_user_df.write.jdbc(
    url=jdbc_url,
    table='active_user',
    mode=mode,
    properties=config
)
clean_billing_df.write.jdbc(
    url=jdbc_url,
    table='billing_info',
    mode=mode,
    properties=config
)
clean_payment_df.write.jdbc(
    url=jdbc_url,
    table='payment_info',
    mode=mode,
    properties=config
)