# Cloud ETL with S3, PySpark, and RDS

### Instructions

* As the sole data person at your new company, you have been tasked with cleaning the data from an Excel spreadsheet (exported as a CSV) and loading it into a SQL database. In other words, you will be performing ETL.

* Take a moment to review the SQL table schemata, which reflect the requirements for the company's new database. Your company will be using S3 for file storage and RDS to host SQL databases.

* Then, complete the following tasks:

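  * Upload the *employees.csv* file in the *Resources* folder to S3. **Note:** be sure to make the S3 bucket public, because the notebook downloads the file over plain HTTPS without credentials. Make sure the object name matches the file name in the URL of the read cell.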

  * Upload the unsolved Jupyter notebook to Colab and use Spark to clean and transform the data.

  * With the *schema.sql* file from the *Resources* folder, use pgAdmin to create the table schemata in RDS.

  * Load the data from the Spark DataFrames into RDS.

### Attribution

The dataset was sourced from [https://github.com/Microsoft/sql-server-samples](https://github.com/Microsoft/sql-server-samples).


### Install Java, Spark, and Findspark

In [None]:
# Spark 2.4 runs on Java 8; the old www-us mirror no longer serves 2.4.5, so use the Apache archive
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

### Install the PostgreSQL driver

In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

### Set Environment Variables

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
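Before calling `findspark.init()`, it can help to confirm that both paths exist, since a failed download or a different JDK location on the runtime otherwise surfaces later as a less obvious error. A minimal sketch using only the standard library; the helper name `missing_spark_paths` is hypothetical:

```python
import os


def missing_spark_paths(env_vars=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or point at a directory that does not exist."""
    missing = []
    for name in env_vars:
        path = os.environ.get(name)
        # Unset variables and non-existent directories are both reported.
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# Run right after setting the variables above.
problems = missing_spark_paths()
if problems:
    print("Check these settings before starting Spark:", problems)
```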

### Initialize findspark

In [None]:
import findspark
findspark.init()

### Start a SparkSession

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL") \
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar") \
    .getOrCreate()

### Read the *employee.csv* data from AWS S3

In [None]:
from pyspark import SparkFiles

# Load in employee.csv from S3 into a DataFrame
url = "https://<bucket name>.s3.amazonaws.com/employee.csv"
spark.sparkContext.addFile(url)

# In Spark date patterns, MM is the month; lowercase mm would mean minutes
df = spark.read.option('header', 'true').csv(SparkFiles.get("employee.csv"), inferSchema=True, sep=',', timestampFormat="MM/dd/yy")
df.show(10)
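In Spark's datetime patterns, `MM` is the month and `mm` is minutes, so month/day/two-digit-year dates such as the hire dates call for `MM/dd/yy`. To spot-check how a value should be read, the same layout can be written with Python's `strptime` codes `%m/%d/%y`. A small sketch; the helper name `parse_mdy` and the sample dates are hypothetical, not taken from *employee.csv*:

```python
from datetime import datetime

# strptime equivalent of the Spark pattern "MM/dd/yy": %m month, %d day, %y two-digit year
HIRE_DATE_FORMAT = "%m/%d/%y"


def parse_mdy(value):
    """Parse a month/day/two-digit-year string into a date."""
    return datetime.strptime(value, HIRE_DATE_FORMAT).date()


print(parse_mdy("07/15/19"))  # 2019-07-15

# Using the minutes code by mistake: "07" becomes 7 minutes and the month defaults to January.
wrong = datetime.strptime("07/15/19", "%M/%d/%y")
print(wrong.month, wrong.minute)  # 1 7
```

Two-digit years are ambiguous. Python maps `%y` values 69 to 99 onto 1969 to 1999 and 00 to 68 onto 2000 to 2068, and Spark's own pivot may differ, so birth dates in particular are worth checking after the load.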

### Drop duplicates and incomplete rows

In [None]:
print(df.count())         # rows as loaded
df = df.dropna()          # drop rows with a null in any column
print(df.count())
df = df.dropDuplicates()  # drop rows identical in every column
print(df.count())
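Called without arguments, `dropna()` uses `how="any"` (a row goes if any column is null) and `dropDuplicates()` compares whole rows. A plain-Python sketch of the same two steps, handy for reasoning about the three counts the cell prints; the rows are hypothetical and `None` stands in for a value Spark would read as null:

```python
def drop_incomplete(rows):
    """Keep rows in which every value is present (mirrors dropna(how="any"))."""
    return [row for row in rows if all(value is not None for value in row.values())]


def drop_duplicates(rows):
    """Keep one copy of each fully identical row (mirrors dropDuplicates())."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


rows = [
    {"Employee ID": 1, "Email": "a@example.com", "Gender": "F"},
    {"Employee ID": 1, "Email": "a@example.com", "Gender": "F"},
    {"Employee ID": 2, "Email": None, "Gender": "M"},
    {"Employee ID": 3, "Email": "c@example.com", "Gender": "M"},
]
complete = drop_incomplete(rows)
unique = drop_duplicates(complete)
print(len(rows), len(complete), len(unique))  # 4 3 2
```

Spark does not promise which copy of a duplicate it keeps, but for rows that match in every column that choice makes no difference.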

### Display the schema

In [None]:
df.printSchema()

### Rename columns

In [None]:
df1 = df.withColumnRenamed("Employee ID", "employee_id") \
        .withColumnRenamed("Email", "email") \
        .withColumnRenamed("Gender", "gender") \
        .withColumnRenamed("Hire Date", "hire_date") \
        .withColumnRenamed("DOB", "dob") \
        .withColumnRenamed("Encrypted Password", "password")
df1.show(5)
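The six chained `withColumnRenamed` calls amount to one mapping from spreadsheet headers to snake_case column names. Keeping that mapping in a single dict makes it easy to compare against *schema.sql*. The sketch applies the dict to plain row dicts so it runs without Spark; `COLUMN_MAP` and `rename_columns` are hypothetical names, and the PySpark loop appears only as a comment:

```python
COLUMN_MAP = {
    "Employee ID": "employee_id",
    "Email": "email",
    "Gender": "gender",
    "Hire Date": "hire_date",
    "DOB": "dob",
    "Encrypted Password": "password",
}


def rename_columns(row, mapping=COLUMN_MAP):
    """Return a copy of row with keys renamed; unmapped keys are kept as-is."""
    return {mapping.get(key, key): value for key, value in row.items()}


# In PySpark the same dict could drive the renames (not executed here):
#     for old, new in COLUMN_MAP.items():
#         df = df.withColumnRenamed(old, new)

row = {"Employee ID": 7, "Email": "x@example.com", "DOB": "01/02/90"}
print(rename_columns(row))
```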

### Create a new DataFrame for employee info

In [None]:
employee_personal_info = df1.select(["employee_id", "email", "gender", "hire_date", "dob"])
employee_personal_info.show(5)

### Create a new DataFrame for employee passwords

In [None]:
employee_password = df1.select(["employee_id", "password"])
employee_password.show(5)
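The two `select` calls split one wide table into two narrower ones that share `employee_id`, presumably so the sensitive password column lives in its own table. A stdlib sketch of the same projection; the helper `project` and the sample row are hypothetical:

```python
PERSONAL_COLUMNS = ["employee_id", "email", "gender", "hire_date", "dob"]
PASSWORD_COLUMNS = ["employee_id", "password"]


def project(rows, columns):
    """Keep only the listed columns, in order (like DataFrame.select)."""
    return [{col: row[col] for col in columns} for row in rows]


rows = [{"employee_id": 1, "email": "a@example.com", "gender": "F",
         "hire_date": "2019-07-15", "dob": "1990-01-02", "password": "x1y2"}]
personal = project(rows, PERSONAL_COLUMNS)
passwords = project(rows, PASSWORD_COLUMNS)

# The only column the two tables share is the join key.
print(personal[0].keys() & passwords[0].keys())  # {'employee_id'}
```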

### Configure settings for RDS

In [None]:
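mode = "append"  # add rows to the tables already created from schema.sql
# Replace the placeholders with your RDS endpoint and password
jdbc_url = "jdbc:postgresql://<insert endpoint>:5432/my_data_class_db"
config = {"user": "root",
          "password": "<insert password>",
          "driver": "org.postgresql.Driver"}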
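Rather than pasting the endpoint and password into the notebook, the values can be read from environment variables and assembled into the same `jdbc:postgresql://host:port/database` URL. A minimal sketch; the variable names `RDS_ENDPOINT` and `RDS_PASSWORD` and the helper `build_jdbc_settings` are hypothetical:

```python
import os


def build_jdbc_settings(database="my_data_class_db", port=5432, user="root"):
    """Assemble the JDBC URL and connection properties from environment variables."""
    endpoint = os.environ["RDS_ENDPOINT"]  # hypothetical variable name
    password = os.environ["RDS_PASSWORD"]  # hypothetical variable name
    url = f"jdbc:postgresql://{endpoint}:{port}/{database}"
    properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}
    return url, properties
```

In Colab the password could be typed at runtime with `getpass.getpass()` and stored in `os.environ`, which keeps it out of the saved notebook.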

### Write the cleaned employee data to the *employee_personal_info* table in RDS

In [None]:
employee_personal_info.write.jdbc(url=jdbc_url, table='employee_personal_info', mode=mode, properties=config)

### Write the employee password DataFrame to the *employee_password* table

In [None]:
employee_password.write.jdbc(url=jdbc_url, table='employee_password', mode=mode, properties=config)
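Because both writes use `mode="append"`, rerunning the cells tries to insert the same rows again. If *schema.sql* declares primary keys, the rerun fails with a duplicate-key error. If `employee_password.employee_id` also references `employee_personal_info`, that table has to be loaded first, as the cells do. *schema.sql* is not reproduced here, so the table definitions below are assumptions. The sketch uses the standard library's `sqlite3` as an in-memory stand-in for the RDS PostgreSQL database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
conn.execute("""
    CREATE TABLE employee_personal_info (
        employee_id INTEGER PRIMARY KEY,
        email TEXT, gender TEXT, hire_date TEXT, dob TEXT)""")
conn.execute("""
    CREATE TABLE employee_password (
        employee_id INTEGER PRIMARY KEY
            REFERENCES employee_personal_info (employee_id),
        password TEXT)""")

personal = [(1, "a@example.com", "F", "2019-07-15", "1990-01-02")]
passwords = [(1, "x1y2")]

# Parent table first, then the table whose rows reference it.
conn.executemany("INSERT INTO employee_personal_info VALUES (?, ?, ?, ?, ?)", personal)
conn.executemany("INSERT INTO employee_password VALUES (?, ?)", passwords)
conn.commit()

# "Appending" the same rows a second time violates the primary key.
try:
    conn.executemany("INSERT INTO employee_personal_info VALUES (?, ?, ?, ?, ?)", personal)
except sqlite3.IntegrityError as exc:
    print("Second append rejected:", exc)
```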