Ibrahim Malik x23373385

# Data Ingestion and Cleaning

## Library Imports

In [1]:
# handle warnings
import warnings
warnings.filterwarnings("ignore")

In [2]:
# import relevant database manipulation libraries
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col
from pyspark.sql import Row
from pyspark.sql.functions import expr
import psycopg2
from pymongo import MongoClient
from azure.storage.blob import BlobServiceClient
import io

In [3]:
# import relevant data analysis libraries
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

## Database SetUp

In [4]:
# initialize Spark session
spark = SparkSession.builder.appName("PostgresIntegration").config("spark.jars", "/home/ibrahimssmalik/Downloads/postgresql-42.6.0.jar").getOrCreate()

25/04/21 17:20:57 WARN Utils: Your hostname, scpserver-i resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/21 17:20:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/04/21 17:20:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### PostgreSQL Database

In [5]:
from db_utils import PostgresDBHelper

In [6]:
# PostgreSQL Setup
pg_conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="disability_db",
    user="ibrahimssmalik",
    password="virtualbox2025"
)
pg_cursor = pg_conn.cursor()
print("Connected to PostgreSQL")

Connected to PostgreSQL


In [7]:
# set up sql database access
db = PostgresDBHelper(spark, pg_conn)

In [8]:
# check tables
db.check_tables().show()
tables = db.check_tables()

                                                                                

+----------+
|table_name|
+----------+
+----------+



In [9]:
# # refresh whole DB
# db.refresh_db()

### MongoDB Database

In [10]:
from db_utils import MongoDBHelper

In [11]:
# initialize MongoDBHelper
mongo_conn = "mongodb://ibrahimssmalik:virtualbox2025@localhost:27017" # MongoDB connection URI
print("Connected to MongoDB")

Connected to MongoDB


In [12]:
# set up nosql database access
db_mongo = MongoDBHelper(spark, mongo_conn)

In [13]:
# list Collections
collections = db_mongo.list_collections()
print("Collections:", collections)

Collections: []


In [14]:
# # clear the entire database
# db_mongo.clear_database()

## Disability Data

In [None]:
# main function to set file name and download URL
def set_file(file_name):
    sas_token = "ADD SAS TOKEN HERE"
    file_url = f"https://datasets4disability.blob.core.windows.net/datasets/{file_name}?{sas_token}"
    # check if the file already exists
    if not os.path.exists(file_name):
        print(f"Downloading {file_name}...")
        r = requests.get(file_url)
        with open(file_name, "wb") as f:
            f.write(r.content)
        print(f"{file_name} downloaded successfully.")
    else:
        print(f"{file_name} already exists. Skipping download.")

In [16]:
# set file name and download URL
file_name = "DHDS_2025.csv"
set_file(file_name)

DHDS_2025.csv already exists. Skipping download.


In [17]:
# load the file with PySpark
disability_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true") .load(file_name)

                                                                                

In [18]:
disability_df.head(1)

25/04/21 17:21:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(Rowid='2019~11~JOB01~JOB~DISTYPE~SELFDIS~~', Year=2019, LocationAbbr='DC', LocationDesc='District of Columbia', DataSource='BRFSS', Category='Demographics', Indicator='Employment status among adults 18 years of age or older', Response='Employed', Data_Value_Unit='%', Data_Value_Type='Age-adjusted Prevalence', Data_Value=None, Data_Value_Alt=None, Data_Value_Footnote_Symbol='*', Data_Value_Footnote='Data suppressed', Low_Confidence_Limit=None, High_Confidence_Limit=None, Number=None, WeightedNumber=None, StratificationCategory1='Disability Type', Stratification1='Self-care Disability', StratificationCategory2=None, Stratification2=None, CategoryID='DEMOG', IndicatorID='JOB', Geolocation=None, LocationID=11, ResponseID='JOB01', DataValueTypeID='AGEADJPREV', StratificationCategoryID1='DISTYPE', StratificationID1='SELFDIS', StratificationCategoryID2=None, StratificationID2=None)]

