In [9]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connecting to security.0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.39                                                                               Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [2 InRelease 14.2 kB/88.7 k                                                                               Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:6 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]

In [10]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-12-22 16:54:46--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-12-22 16:54:47 (5.55 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [13]:
# start spark session that can connect to postgre
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("final-project").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [15]:
# load in data
from google.colab import files
uploaded = files.upload()

Saving cancer_incidence.csv to cancer_incidence.csv
Saving Daily_PM2.5_Concentrations_All_County__2001-2016.csv to Daily_PM2.5_Concentrations_All_County__2001-2016.csv


In [60]:
import io
import pandas as pd

# load in cancer data
cancer_df = pd.read_csv(io.BytesIO(uploaded['cancer_incidence.csv']))

cancer_df.head()

Unnamed: 0,index,County,FIPS,"Age-Adjusted Incidence Rate(Ê) - cases per 100,000",Lower 95% Confidence Interval,Upper 95% Confidence Interval,Average Annual Count,Recent Trend,Recent 5-Year Trend (ˆ) in Incidence Rates,Lower 95% Confidence Interval.1,Upper 95% Confidence Interval.1
0,0,"US (SEER+NPCR)(1,10)",0,62.4,62.3,62.6,214614,falling,-2.5,-3.0,-2.0
1,1,"Autauga County, Alabama(6,10)",1001,74.9,65.1,85.7,43,stable,0.5,-14.9,18.6
2,2,"Baldwin County, Alabama(6,10)",1003,66.9,62.4,71.7,170,stable,3.0,-10.2,18.3
3,3,"Barbour County, Alabama(6,10)",1005,74.6,61.8,89.4,25,stable,-6.4,-18.3,7.3
4,4,"Bibb County, Alabama(6,10)",1007,86.4,71.0,104.2,23,stable,-4.5,-31.4,32.9


In [66]:
# get rid of recent trend nulls (stored as *, _, __)

cancer_df = cancer_df.loc[(cancer_df['Recent Trend']=='rising')|(cancer_df['Recent Trend']=='falling')|(cancer_df['Recent Trend']=='stable')]

cancer_df['Recent Trend'].value_counts()

stable     2429
falling     200
rising       43
Name: Recent Trend, dtype: int64

In [61]:
# load in pollution data
pm_df = pd.read_csv(io.BytesIO(uploaded['Daily_PM2.5_Concentrations_All_County__2001-2016.csv']))

pm_df.head()

Unnamed: 0,year,date,statefips,countyfips,PM25_max_pred,PM25_med_pred,PM25_mean_pred,PM25_pop_pred
0,2001,01JAN2001,1,1,10.664367,10.264546,10.137631,10.188703
1,2001,01JAN2001,1,3,9.803209,8.739505,8.743748,8.811486
2,2001,01JAN2001,1,5,12.087599,11.809159,11.812775,11.802062
3,2001,01JAN2001,1,7,8.579425,8.435394,8.458118,8.448871
4,2001,01JAN2001,1,9,14.399446,13.577741,13.300528,13.231461


In [62]:
# filter pollution data for years matching cancer data
pm_df = pm_df.loc[pm_df['year']<=2014]

In [67]:
# check years
pm_df.year.unique()

array([2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011,
       2012, 2013, 2014])

In [64]:
# create state + county FIPS to match cancer data FIPS

pm_df['FIPS'] = pm_df['statefips'].astype(str) + pm_df['countyfips'].astype(str)

pm_df['FIPS'] = pm_df['FIPS'].astype(int)

pm_df.head()

Unnamed: 0,year,date,statefips,countyfips,PM25_max_pred,PM25_med_pred,PM25_mean_pred,PM25_pop_pred,FIPS
0,2001,01JAN2001,1,1,10.664367,10.264546,10.137631,10.188703,11
1,2001,01JAN2001,1,3,9.803209,8.739505,8.743748,8.811486,13
2,2001,01JAN2001,1,5,12.087599,11.809159,11.812775,11.802062,15
3,2001,01JAN2001,1,7,8.579425,8.435394,8.458118,8.448871,17
4,2001,01JAN2001,1,9,14.399446,13.577741,13.300528,13.231461,19


In [68]:
# aggregate pollution data by county

pm_agg = pm_df.groupby('FIPS').agg({'PM25_max_pred':'mean', 'PM25_med_pred':'mean', 'PM25_mean_pred':'mean'})

pm_agg.reset_index(inplace=True)

pm_agg.head()

Unnamed: 0,FIPS,PM25_max_pred,PM25_med_pred,PM25_mean_pred
0,11,12.239594,11.870589,11.7879
1,13,11.024418,9.82287,9.794272
2,15,11.68817,11.099414,11.084238
3,17,12.280141,11.700512,11.721958
4,19,13.498799,12.690525,12.741736


In [69]:
# get row count of initial df

len(cancer_df)

2672

In [70]:
# merge cancer and pollution data

final_df = pd.merge(cancer_df, pm_agg, how='inner', left_on=' FIPS', right_on='FIPS')

final_df.head()

Unnamed: 0,index,County,FIPS,"Age-Adjusted Incidence Rate(Ê) - cases per 100,000",Lower 95% Confidence Interval,Upper 95% Confidence Interval,Average Annual Count,Recent Trend,Recent 5-Year Trend (ˆ) in Incidence Rates,Lower 95% Confidence Interval.1,Upper 95% Confidence Interval.1,FIPS.1,PM25_max_pred,PM25_med_pred,PM25_mean_pred
0,51,"Montgomery County, Alabama(6,10)",1101,61.0,56.5,65.6,144,falling,-9.6,-15.1,-3.8,1101,12.555164,12.013325,11.958884
1,52,"Morgan County, Alabama(6,10)",1103,80.4,74.0,87.4,116,stable,-3.2,-12.0,6.4,1103,13.287901,12.68461,12.647047
2,53,"Perry County, Alabama(6,10)",1105,83.1,62.5,108.8,11,stable,-5.0,-20.5,13.6,1105,11.426605,11.07215,11.079387
3,54,"Pickens County, Alabama(6,10)",1107,65.6,52.8,81.0,18,stable,-13.2,-26.2,2.0,1107,11.594499,11.146401,11.155309
4,55,"Pike County, Alabama(6,10)",1109,61.8,50.5,74.9,22,stable,-11.9,-35.2,19.8,1109,11.352271,10.939546,10.932512


In [71]:
# get row count of matches

len(final_df)

1048

In [72]:
# get count of label distribution

final_df['Recent Trend'].value_counts()

stable     961
falling     68
rising      19
Name: Recent Trend, dtype: int64