In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [817 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/deadsnake

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-07-22 02:42:23--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-07-22 02:42:24 (5.42 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
# Create (or reuse) the Spark session, putting the Postgres JDBC driver jar on the
# driver classpath so the later write.jdbc calls can load org.postgresql.Driver.
from pyspark import SparkFiles
from pyspark.sql import SparkSession

postgres_jar = "/content/postgresql-42.2.16.jar"

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", postgres_jar)
    .getOrCreate()
)

In [4]:
# Import the x-ray metadata CSV from GitHub: register the URL with Spark, then read
# the local copy, which SparkFiles looks up by its bare file name.
url_date = "https://raw.githubusercontent.com/emilybstevens/CXR-ML/main/Resources/csv/Data_Entry_2017.csv"

spark.sparkContext.addFile(url_date)
data_entry_csv = SparkFiles.get(url_date.rsplit("/", 1)[-1])  # "Data_Entry_2017.csv"

# The read yields an extra column _c11 (null in the preview rows; presumably a
# trailing delimiter in the file), which is dropped in a later cell.
data_df = spark.read.csv(data_entry_csv, sep=",", header=True, inferSchema=True)
data_df.show()

+----------------+--------------------+-----------+----------+-----------+--------------+-------------+-------------------+-------+---------------------------+-----+----+
|     Image Index|      Finding Labels|Follow-up #|Patient ID|Patient Age|Patient Gender|View Position|OriginalImage[Width|Height]|OriginalImagePixelSpacing[x|   y]|_c11|
+----------------+--------------------+-----------+----------+-----------+--------------+-------------+-------------------+-------+---------------------------+-----+----+
|00000001_000.png|        Cardiomegaly|          0|         1|         58|             M|           PA|               2682|   2749|                      0.143|0.143|null|
|00000001_001.png|Cardiomegaly|Emph...|          1|         1|         58|             M|           PA|               2894|   2729|                      0.143|0.143|null|
|00000001_002.png|Cardiomegaly|Effu...|          2|         1|         58|             M|           PA|               2500|   2048|              

In [5]:
# Drop the spurious _c11 column (null in the preview output); every other column is
# kept in its original order. Dropping by name, rather than re-listing all eleven
# columns to keep, stays correct if the CSV's column set ever changes.
data_entry_df = data_df.drop("_c11")
data_entry_df.show()

+----------------+--------------------+-----------+----------+-----------+--------------+-------------+-------------------+-------+---------------------------+-----+
|     Image Index|      Finding Labels|Follow-up #|Patient ID|Patient Age|Patient Gender|View Position|OriginalImage[Width|Height]|OriginalImagePixelSpacing[x|   y]|
+----------------+--------------------+-----------+----------+-----------+--------------+-------------+-------------------+-------+---------------------------+-----+
|00000001_000.png|        Cardiomegaly|          0|         1|         58|             M|           PA|               2682|   2749|                      0.143|0.143|
|00000001_001.png|Cardiomegaly|Emph...|          1|         1|         58|             M|           PA|               2894|   2729|                      0.143|0.143|
|00000001_002.png|Cardiomegaly|Effu...|          2|         1|         58|             M|           PA|               2500|   2048|                      0.168|0.168|
|000

In [6]:
# Import the mortality-rate / triage classification CSV from GitHub: register the URL
# with Spark, then read the local copy, which SparkFiles looks up by its bare file name.
url_mortality = "https://raw.githubusercontent.com/emilybstevens/CXR-ML/main/Resources/Mortality_Rate_Classification.csv"

spark.sparkContext.addFile(url_mortality)
mortality_csv = SparkFiles.get(url_mortality.rsplit("/", 1)[-1])  # "Mortality_Rate_Classification.csv"

# NOTE(review): the preview shows "30-Day Mortality Rate (%)" holding both null and NaN
# for missing rates; consider normalising NaN to null before writing to RDS.
mortality_df = spark.read.csv(mortality_csv, sep=",", header=True, inferSchema=True)
mortality_df.show()

+------------------+----------+-------------------------+-----------+
|         Condition|     Class|30-Day Mortality Rate (%)|Triage Rank|
+------------------+----------+-------------------------+-----------+
|       Atelectasis|  Emergent|                     14.0|          a|
|      Cardiomegaly|   Chronic|                     null|          j|
|     Consolidation|  Emergent|                     35.0|          b|
|             Edema|  Emergent|                     46.0|          c|
|          Effusion|  Emergent|                     15.0|          d|
|         Emphysema|   Chronic|                     null|          k|
|          Fibrosis|   Chronic|                     null|          l|
|            Hernia|     Acute|                     null|          g|
|      Infiltration|  Emergent|                     null|          f|
|              Mass|     Acute|                      NaN|          h|
|        No Finding|No Finding|                      NaN|          o|
|            Nodule|

In [7]:
# Configure settings for the RDS Postgres target.
# Credentials come from the environment (RDS_USER / RDS_PASSWORD); if RDS_PASSWORD is
# unset the password is prompted for, so no secret is stored in the notebook.
# NOTE(review): a password was previously committed here in plain text — rotate it.
import os
import getpass

# "append" inserts rows on every run, so re-running the write cells duplicates data
mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://finalprojectaws.cqzwtqfkpisz.us-east-1.rds.amazonaws.com:5432/my_aws_db",
)
config = {
    "user": os.environ.get("RDS_USER", "group4"),
    "password": os.environ.get("RDS_PASSWORD") or getpass.getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [10]:
# Write data_entry_df (x-ray metadata without the _c11 column) to the Data_Entry_2007
# table in RDS, using the save mode and connection settings from the config cell.
# NOTE(review): the source file is Data_Entry_2017.csv, so "2007" looks like a typo in
# the table name; confirm with downstream consumers before renaming the table.
data_entry_df.write.jdbc(url=jdbc_url, table='Data_Entry_2007', mode=mode, properties=config)

In [9]:
# Write mortality_df to the Mortality_Rate_Classification table in RDS, using the save
# mode and connection settings from the config cell.
mortality_df.write.jdbc(url=jdbc_url, table='Mortality_Rate_Classification', mode=mode, properties=config)