In [19]:
# show the first few rows
disability_df.show(5, truncate=False)

+-----------------------------------+----+------------+--------------------+----------+------------+-------------------------------------------------------+--------------------+---------------+-----------------------+----------+--------------+--------------------------+-------------------+--------------------+---------------------+------+--------------+-----------------------+-----------------------------+-----------------------+---------------+----------+-----------+-----------+----------+----------+---------------+-------------------------+-----------------+-------------------------+-----------------+
|Rowid                              |Year|LocationAbbr|LocationDesc        |DataSource|Category    |Indicator                                              |Response            |Data_Value_Unit|Data_Value_Type        |Data_Value|Data_Value_Alt|Data_Value_Footnote_Symbol|Data_Value_Footnote|Low_Confidence_Limit|High_Confidence_Limit|Number|WeightedNumber|StratificationCategory1|Stratifica

In [20]:
# print schema
disability_df.printSchema()

root
 |-- Rowid: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- LocationAbbr: string (nullable = true)
 |-- LocationDesc: string (nullable = true)
 |-- DataSource: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Indicator: string (nullable = true)
 |-- Response: string (nullable = true)
 |-- Data_Value_Unit: string (nullable = true)
 |-- Data_Value_Type: string (nullable = true)
 |-- Data_Value: double (nullable = true)
 |-- Data_Value_Alt: double (nullable = true)
 |-- Data_Value_Footnote_Symbol: string (nullable = true)
 |-- Data_Value_Footnote: string (nullable = true)
 |-- Low_Confidence_Limit: double (nullable = true)
 |-- High_Confidence_Limit: double (nullable = true)
 |-- Number: integer (nullable = true)
 |-- WeightedNumber: integer (nullable = true)
 |-- StratificationCategory1: string (nullable = true)
 |-- Stratification1: string (nullable = true)
 |-- StratificationCategory2: string (nullable = true)
 |-- Stratification2: string (n

In [21]:
# filter out rows with no Data_Value
df_filtered = disability_df.filter(disability_df["Data_Value"].isNotNull())

In [22]:
# select and rename the columns
disability_stats_df = df_filtered.select(
    col("Year").cast("int").alias("year"),
    col("LocationAbbr").alias("state"),
    col("LocationDesc").alias("state_name"),
    col("Category").alias("category"),
    col("Indicator").alias("indicator"),
    col("Response").alias("response"),
    col("Data_Value_Type").alias("data_value_type"),
    col("Data_Value").cast("float").alias("data_value"),
    col("Number").cast("float").alias("number"),
    col("WeightedNumber").cast("int").alias("weightednumber"),
    col("StratificationCategory1").alias("strat_category1"),
    col("Stratification1").alias("strat_value1"),
    col("StratificationCategory2").alias("strat_category2"),
    col("Stratification2").alias("strat_value2"),
)

# add the table to database
db.add_table('disability_stats',disability_stats_df)



disability_stats table written to PostgreSQL.


                                                                                

In [23]:
categories_df = df_filtered.select("Category").distinct().withColumnRenamed("Category", "category_name")

# add categories table to database
db.add_table('categories',categories_df)

[Stage 9:>                                                          (0 + 4) / 4]

categories table written to PostgreSQL.


                                                                                

In [24]:
indicators_df = df_filtered.select("Indicator").distinct().withColumnRenamed("Indicator", "indicator_name")

# add indicators table to database
db.add_table('indicators',indicators_df)



indicators table written to PostgreSQL.


                                                                                

In [25]:
stratifications_df = df_filtered.select(
    col("StratificationCategory1").alias("strat_category"),
    col("Stratification1").alias("strat_value")
).distinct()

# add stratifications table to database
db.add_table('stratifications',stratifications_df)



stratifications table written to PostgreSQL.


                                                                                

In [26]:
# check if disability table written to PostgreSQL
db.check_tables().show()

+----------------+
|      table_name|
+----------------+
|disability_stats|
|      categories|
|      indicators|
| stratifications|
+----------------+



In [27]:
# example query to database table
db.run_query("SELECT * FROM disability_stats LIMIT 100").show()

+----+-----+--------------+--------------------+--------------------+--------------+--------------------+----------+------+--------------+-----------------+--------------------+---------------+-------------------+
|year|state|    state_name|            category|           indicator|      response|     data_value_type|data_value|number|weightednumber|  strat_category1|        strat_value1|strat_category2|       strat_value2|
+----+-----+--------------+--------------------+--------------------+--------------+--------------------+----------+------+--------------+-----------------+--------------------+---------------+-------------------+
|2021| HHS6|  HHS Region 6|Health Risks & Be...|Binge drinking in...|            No|Age-adjusted Prev...|      88.3|1885.0|       1637613|  Disability Type|   Vision Disability|           NULL|               NULL|
|2021|   MA| Massachusetts|Health Risks & Be...|Body mass index c...|   Underweight|Age-adjusted Prev...|       2.0|  48.0|         33453|Disabi

## Income Data

In [28]:
# set file name and download URL
file_name = "Income_Data.csv"
set_file(file_name)

Income_Data.csv already exists. Skipping download.


In [29]:
# load the file with PySpark
income_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_name)

