In [2]:
# Load Amazon S3 data into Spark dataframe


import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
Get:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Get:7 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Hit:10 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  Packages [973 kB]
Hit:12 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Get:13 http://security.ubuntu.com/ubuntu focal-security/restricted amd64 Packages

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-04-11 15:05:11--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-04-11 15:05:12 (6.32 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, putting the downloaded Postgres JDBC driver
# jar on the driver classpath so the JDBC writes later in the notebook can
# load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Final-Project")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [5]:
# Read data from S3 bucket
#
# The CSV is registered with the SparkContext via addFile, then read from the
# path SparkFiles reports for it. The first row is treated as the header and
# column types are inferred.

from pyspark import SparkFiles

url = "https://highered-bucket.s3.amazonaws.com/highered_merged_data.csv"
spark.sparkContext.addFile(url)

highered_data_df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(
        SparkFiles.get("highered_merged_data.csv"),
        header=True,
        inferSchema=True,
    )
)
highered_data_df.show()

+----+------+-------+-----+-------+-------+-------+--------------------+------+------+-----+-------+-------+--------+----------+---------+
|Year|UNITID|FACSTAT|ARANK|HRTOTLT|HRTOTLM|HRTOTLW|              INSTNM|  CITY|STABBR|  ZIP|CONTROL|HLOFFER|INSTSIZE|  LONGITUD| LATITUDE|
+----+------+-------+-----+-------+-------+-------+--------------------+------+------+-----+-------+-------+--------+----------+---------+
|2019|100654|      0|    0|    242|    131|    111|Alabama A & M Uni...|Normal|    AL|35762|      1|      9|       3|-86.568502|34.783368|
|2019|100654|     10|    0|    242|    131|    111|Alabama A & M Uni...|Normal|    AL|35762|      1|      9|       3|-86.568502|34.783368|
|2019|100654|     10|    1|     49|     37|     12|Alabama A & M Uni...|Normal|    AL|35762|      1|      9|       3|-86.568502|34.783368|
|2019|100654|     10|    2|     50|     34|     16|Alabama A & M Uni...|Normal|    AL|35762|      1|      9|       3|-86.568502|34.783368|
|2019|100654|     10|    3|

In [49]:
# Prepare the input for the appointment_type table: keep only the
# (UNITID, ARANK, HRTOTLT) columns and drop exact duplicate rows.
# NOTE(review): source rows that differ only in FACSTAT but share the same
# ARANK and HRTOTLT collapse into a single row here -- confirm that is the
# intended de-duplication before relying on the per-ARANK sums built from it.
duplicates_df = (
    highered_data_df
    .select("UNITID", "ARANK", "HRTOTLT")
    .dropDuplicates()
)
duplicates_df.show(10)

+------+-----+-------+
|UNITID|ARANK|HRTOTLT|
+------+-----+-------+
|101541|    2|      3|
|101587|    0|     36|
|106245|    3|     10|
|106342|    3|      6|
|107725|    0|      2|
|107877|    0|     14|
|110565|    0|    147|
|110574|    0|    193|
|110653|    3|    239|
|112260|    1|      3|
+------+-----+-------+
only showing top 10 rows



In [50]:
# Create the appointment_type table: total HRTOTLT per ARANK value.
# The dict-style aggregate names its output column "sum(HRTOTLT)", so it is
# renamed back to plain "HRTOTLT".
appointment_type_df = (
    duplicates_df
    .groupBy("ARANK")
    .agg({"HRTOTLT": "sum"})
    .withColumnRenamed("sum(HRTOTLT)", "HRTOTLT")
)
appointment_type_df.show(10)

+-----+-------+
|ARANK|HRTOTLT|
+-----+-------+
|    1| 962995|
|    6| 194936|
|    3|1091593|
|    5| 238036|
|    4| 463237|
|    2| 908351|
|    0|4884597|
+-----+-------+



In [52]:
# Prepare the input for the faculty_status table: keep only the
# (UNITID, FACSTAT, HRTOTLT) columns and drop exact duplicate rows.
# NOTE(review): source rows that differ only in ARANK but share the same
# FACSTAT and HRTOTLT collapse into a single row here -- confirm that is the
# intended de-duplication before relying on the per-FACSTAT sums built from it.
faculty_duplicates_df = (
    highered_data_df
    .select("UNITID", "FACSTAT", "HRTOTLT")
    .dropDuplicates()
)
faculty_duplicates_df.show(10)

+------+-------+-------+
|UNITID|FACSTAT|HRTOTLT|
+------+-------+-------+
|100654|     20|     49|
|100690|     42|      3|
|101541|     20|      4|
|101602|     30|     15|
|101879|     40|      2|
|102377|     42|      2|
|105297|     10|     10|
|106412|     20|     19|
|106412|     40|     54|
|107512|     20|     22|
+------+-------+-------+
only showing top 10 rows



