In [1]:
# Dependencies
import pandas as pd

In [2]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [913 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-09-30 01:50:10--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-09-30 01:50:11 (4.90 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [26]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session, putting the downloaded Postgres JDBC
# driver jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("Recidivism_Prediction_Cloud")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [27]:
from pyspark import SparkFiles

# Register the remote CSV with Spark, then read that specific file back.
url = "https://vasudha-bucket.s3.ca-central-1.amazonaws.com/recidivism_data.csv"
spark.sparkContext.addFile(url)

# addFile stores the download under the last path component of the URL.
# Passing that name to SparkFiles.get returns the path of this one file; an empty
# name would resolve to the whole SparkFiles directory and make the reader load
# every file that has been added to the context.
csv_name = url.rsplit("/", 1)[-1]
df = spark.read.csv(SparkFiles.get(csv_name), sep=",", header=True, inferSchema=True)
df.show()

+--------------------+-------------------------+-------------------------+--------------------+--------------------+---------------+------+----------------------+------------+---------------+----------------+--------------+---------------+--------------------------+----------------+--------------------+-----------------+
|Fiscal Year Released|Recidivism Reporting Year|Main Supervising District|        Release Type|    Race - Ethnicity|Age At Release |   Sex|Offense Classification|Offense Type|Offense Subtype|Return to Prison|Days to Return|Recidivism Type|New Offense Classification|New Offense Type|New Offense Sub Type|Target Population|
+--------------------+-------------------------+-------------------------+--------------------+--------------------+---------------+------+----------------------+------------+---------------+----------------+--------------+---------------+--------------------------+----------------+--------------------+-----------------+
|                2010|         

In [28]:
# Collect the Spark DataFrame into a pandas DataFrame; the cleaning steps below use pandas
raw_df = df.toPandas()

In [29]:
# Remove the columns that are not needed for the rest of the pipeline
unused_columns = [
    'Recidivism Reporting Year',
    'Offense Subtype',
    'Main Supervising District',
    'Target Population',
]
raw_df = raw_df.drop(columns=unused_columns)
raw_df.head()

Unnamed: 0,Fiscal Year Released,Release Type,Race - Ethnicity,Age At Release,Sex,Offense Classification,Offense Type,Return to Prison,Days to Return,Recidivism Type,New Offense Classification,New Offense Type,New Offense Sub Type
0,2010,Parole,Black - Non-Hispanic,25-34,Male,C Felony,Violent,Yes,433.0,New,C Felony,Drug,Trafficking
1,2010,Discharged – End of Sentence,White - Non-Hispanic,25-34,Male,D Felony,Property,Yes,453.0,Tech,,,
2,2010,Parole,White - Non-Hispanic,35-44,Male,B Felony,Drug,Yes,832.0,Tech,,,
3,2010,Parole,White - Non-Hispanic,25-34,Male,B Felony,Other,No,,No Recidivism,,,
4,2010,Discharged – End of Sentence,Black - Non-Hispanic,35-44,Male,D Felony,Violent,Yes,116.0,Tech,,,


In [30]:
# Dropping rows with blanks in race column
import numpy as np

# Turn empty strings into NaN and assign the result back to the column explicitly.
# Calling replace(..., inplace=True) on a column selection acts on an intermediate
# Series and is not guaranteed to update the DataFrame (it is a no-op under pandas
# Copy-on-Write), which would leave the blank rows in place.
raw_df["Race - Ethnicity"] = raw_df["Race - Ethnicity"].replace("", np.nan)

# Drop every row whose race value is missing (originally NaN or blank)
raw_df = raw_df.dropna(subset=["Race - Ethnicity"])

In [31]:
# Preview the first rows after the blank race values were removed
raw_df.head()

Unnamed: 0,Fiscal Year Released,Release Type,Race - Ethnicity,Age At Release,Sex,Offense Classification,Offense Type,Return to Prison,Days to Return,Recidivism Type,New Offense Classification,New Offense Type,New Offense Sub Type
0,2010,Parole,Black - Non-Hispanic,25-34,Male,C Felony,Violent,Yes,433.0,New,C Felony,Drug,Trafficking
1,2010,Discharged – End of Sentence,White - Non-Hispanic,25-34,Male,D Felony,Property,Yes,453.0,Tech,,,
2,2010,Parole,White - Non-Hispanic,35-44,Male,B Felony,Drug,Yes,832.0,Tech,,,
3,2010,Parole,White - Non-Hispanic,25-34,Male,B Felony,Other,No,,No Recidivism,,,
4,2010,Discharged – End of Sentence,Black - Non-Hispanic,35-44,Male,D Felony,Violent,Yes,116.0,Tech,,,


In [38]:
# Remove every row whose race value is the placeholder "N/A -"
na_race_rows = raw_df.index[raw_df["Race - Ethnicity"] == "N/A -"]
raw_df.drop(index=na_race_rows, inplace=True)

In [33]:
# Simplifying race/ethnic categories:
# every "... - Hispanic" value becomes "Hispanic"; the remaining values are
# reduced to the race alone. Values not listed here are left unchanged.
race_labels = {
    "White - Hispanic": "Hispanic",
    "White - Non-Hispanic": "White",
    "Black - Hispanic": "Hispanic",
    "Black - Non-Hispanic": "Black",
    "Asian or Pacific Islander - Hispanic": "Hispanic",
    "Asian or Pacific Islander - Non-Hispanic": "Asian",
    "American Indian or Alaska Native - Hispanic": "Hispanic",
    "American Indian or Alaska Native - Non-Hispanic": "Native",
    "White -": "White",
    "Black -": "Black",
}
raw_df["Race - Ethnicity"] = raw_df["Race - Ethnicity"].replace(race_labels)

In [40]:
# Collapse the four "Paroled to Detainer - ..." release types into a single category
detainer_variants = [
    "Paroled to Detainer - INS",
    "Paroled to Detainer - Iowa",
    "Paroled to Detainer - Out of State",
    "Paroled to Detainer - U.S. Marshall",
]
is_detainer = raw_df["Release Type"].isin(detainer_variants)
raw_df.loc[is_detainer, "Release Type"] = "Paroled to Detainer"

# Rows with no release type are labelled "Other"
raw_df["Release Type"] = raw_df["Release Type"].fillna("Other")

# Shorten the column names. rename() skips keys that are not columns of the frame,
# so entries without a matching column have no effect.
column_renames = {
    "Fiscal Year Released": "Year Released",
    "Race - Ethnicity": "Race",
    "Age At Release ": "Age",  # the source header has a trailing space
    "Convicting Offense Classification": "Offense Classification",
    "Convicting Offense Type": "Offense Type",
    "Recidivism - Return to Prison numeric": "Recidivism",
}
raw_df = raw_df.rename(columns=column_renames)

In [41]:
# Convert the cleaned pandas DataFrame back into a Spark DataFrame and print a preview
spark_df = spark.createDataFrame(raw_df)
spark_df.show()

+-------------+--------------------+-----+--------+------+----------------------+------------+----------------+--------------+---------------+--------------------------+----------------+--------------------+
|Year Released|        Release Type| Race|     Age|   Sex|Offense Classification|Offense Type|Return to Prison|Days to Return|Recidivism Type|New Offense Classification|New Offense Type|New Offense Sub Type|
+-------------+--------------------+-----+--------+------+----------------------+------------+----------------+--------------+---------------+--------------------------+----------------+--------------------+
|         2010|              Parole|Black|   25-34|  Male|              C Felony|     Violent|             Yes|           433|            New|                  C Felony|            Drug|         Trafficking|
|         2010|Discharged – End ...|White|   25-34|  Male|              D Felony|    Property|             Yes|           453|           Tech|                      null

In [43]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Add an "ID" column: row_number() over a window ordered by
# monotonically_increasing_id(), giving consecutive numbers starting at 1.
id_window = Window.orderBy(monotonically_increasing_id())
df1 = spark_df.withColumn("ID", row_number().over(id_window))

In [44]:
# Per-prisoner attributes, keyed by ID
prisoner_df = df1.select(
    'ID',
    'Year Released',
    'Release Type',
    'Age',
    'Race',
    'Sex',
)
prisoner_df.show()

+---+-------------+--------------------+--------+-----+------+
| ID|Year Released|        Release Type|     Age| Race|   Sex|
+---+-------------+--------------------+--------+-----+------+
|  1|         2010|              Parole|   25-34|Black|  Male|
|  2|         2010|Discharged – End ...|   25-34|White|  Male|
|  3|         2010|              Parole|   35-44|White|  Male|
|  4|         2010|              Parole|   25-34|White|  Male|
|  5|         2010|Discharged – End ...|   35-44|Black|  Male|
|  6|         2010|              Parole|   25-34|White|  Male|
|  7|         2010|              Parole|   25-34|White|  Male|
|  8|         2010|              Parole|   35-44|Black|  Male|
|  9|         2010|              Parole|   25-34|White|  Male|
| 10|         2010|              Parole|   25-34|Black|  Male|
| 11|         2010|              Parole|   25-34|White|  Male|
| 12|         2010|              Parole|   25-34|White|  Male|
| 13|         2010|              Parole|   35-44|Black|

In [46]:
# Original offense classification and type, keyed by ID
offense_type_df = df1.select(
    'ID',
    'Offense Classification',
    'Offense Type',
)
offense_type_df.show()

+---+----------------------+------------+
| ID|Offense Classification|Offense Type|
+---+----------------------+------------+
|  1|              C Felony|     Violent|
|  2|              D Felony|    Property|
|  3|              B Felony|        Drug|
|  4|              B Felony|       Other|
|  5|              D Felony|     Violent|
|  6|              C Felony|        Drug|
|  7|              C Felony|        Drug|
|  8|              D Felony|Public Order|
|  9|     Felony - Enhanced|    Property|
| 10|              C Felony|     Violent|
| 11|              C Felony|        Drug|
| 12|              C Felony|        Drug|
| 13|              D Felony|        Drug|
| 14|              D Felony|        Drug|
| 15|              D Felony|        Drug|
| 16|              C Felony|    Property|
| 17|              D Felony|Public Order|
| 18|              D Felony|    Property|
| 19|              C Felony|     Violent|
| 20|  Aggravated Misdem...|    Property|
+---+----------------------+------

In [48]:
# Recidivism outcome columns, keyed by ID
Recidivism_details_df = df1.select(
    'ID',
    'Days to Return',
    'Recidivism Type',
    'New Offense Classification',
    'New Offense Type',
    'Return to Prison',
)
Recidivism_details_df.show()

+---+--------------+---------------+--------------------------+----------------+----------------+
| ID|Days to Return|Recidivism Type|New Offense Classification|New Offense Type|Return to Prison|
+---+--------------+---------------+--------------------------+----------------+----------------+
|  1|           433|            New|                  C Felony|            Drug|             Yes|
|  2|           453|           Tech|                      null|            null|             Yes|
|  3|           832|           Tech|                      null|            null|             Yes|
|  4|          null|  No Recidivism|                      null|            null|              No|
|  5|           116|           Tech|                      null|            null|             Yes|
|  6|          null|  No Recidivism|                      null|            null|              No|
|  7|          null|  No Recidivism|                      null|            null|              No|
|  8|            84|

In [49]:
# Configure settings for RDS
import os
from getpass import getpass

# "append" adds rows to the existing tables, so running the write cells again
# inserts the same data a second time.
mode = "append"
jdbc_url = "jdbc:postgresql://database-1.clf82frcjuur.ca-central-1.rds.amazonaws.com:5432/postgres"

# The database password must not be stored in the notebook. It is taken from the
# RDS_PASSWORD environment variable when set, otherwise prompted for interactively.
# NOTE(review): the password that was previously committed here should be rotated.
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [50]:
# Write prisoner_df to the PRISONERS table on the Postgres server
prisoner_df.write.jdbc(
    url=jdbc_url,
    table='PRISONERS',
    mode=mode,
    properties=config,
)

In [51]:
# Write Recidivism_details_df to the RECIDIVISM table on the Postgres server
Recidivism_details_df.write.jdbc(
    url=jdbc_url,
    table='RECIDIVISM',
    mode=mode,
    properties=config,
)

In [52]:
# Write offense_type_df to the OFFENSE table on the Postgres server
offense_type_df.write.jdbc(
    url=jdbc_url,
    table='OFFENSE',
    mode=mode,
    properties=config,
)