In [30]:
income_df.head()

Row(Year=2016, Stratification='All Households', Characteristic='All Households', Population_Thousands=126224, Median_Income=60309)

In [31]:
# print schema
income_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Stratification: string (nullable = true)
 |-- Characteristic: string (nullable = true)
 |-- Population_Thousands: integer (nullable = true)
 |-- Median_Income: integer (nullable = true)



In [32]:
# clean column names: remove spaces, dots, and convert to snake_case
for col in income_df.columns:
    income_df = income_df.withColumnRenamed(col, col.strip().lower().replace(" ", "_").replace(".", ""))

In [33]:
# add income table to database
db.add_table('income_data',income_df)

income_data table written to PostgreSQL.


In [34]:
# check if income table written to PostgreSQL
db.check_tables().show()

+----------------+
|      table_name|
+----------------+
|disability_stats|
|      categories|
|      indicators|
| stratifications|
|     income_data|
+----------------+



In [35]:
# example query to database table
db.run_query("SELECT * FROM income_data LIMIT 100").show()

+----+--------------------+--------------------+--------------------+-------------+
|year|      stratification|      characteristic|population_thousands|median_income|
+----+--------------------+--------------------+--------------------+-------------+
|2016|      All Households|      All Households|              126224|        60309|
|2016|   Type of Household|   Family households|               82827|        76676|
|2016|   Type of Household|     .Married-couple|               60804|        88929|
|2016|   Type of Household|.Female household...|               15572|        41909|
|2016|   Type of Household|.Male householder...|                6452|        59299|
|2016|   Type of Household|Nonfamily households|               43396|        36530|
|2016|   Type of Household| .Female householder|               22858|        31230|
|2016|   Type of Household|   .Male householder|               20539|        42647|
|2016|Race and Hispanic...|               White|               99400|       

## Health Data

In [36]:
# set file name and download URL
file_name = "Health_Data.csv"
set_file(file_name)

Health_Data.csv already exists. Skipping download.


In [37]:
# load the file with PySpark
health_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_name)

In [38]:
health_df.head()