In [54]:
# Create the faculty_status table: total HRTOTLT per FACSTAT value.
# The dict-style aggregate names its output column "sum(HRTOTLT)", so it is
# renamed back to plain "HRTOTLT".
faculty_status_df = (
    faculty_duplicates_df
    .groupBy("FACSTAT")
    .agg({"HRTOTLT": "sum"})
    .withColumnRenamed("sum(HRTOTLT)", "HRTOTLT")
)
faculty_status_df.show(10)

+-------+-------+
|FACSTAT|HRTOTLT|
+-------+-------+
|     44| 403529|
|     20|1638174|
|     40|1530471|
|     41| 607436|
|     43|  56728|
|     10|3922719|
|     50|  30510|
|     45| 200416|
|     42| 840200|
|     30| 663558|
+-------+-------+
only showing top 10 rows



In [7]:
# Create the institution size table: the distinct combinations of year,
# institution id, the three HRTOTL* count columns and INSTSIZE.
institution_size_df = (
    highered_data_df
    .select("Year", "UNITID", "HRTOTLT", "HRTOTLM", "HRTOTLW", "INSTSIZE")
    .dropDuplicates()
)
institution_size_df.show(10)

+----+------+-------+-------+-------+--------+
|Year|UNITID|HRTOTLT|HRTOTLM|HRTOTLW|INSTSIZE|
+----+------+-------+-------+-------+--------+
|2019|100663|    397|    205|    192|       5|
|2019|100751|    408|    231|    177|       5|
|2019|102049|     49|     14|     35|       3|
|2019|102313|     31|     18|     13|       2|
|2019|104179|    781|    536|    245|       5|
|2019|104717|     89|     41|     48|       5|
|2019|107600|     20|      8|     12|       1|
|2019|107974|      8|      2|      6|       2|
|2019|107983|     41|     24|     17|       2|
|2019|110404|      2|      1|      1|       2|
+----+------+-------+-------+-------+--------+
only showing top 10 rows



In [22]:
# Create the gender table: the distinct combinations of year, institution id
# and the three HRTOTL* count columns (HRTOTLT, HRTOTLM, HRTOTLW).
gender_df = (
    highered_data_df
    .select("Year", "UNITID", "HRTOTLT", "HRTOTLM", "HRTOTLW")
    .dropDuplicates()
)
gender_df.show(10)

+----+------+-------+-------+-------+
|Year|UNITID|HRTOTLT|HRTOTLM|HRTOTLW|
+----+------+-------+-------+-------+
|2019|101435|      3|      2|      1|
|2019|101693|      1|      1|      0|
|2019|104708|    170|     81|     89|
|2019|106467|     74|     46|     28|
|2019|109350|    162|     93|     69|
|2019|110510|    125|     59|     66|
|2019|111948|     19|     10|      9|
|2019|113218|     10|      6|      4|
|2019|114859|    227|    115|    112|
|2019|117627|      6|      2|      4|
+----+------+-------+-------+-------+
only showing top 10 rows



In [32]:
# Connect to the AWS RDS and write the DataFrames to their tables
# Configure settings for RDS
#
# mode     -- save mode passed to every JDBC write below. With "append",
#             re-running a write cell adds the same rows to the table again.
# jdbc_url -- JDBC connection string for the Postgres instance.
# config   -- connection properties handed to DataFrameWriter.jdbc.
#
# Credentials are taken from the RDS_USER / RDS_PASSWORD environment variables
# so the real password never has to be typed into (or committed with) the
# notebook. The fallbacks are the original values; "db_password" is only a
# placeholder and must be overridden for the writes to authenticate.
mode = "append"
jdbc_url="jdbc:postgresql://highered-db.cx2ll8zuuepz.us-east-2.rds.amazonaws.com:5432/postgres"
config = {"user": os.environ.get("RDS_USER", "postgres"),
          "password": os.environ.get("RDS_PASSWORD", "db_password"),
          "driver":"org.postgresql.Driver"}

In [51]:
# Write appointment_type_df to the 'appointment_type' table in RDS, using the
# save mode and connection properties configured earlier in the notebook.
appointment_type_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='appointment_type',
    properties=config,
)

In [34]:
# Write highered_data_df (the full merged dataset) to the
# 'highered_data_table' table in RDS, using the save mode and connection
# properties configured earlier in the notebook.
highered_data_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='highered_data_table',
    properties=config,
)

In [36]:
# Write institution_size_df to the 'institution_table' table in RDS, using the
# save mode and connection properties configured earlier in the notebook.
institution_size_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='institution_table',
    properties=config,
)

In [37]:
# Write gender_df to the 'gender_table' table in RDS, using the save mode and
# connection properties configured earlier in the notebook.
gender_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='gender_table',
    properties=config,
)

In [55]:
# Write faculty_status_df to the 'faculty_status_table' table in RDS, using
# the save mode and connection properties configured earlier in the notebook.
faculty_status_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='faculty_status_table',
    properties=config,
)