In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/u

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-03-26 16:23:41--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-03-26 16:23:41 (6.49 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()


In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://data-finalproject.s3.amazonaws.com/ID_rawdata.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get(""), sep=",", header=True, inferSchema=True)

In [5]:
df.show(5)

+--------+---------------+---------+-----+--------------------+-----------------------+-----------------+-----------------+--------------+---+----------+---------------+-------+------+------------+------+-----+---------+--------------+----------------+----------------+----------+------------+----------+----------+----------------+----------+-------------+----+---------+
|      ID|      timestamp|  company|level|               title|totalyearlycompensation|         location|yearsofexperience|yearsatcompany|tag|basesalary|stockgrantvalue|  bonus|gender|otherdetails|cityid|dmaid|rowNumber|Masters_Degree|Bachelors_Degree|Doctorate_Degree|Highschool|Some_College|Race_Asian|Race_White|Race_Two_Or_More|Race_Black|Race_Hispanic|Race|Education|
+--------+---------------+---------+-----+--------------------+-----------------------+-----------------+-----------------+--------------+---+----------+---------------+-------+------+------------+------+-----+---------+--------------+----------------+--

In [6]:
df.columns

['ID',
 'timestamp',
 'company',
 'level',
 'title',
 'totalyearlycompensation',
 'location',
 'yearsofexperience',
 'yearsatcompany',
 'tag',
 'basesalary',
 'stockgrantvalue',
 'bonus',
 'gender',
 'otherdetails',
 'cityid',
 'dmaid',
 'rowNumber',
 'Masters_Degree',
 'Bachelors_Degree',
 'Doctorate_Degree',
 'Highschool',
 'Some_College',
 'Race_Asian',
 'Race_White',
 'Race_Two_Or_More',
 'Race_Black',
 'Race_Hispanic',
 'Race',
 'Education']

In [7]:
salary_info_df = df.select(['ID','title', 'totalyearlycompensation','yearsofexperience','yearsatcompany','basesalary','stockgrantvalue','bonus']).drop_duplicates()
salary_info_df.show(5)

+--------+-----------------+-----------------------+-----------------+--------------+----------+---------------+---------+
|      ID|            title|totalyearlycompensation|yearsofexperience|yearsatcompany|basesalary|stockgrantvalue|    bonus|
+--------+-----------------+-----------------------+-----------------+--------------+----------+---------------+---------+
|FLP00639|Software Engineer|                 125000|              6.0|           3.0|    125000|            0.0|1000000.0|
|FLP01034|Software Engineer|                 130000|              2.0|           4.0|    130000|            0.0|      0.0|
|FLP01242|  Product Manager|                 150000|              4.0|           4.0|    125000|        10000.0|  15000.0|
|FLP01700|Software Engineer|                 180000|              4.0|           4.0|         0|            0.0|      0.0|
|FLP01737|Software Engineer|                 215000|              6.0|           5.0|         0|            0.0|      0.0|
+--------+------

In [8]:
Company_info_df = df.select(['ID','company', 'location','cityid']).drop_duplicates()
Company_info_df.show(5)

+--------+----------+-----------------+------+
|      ID|   company|         location|cityid|
+--------+----------+-----------------+------+
|FLP00256|    Google|Mountain View, CA|  7322|
|FLP00496|  Facebook|      Seattle, WA| 11527|
|FLP00590|Salesforce|San Francisco, CA|  7419|
|FLP01020|     Intel|  Santa Clara, CA|  7434|
|FLP01608|      eBay|     San Jose, CA|  7422|
+--------+----------+-----------------+------+
only showing top 5 rows



In [11]:
Employee_info_df = df.select(['ID', 'yearsofexperience','yearsatcompany','gender','Masters_Degree','Bachelors_Degree','Doctorate_Degree','Highschool']).drop_duplicates()
Employee_info_df.show(5)

+--------+-----------------+--------------+------+--------------+----------------+----------------+----------+
|      ID|yearsofexperience|yearsatcompany|gender|Masters_Degree|Bachelors_Degree|Doctorate_Degree|Highschool|
+--------+-----------------+--------------+------+--------------+----------------+----------------+----------+
|FLP00548|             15.0|           1.0|  Male|             0|               0|               0|         0|
|FLP00600|              4.0|           0.1|  Male|             0|               0|               0|         0|
|FLP00964|              3.0|           1.0|    NA|             0|               0|               0|         0|
|FLP01003|             11.0|           6.0|    NA|             0|               0|               0|         0|
|FLP01066|              2.0|           2.0|    NA|             0|               0|               0|         0|
+--------+-----------------+--------------+------+--------------+----------------+----------------+----------+
o

In [None]:
# salary_by_location_df = df.select(['title', 'state', 'city', 'totalyearlycompensation']).drop_duplicates()
# salary_by_location_df.show(5)

+--------------+-----+-------------+-----------------------+
|         title|state|         city|totalyearlycompensation|
+--------------+-----+-------------+-----------------------+
|Data Scientist|   CA|    San Ramon|                 142000|
|Data Scientist|   CA|    Sunnyvale|                 130000|
|Data Scientist|   WA|      Redmond|                 320000|
|Data Scientist|   NY|     New York|                 284000|
|Data Scientist|   CA|San Francisco|                 285000|
+--------------+-----+-------------+-----------------------+
only showing top 5 rows



In [None]:
# salary_by_education_df = df.select(['title', 'Masters_Degree', 'Bachelors_Degree', 'totalyearlycompensation']).drop_duplicates()
# salary_by_education_df.show(5)

+--------------+--------------+----------------+-----------------------+
|         title|Masters_Degree|Bachelors_Degree|totalyearlycompensation|
+--------------+--------------+----------------+-----------------------+
|Data Scientist|             0|               0|                 235000|
|Data Scientist|             0|               0|                 325000|
|Data Scientist|             0|               0|                 141000|
|Data Scientist|             1|               0|                 108000|
|Data Scientist|             1|               0|                 390000|
+--------------+--------------+----------------+-----------------------+
only showing top 5 rows



In [10]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://final-project.cgernf5ijfvg.us-east-1.rds.amazonaws.com/postgres"
config = {"user":"postgres", 
          "password": "finalproject1", 
          "driver":"org.postgresql.Driver"}

In [29]:
# salary_by_title_df.write.jdbc(url=jdbc_url, table="salary_by_title", mode=mode, properties=config)

In [None]:
# salary_by_title_and_gender_df.write.jdbc(url=jdbc_url, table="salary_by_title_and_gender", mode=mode, properties=config)

In [None]:
# salary_by_experience_df.write.jdbc(url=jdbc_url, table="salary_by_experience", mode=mode, properties=config)

In [None]:
# salary_by_location_df.write.jdbc(url=jdbc_url, table="salary_by_location", mode=mode, properties=config)

In [None]:
# salary_by_education_df.write.jdbc(url=jdbc_url, table="salary_by_education", mode=mode, properties=config)

In [16]:
salary_info_df.write.jdbc(url=jdbc_url, table="Salary_info", mode=mode, properties=config)
salary_info_df.show()

+--------+-----------------+-----------------------+-----------------+--------------+----------+---------------+---------+
|      ID|            title|totalyearlycompensation|yearsofexperience|yearsatcompany|basesalary|stockgrantvalue|    bonus|
+--------+-----------------+-----------------------+-----------------+--------------+----------+---------------+---------+
|FLP00639|Software Engineer|                 125000|              6.0|           3.0|    125000|            0.0|1000000.0|
|FLP01034|Software Engineer|                 130000|              2.0|           4.0|    130000|            0.0|      0.0|
|FLP01242|  Product Manager|                 150000|              4.0|           4.0|    125000|        10000.0|  15000.0|
|FLP01700|Software Engineer|                 180000|              4.0|           4.0|         0|            0.0|      0.0|
|FLP01737|Software Engineer|                 215000|              6.0|           5.0|         0|            0.0|      0.0|
|FLP01769|Softwa

In [14]:
Company_info_df.write.jdbc(url=jdbc_url, table="Company_info", mode=mode, properties=config)
Company_info_df

DataFrame[ID: string, company: string, location: string, cityid: string]

In [15]:
Employee_info_df.write.jdbc(url=jdbc_url, table="Employee_info", mode=mode, properties=config)
Employee_info_df

DataFrame[ID: string, yearsofexperience: double, yearsatcompany: double, gender: string, Masters_Degree: string, Bachelors_Degree: string, Doctorate_Degree: string, Highschool: string]

In [22]:
salary_info_df = salary_info_df.alias('salary_info_df')
Employee_info_df = Employee_info_df.alias('Employee_info_df')
Company_info_df = Company_info_df.alias('Company_info_df')

In [33]:
from pyspark.sql.functions import col

Joined_df = salary_info_df.join(Employee_info_df, col('salary_info_df.ID') == col('Employee_info_df.ID'), 'inner') \
.join(Company_info_df, col('salary_info_df.ID') == col('Company_info_df.ID'), 'inner') 

In [37]:
clean_df=Joined_df.drop("ID")

In [38]:
clean_df.show()

+--------------------+-----------------------+-----------------+--------------+----------+---------------+-------+-----------------+--------------+------+--------------+----------------+----------------+----------+----------+-----------------+------+
|               title|totalyearlycompensation|yearsofexperience|yearsatcompany|basesalary|stockgrantvalue|  bonus|yearsofexperience|yearsatcompany|gender|Masters_Degree|Bachelors_Degree|Doctorate_Degree|Highschool|   company|         location|cityid|
+--------------------+-----------------------+-----------------+--------------+----------+---------------+-------+-----------------+--------------+------+--------------+----------------+----------------+----------+----------+-----------------+------+
|   Software Engineer|                 215000|             15.0|           1.0|    165000|       200000.0|50000.0|             15.0|           1.0|  Male|             0|               0|               0|         0| Microsoft|      Seattle, WA| 115