Row(Year=2015, Metric='Adults Reporting Not Having a Personal Doctor, 2015', Demographic='Male', United States=27.1, Alabama=25.6, Alaska=43.8, Arizona=32.9, Arkansas=20.4, California=28.8, Colorado=30.8, Connecticut=20.1, Delaware=19.9, Dist. of Columbia=21.9, Florida=26.5, Georgia=35.2, Hawaii=18.3, Idaho=33.5, Illinois=22.4, Iowa=26.3, Indiana=25.8, Kansas=26.4, Kentucky=21.9, Louisiana=29.5, Maine=17.1, Maryland=19.0, Massachusetts=14.9, Michigan=20.2, Minnesota=30.2, Mississippi=28.9, Missouri=27.3, Montana=31.4, Nebraska=25.7, Nevada=39.7, New Hampshire=14.8, New Jersey=22.3, New Mexico=35.7, New York=22.7, North Carolina=28.9, North Dakota=35.3, Ohio=23.5, Oklahoma=32.0, Oregon=28.6, Pennsylvania=19.3, Rhode Island=16.6, South Carolina=27.8, South Dakota=29.1, Tennessee=28.8, Texas=37.8, Utah=31.9, Vermont=16.0, Virginia=26.1, Washington=29.8, West Virginia=27.5, Wisconsin=27.5, Wyoming=38.7)

