In [0]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [0]:
# Read in CSV from S3

from pyspark import SparkFiles
# Load in employee.csv from S3 into a DataFrame
url = "https://<bucket name>.s3.amazonaws.com/employee.csv"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("employee.csv"), inferSchema=True, sep=',', timestampFormat="mm/dd/yy")
df.show(10)

In [0]:
# Drop duplicates and incomplete rows
print(df.count())
df = df.dropna()
print(df.count())
df = df.dropDuplicates()
print(df.count())

In [0]:
# Examine the schema
df.printSchema()

In [0]:
# Rename columns
df1 = df.withColumnRenamed("Employee ID", "employee_id") \
        .withColumnRenamed("Email", "email") \
        .withColumnRenamed("Gender", "gender") \
        .withColumnRenamed("Hire Date", "hire_date") \
        .withColumnRenamed("DOB", "dob") \
        .withColumnRenamed("Encrypted Password", "password")
df1.show(5)  

In [0]:
# Create a new DataFrame for employee info
employee_personal_info = df1.select(["employee_id", "email", "gender", "hire_date", "dob"])
employee_personal_info.show(5)

In [0]:
# Write DataFrame to RDS
mode="append"
jdbc_url = "jdbc:postgresql://<insert endpoint>:5432/my_data_class_db"
config = {"user":"root",
          "password": "<insert password>",
          "driver":"org.postgresql.Driver"}

employee_personal_info.write.jdbc(url=jdbc_url, table='employee_personal_info', mode=mode, properties=config)

In [0]:
# Create a new DataFrame for employee passwords
employee_password = df1.select(["employee_id", "password"])
employee_password.show(5)

In [0]:
# Write DataFrame to RDS
employee_password.write.jdbc(url=jdbc_url, table='employee_password', mode=mode, properties=config)