In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!pip install pyspark

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 59kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 57.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=a80bdd63710916b7381eb8a770ae87eeae8826cc7ad741c9f3462106e8953aec
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Succ

# Read in CSV from S3

In [0]:
from pyspark import SparkFiles
# Load in employee.csv from S3 into a DataFrame
url = "https://314159.s3.us-east-2.amazonaws.com/employee.csv"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("employee.csv"), inferSchema=True, sep=',', timestampFormat="mm/dd/yy")

# Drop duplicates and incomplete rows

In [3]:
print(df.count())
df = df.dropna()
print(df.count())
df = df.dropDuplicates()
print(df.count())

292
291
289


# Examine the schema

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Employee ID: integer (nullable = true)
 |-- Email: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- DOB: timestamp (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Hire Date: timestamp (nullable = true)
 |-- Salaried: integer (nullable = true)
 |-- Vacation Hours: integer (nullable = true)
 |-- Sick Leave Hours: integer (nullable = true)
 |-- Encrypted Password: string (nullable = true)
 |-- Modified: string (nullable = true)



# Rename Columns

In [5]:
df1 = df.withColumnRenamed("Employee ID", "employee_id") \
        .withColumnRenamed("Email", "email") \
        .withColumnRenamed("Gender", "gender") \
        .withColumnRenamed("Hire Date", "hire_date") \
        .withColumnRenamed("DOB", "dob") \
        .withColumnRenamed("Encrypted Password", "password")
df1.show(5) 

+---+-----------+-------+--------------------+-------------------+--------------+------+-------------------+--------+--------------+----------------+--------------------+------------+
|_c0|employee_id|  email|            Position|                dob|Marital Status|gender|          hire_date|Salaried|Vacation Hours|Sick Leave Hours|            password|    Modified|
+---+-----------+-------+--------------------+-------------------+--------------+------+-------------------+--------+--------------+----------------+--------------------+------------+
|134|   45615666|  eric0|Production Superv...|1985-01-19 00:01:00|             M|     M|2009-01-14 00:01:00|       0|            40|              40|2ABCF39B-88D7-49F...|6/30/14 0:00|
|236|  476980013| grant0|Human Resources A...|1976-01-16 00:04:00|             S|     M|2009-01-25 00:02:00|       0|            53|              46|4511ABBA-7F34-400...|6/30/14 0:00|
| 31|  750246141|margie0|Production Techni...|1986-01-20 00:05:00|             M

# Create a new DataFrame for employee info

In [6]:
employee_personal_info = df1.select(["employee_id", "email", "gender", "hire_date", "dob"])
employee_personal_info.show(5)

+-----------+-------+------+-------------------+-------------------+
|employee_id|  email|gender|          hire_date|                dob|
+-----------+-------+------+-------------------+-------------------+
|   45615666|  eric0|     M|2009-01-14 00:01:00|1985-01-19 00:01:00|
|  476980013| grant0|     M|2009-01-25 00:02:00|1976-01-16 00:04:00|
|  750246141|margie0|     F|2009-01-04 00:01:00|1986-01-20 00:05:00|
|  621932914|stefen0|     M|2009-01-28 00:02:00|1975-01-21 00:12:00|
|  322160340|  lane0|     M|2009-01-11 00:01:00|1974-01-23 00:09:00|
+-----------+-------+------+-------------------+-------------------+
only showing top 5 rows



# Write DataFrame to RDS

In [0]:
# Configuration for RDS instance
mode="append"
jdbc_url = "jdbc:postgresql://mypostgresdb.cxyohgqc3pno.us-east-2.rds.amazonaws.com:5432/my_data_class_db"
config = {"user":"root",
          "password": "password",
          "driver":"org.postgresql.Driver"}

In [0]:
# Write DataFrame to table

employee_personal_info.write.jdbc(url=jdbc_url, table='employee_personal_info', mode=mode, properties=config)

# Create a new DataFrame for employee passwords

In [9]:
employee_password = df1.select(["employee_id", "password"])
employee_password.show(5)

+-----------+--------------------+
|employee_id|            password|
+-----------+--------------------+
|   45615666|2ABCF39B-88D7-49F...|
|  476980013|4511ABBA-7F34-400...|
|  750246141|352D6E2F-655B-49A...|
|  621932914|B9EEBF9F-F694-4BC...|
|  322160340|042427B8-3883-4A8...|
+-----------+--------------------+
only showing top 5 rows



# Write DataFrame to RDS

In [0]:
employee_password.write.jdbc(url=jdbc_url, table='employee_password', mode=mode, properties=config)