In [39]:
# print schema
health_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Metric: string (nullable = true)
 |-- Demographic: string (nullable = true)
 |-- United States: double (nullable = true)
 |-- Alabama: double (nullable = true)
 |-- Alaska: double (nullable = true)
 |-- Arizona: double (nullable = true)
 |-- Arkansas: double (nullable = true)
 |-- California: double (nullable = true)
 |-- Colorado: double (nullable = true)
 |-- Connecticut: double (nullable = true)
 |-- Delaware: double (nullable = true)
 |-- Dist. of Columbia: double (nullable = true)
 |-- Florida: double (nullable = true)
 |-- Georgia: double (nullable = true)
 |-- Hawaii: double (nullable = true)
 |-- Idaho: double (nullable = true)
 |-- Illinois: double (nullable = true)
 |-- Iowa: double (nullable = true)
 |-- Indiana: double (nullable = true)
 |-- Kansas: double (nullable = true)
 |-- Kentucky: double (nullable = true)
 |-- Louisiana: double (nullable = true)
 |-- Maine: double (nullable = true)
 |-- Maryland: double (nullable = true

In [40]:
# get list of state columns (all except Year, Metric, Demographic)
id_cols = ["Year", "Metric", "Demographic"]
state_cols = [col for col in health_df.columns if col not in id_cols]

# build stack expression to unpivot
stack_expr = "stack({}, {}) as (state, value)".format(
    len(state_cols),
    ", ".join([f"'{s}', `{s}`" for s in state_cols])
)

# apply unpivot
health_long_df = health_df.selectExpr(*id_cols, stack_expr)

# show sample
health_long_df.show(5)

+----+--------------------+-----------+-------------+-----+
|Year|              Metric|Demographic|        state|value|
+----+--------------------+-----------+-------------+-----+
|2015|Adults Reporting ...|       Male|United States| 27.1|
|2015|Adults Reporting ...|       Male|      Alabama| 25.6|
|2015|Adults Reporting ...|       Male|       Alaska| 43.8|
|2015|Adults Reporting ...|       Male|      Arizona| 32.9|
|2015|Adults Reporting ...|       Male|     Arkansas| 20.4|
+----+--------------------+-----------+-------------+-----+
only showing top 5 rows



In [41]:
# clean column names
for col in health_long_df.columns:
    health_long_df = health_long_df.withColumnRenamed(col, col.strip().lower().replace(" ", "_"))

In [42]:
# add health table to database
db.add_table('health_data',health_long_df)

                                                                                

health_data table written to PostgreSQL.


In [43]:
# check if health table written to PostgreSQL
db.check_tables().show()

+----------------+
|      table_name|
+----------------+
|disability_stats|
|      categories|
|      indicators|
| stratifications|
|     income_data|
|     health_data|
+----------------+



In [44]:
# example query to database table
db.run_query("SELECT * FROM health_data LIMIT 100").show()

+----+--------------------+-----------+-----------------+-----+
|year|              metric|demographic|            state|value|
+----+--------------------+-----------+-----------------+-----+
|2015|Adults Reporting ...|       Male|    United States| 27.1|
|2015|Adults Reporting ...|       Male|          Alabama| 25.6|
|2015|Adults Reporting ...|       Male|           Alaska| 43.8|
|2015|Adults Reporting ...|       Male|          Arizona| 32.9|
|2015|Adults Reporting ...|       Male|         Arkansas| 20.4|
|2015|Adults Reporting ...|       Male|       California| 28.8|
|2015|Adults Reporting ...|       Male|         Colorado| 30.8|
|2015|Adults Reporting ...|       Male|      Connecticut| 20.1|
|2015|Adults Reporting ...|       Male|         Delaware| 19.9|
|2015|Adults Reporting ...|       Male|Dist. of Columbia| 21.9|
|2015|Adults Reporting ...|       Male|          Florida| 26.5|
|2015|Adults Reporting ...|       Male|          Georgia| 35.2|
|2015|Adults Reporting ...|       Male| 

## Education Data

In [45]:
# List to store all data rows
all_rows = []

# loop through years 2015–2022
for year in range(2015, 2023):
    url = f"https://api.census.gov/data/{year}/acs/acs5/subject?get=NAME,S1501_C02_014E&for=state:*"
    response = requests.get(url)

    if response.status_code == 200:
        json_data = response.json()
        headers = json_data[0]
        rows = json_data[1:]

        for row in rows:
            doc = {
                "year": year,
                "state": row[0],
                "state_code": row[2],
                "percent_bachelors_or_higher": float(row[1]) if row[1] not in ("", None) else None
            }
            all_rows.append(doc)
    else:
        print(f"Failed to fetch data for {year}: HTTP {response.status_code}")

In [46]:
# insert all rows into MongoDB
db_mongo.write_collection("education_data",all_rows)

Inserted 416 records into collection 'education_data'.


In [47]:
# check if education documents written to PostgreSQL
collections = db_mongo.list_collections()
print("Collections:", collections)

Collections: ['education_data']


In [48]:
# example query to database table
db_mongo.read_collection("education_data",one=True)

Fetched 1 records from collection 'education_data'.


[{'_id': ObjectId('68067ece25cebf77505d9dff'),
  'year': 2015,
  'state': 'Mississippi',
  'state_code': '28',
  'percent_bachelors_or_higher': 82.3}]

In [49]:
docs = db_mongo.read_collection("education_data",one=False)

# convert _id from ObjectId to string in ObjectId format
for doc in docs:
    doc['_id'] = str(doc['_id'])

Fetched 416 records from collection 'education_data'.


In [50]:
# convert docs to Spark DataFrame
education_df = spark.createDataFrame([Row(**doc) for doc in docs])

In [51]:
# Insert the DataFrame into PostgreSQL
db.add_table("education_data", education_df)

education_data table written to PostgreSQL.


In [52]:
# check if health table written to PostgreSQL
db.check_tables().show()

+----------------+
|      table_name|
+----------------+
|disability_stats|
|      categories|
|      indicators|
| stratifications|
|     income_data|
|     health_data|
|  education_data|
+----------------+



In [53]:
# example query to database table
db.run_query("SELECT * FROM education_data LIMIT 100").show()

+--------------------+----+--------------+----------+---------------------------+
|                 _id|year|         state|state_code|percent_bachelors_or_higher|
+--------------------+----+--------------+----------+---------------------------+
|68067ece25cebf775...|2015|   Mississippi|        28|                       82.3|
|68067ece25cebf775...|2015|      Missouri|        29|                       88.4|
|68067ece25cebf775...|2015|       Montana|        30|                       92.8|
|68067ece25cebf775...|2015|      Nebraska|        31|                       90.7|
|68067ece25cebf775...|2015|        Nevada|        32|                       85.1|
|68067ece25cebf775...|2015| New Hampshire|        33|                       92.3|
|68067ece25cebf775...|2015|    New Jersey|        34|                       88.6|
|68067ece25cebf775...|2015|    New Mexico|        35|                       84.2|
|68067ece25cebf775...|2015|      New York|        36|                       85.6|
|68067ece25cebf7