In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
                                                                               Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co                                                                               Get:4 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
0% [4 InRelease 21.4 kB/114 kB 19%] [Connecting to security.ubuntu.com (91.189.                                                                               Hit:5 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
0% [4 InRelease 21.4 kB/114 kB 19%] [Connecting to security.ubuntu.com

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-04-11 17:25:55--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-04-11 17:25:57 (1.59 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the session, putting the downloaded Postgres JDBC jar
# on the driver's classpath.
spark = (
    SparkSession.builder
    .appName("Final-Project")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load BRFSS Data from Amazon S3 into a Spark DataFrame

In [5]:
from pyspark import SparkFiles

# Load the BRFSS prevalence data from S3 into a DataFrame
url = "https://unbearable-1-project-bucket.s3.us-east-2.amazonaws.com/Behavioral_Risk_Factor_Surveillance_System__BRFSS__Prevalence_Data__2011_to_present_.csv"
spark.sparkContext.addFile(url)

# addFile stores the download under the last path segment of the URL.
# Look the file up by that name: SparkFiles.get("") would resolve to the whole
# SparkFiles directory, so every file ever added would be parsed as CSV.
csv_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(csv_name), sep=",", header=True, inferSchema=True
)
df.show()

+----+------------+------------+--------------------+----------------+--------------------+--------------------+-------------------+------------------+-----------+----------+--------------------+---------------------+-------------+---------------+----------------+--------------------------+-------------------+----------+-------+-------+----------+----------+------------------+----------+----------+--------------------+
|Year|Locationabbr|Locationdesc|               Class|           Topic|            Question|            Response|          Break_Out|Break_Out_Category|Sample_Size|Data_value|Confidence_limit_Low|Confidence_limit_High|Display_order|Data_value_unit| Data_value_type|Data_Value_Footnote_Symbol|Data_Value_Footnote|DataSource|ClassId|TopicId|LocationID|BreakoutID|BreakOutCategoryID|QuestionID|ResponseID|         GeoLocation|
+----+------------+------------+--------------------+----------------+--------------------+--------------------+-------------------+------------------+---

### Clean up Columns

In [14]:
# Restrict the data to survey year 2021 (the most recent year kept for analysis)
recent_df = df.filter(df.Year == 2021)
recent_df.show()

+----+------------+--------------+--------------------+---------+--------------------+--------+--------------------+------------------+-----------+----------+--------------------+---------------------+-------------+---------------+----------------+--------------------------+-------------------+----------+-------+-------+----------+----------+------------------+----------+----------+--------------------+
|Year|Locationabbr|  Locationdesc|               Class|    Topic|            Question|Response|           Break_Out|Break_Out_Category|Sample_Size|Data_value|Confidence_limit_Low|Confidence_limit_High|Display_order|Data_value_unit| Data_value_type|Data_Value_Footnote_Symbol|Data_Value_Footnote|DataSource|ClassId|TopicId|LocationID|BreakoutID|BreakOutCategoryID|QuestionID|ResponseID|         GeoLocation|
+----+------------+--------------+--------------------+---------+--------------------+--------+--------------------+------------------+-----------+----------+--------------------+-------

In [47]:
# Columns that are dropped before the analysis
columns_to_drop = [
    "Locationdesc",
    "Data_value",
    "Confidence_limit_Low",
    "Confidence_limit_High",
    "Display_order",
    "Data_value_unit",
    "Data_value_type",
    "Data_Value_Footnote_Symbol",
    "Data_Value_Footnote",
    "DataSource",
    "ClassId",
    "TopicId",
    "LocationID",
    "BreakoutID",
    "BreakOutCategoryID",
    "QuestionID",
    "ResponseID",
    "GeoLocation",
]
clean_recent_df = recent_df.drop(*columns_to_drop)
clean_recent_df.show(truncate=False)

+----+------------+-------------------------+---------+----------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------+------------------+-----------+
|Year|Locationabbr|Class                    |Topic    |Question                                                                                            |Response|Break_Out                                              |Break_Out_Category|Sample_Size|
+----+------------+-------------------------+---------+----------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------+------------------+-----------+
|2021|NY          |Chronic Health Indicators|Arthritis|Adults who have been told they have arthritis (variable calculated from one or more BRFSS questions)|No      |Multiracial, non-Hispanic                              |Race/Ethnicity    |4

In [48]:
# List each topic once, alphabetically, to help narrow the analysis
topic_df = clean_recent_df.dropDuplicates(["Topic"]).select("Topic")
topic_df.orderBy("Topic").show(50, truncate=False)

+----------------------+
|Topic                 |
+----------------------+
|Age                   |
|Alcohol Consumption   |
|Arthritis             |
|Asthma                |
|BMI Categories        |
|Binge Drinking        |
|COPD                  |
|Cardiovascular Disease|
|Cholesterol Checked   |
|Cholesterol High      |
|Current Smoker Status |
|Depression            |
|Diabetes              |
|Disability status     |
|E-Cigarette Use       |
|Education             |
|Employment            |
|Exercise              |
|Fair or Poor Health   |
|Flu Shot              |
|Fruit Consumption     |
|HIV Test              |
|Health Care Cost      |
|Health Care Coverage  |
|Healthy Days          |
|Hearing               |
|Heavy Drinking        |
|High Blood Pressure   |
|Income                |
|Kidney                |
|Last Checkup          |
|Marital Status        |
|Number of Children    |
|Other Cancer          |
|Overall Health        |
|Personal Care Provider|
|Pneumonia Vaccination |
