#### 4.2 Create the county dimension table
- Create county dimension table from Covid-19 data as a start
- Augment county table with health data
- Augment with area and population density (using health data and area data)
- Use state abbreviation as foreign key for state table (which is created afterwards)
- Partition by state

##### Setup
I'll need Spark here for some of its functionality, such as the ability to create temporary SQL views of my dataframes.

In [1]:
from setup import create_spark_session

spark = create_spark_session()

Imports and output paths:

In [2]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *

from clean import *
from etl import *

# Write locally for now; later on this might go to S3 instead
output_path = "output/"

Let's first load in the cleaned Covid data and inspect the schema:

In [3]:
covid_cases_df = load_covid_case_data(spark)

In [4]:
covid_cases_df.printSchema()

root
 |-- fips: integer (nullable = true)
 |-- county_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- 1/22/20: integer (nullable = true)
 |-- 1/23/20: integer (nullable = true)
 |-- 1/24/20: integer (nullable = true)
 |-- 1/25/20: integer (nullable = true)
 |-- 1/26/20: integer (nullable = true)
 |-- 1/27/20: integer (nullable = true)
 |-- 1/28/20: integer (nullable = true)
 |-- 1/29/20: integer (nullable = true)
 |-- 1/30/20: integer (nullable = true)
 |-- 1/31/20: integer (nullable = true)
 |-- 2/1/20: integer (nullable = true)
 |-- 2/2/20: integer (nullable = true)
 |-- 2/3/20: integer (nullable = true)
 |-- 2/4/20: integer (nullable = true)
 |-- 2/5/20: integer (nullable = true)
 |-- 2/6/20: integer (nullable = true)
 |-- 2/7/20: integer (nullable = true)
 |-- 2/8/20: integer (nullable = true)
 |-- 2/9/20: integer (nullable = true)
 |-- 2/10/20: integer (nullable = true)
 |-- 2

Alright, that looks as expected, so let's select four of the first five columns (all but `state`, which gets picked up from the health data in the join further down) to form the base of the county dimension table. The date columns will become the county fact table later on.

In [5]:
county_columns = ["fips", "county_name", "latitude", "longitude"]
county_dim_df = covid_cases_df[county_columns]
county_dim_df.limit(5).show()

+----+-----------+-----------+------------------+
|fips|county_name|   latitude|         longitude|
+----+-----------+-----------+------------------+
|1001|    Autauga|32.53952745|      -86.64408227|
|1003|    Baldwin|30.72774991|      -87.72207058|
|1005|    Barbour|  31.868263|       -85.3871286|
|1007|       Bibb|32.99642064|-87.12511459999996|
|1009|     Blount|33.98210918|      -86.56790593|
+----+-----------+-----------+------------------+



In [6]:
county_dim_df.count()

3142

Next, let's load the health data and join it with the county data using the FIPS code. This will be an inner join, so we might lose some counties that aren't in both data sets; we will still have ~3000 counties, so I'm alright with that.

In [7]:
health_df = load_health_data(spark)

In [8]:
health_df.printSchema()

root
 |-- fips: integer (nullable = true)
 |-- county_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- poor_health: double (nullable = true)
 |-- smokers: double (nullable = true)
 |-- obesity: double (nullable = true)
 |-- physical_inactivity: double (nullable = true)
 |-- excessive_drinking: double (nullable = true)
 |-- uninsured: double (nullable = true)
 |-- physicians: double (nullable = true)
 |-- unemployment: double (nullable = true)
 |-- air_pollution: double (nullable = true)
 |-- housing_problems: double (nullable = true)
 |-- household_overcrowding: double (nullable = true)
 |-- food_insecurity: double (nullable = true)
 |-- residential_segregation: double (nullable = true)
 |-- over_sixtyfives: double (nullable = true)
 |-- rural: double (nullable = true)



In [9]:
health_df.limit(5).show()

+----+--------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+
|fips|   county_name|state|population| poor_health|     smokers|obesity|physical_inactivity|excessive_drinking|   uninsured| physicians|unemployment|air_pollution|housing_problems|household_overcrowding|food_insecurity|residential_segregation|over_sixtyfives|       rural|
+----+--------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+
|   0| United States|   US| 327167434|0.1719867644|0.1708001743|   0.29|              0.233|      0.1897709024|0.1022344603|7.546654E-4|0.0389533902|          8.6|    0.1791360885| 

In [10]:
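# Rename the health data's county_name so it doesn't clash with the one already in county_dim_df
health_df = health_df.withColumnRenamed('county_name', 'cn')
# Inner join on fips, then drop the redundant renamed column
county_dim_df = county_dim_df.join(health_df, on=["fips"], how="inner").drop('cn')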
county_dim_df.limit(5).show()

+----+-----------+-----------+------------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+
|fips|county_name|   latitude|         longitude|state|population| poor_health|     smokers|obesity|physical_inactivity|excessive_drinking|   uninsured| physicians|unemployment|air_pollution|housing_problems|household_overcrowding|food_insecurity|residential_segregation|over_sixtyfives|       rural|
+----+-----------+-----------+------------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+
|1001|    Autauga|32.53952745|      -86.64408227|   AL|     55601|0.2088298733|0.1808155718|  0.3

In [11]:
county_dim_df.printSchema()

root
 |-- fips: integer (nullable = true)
 |-- county_name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- poor_health: double (nullable = true)
 |-- smokers: double (nullable = true)
 |-- obesity: double (nullable = true)
 |-- physical_inactivity: double (nullable = true)
 |-- excessive_drinking: double (nullable = true)
 |-- uninsured: double (nullable = true)
 |-- physicians: double (nullable = true)
 |-- unemployment: double (nullable = true)
 |-- air_pollution: double (nullable = true)
 |-- housing_problems: double (nullable = true)
 |-- household_overcrowding: double (nullable = true)
 |-- food_insecurity: double (nullable = true)
 |-- residential_segregation: double (nullable = true)
 |-- over_sixtyfives: double (nullable = true)
 |-- rural: double (nullable = true)



In [12]:
county_dim_df.count()

3142

That's... kinda weird, but I'll take it. The count is still 3142, so it looks like we didn't lose any counties at all with this join.

Next up, let's augment it with our area data.

In [13]:
county_area_df = load_area_data(spark)

In [14]:
county_area_df.printSchema()

root
 |-- fips: integer (nullable = true)
 |-- area: double (nullable = true)



In [15]:
county_area_df.limit(5).show()

+----+-------+
|fips|   area|
+----+-------+
|1001|594.436|
|1009|644.776|
|1017|596.531|
|1021|692.854|
|1033|592.619|
+----+-------+



In [16]:
county_dim_df = county_dim_df.join(county_area_df, on=["fips"], how="inner")
county_dim_df.limit(5).show()

+----+-----------+-----------+------------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+--------+
|fips|county_name|   latitude|         longitude|state|population| poor_health|     smokers|obesity|physical_inactivity|excessive_drinking|   uninsured| physicians|unemployment|air_pollution|housing_problems|household_overcrowding|food_insecurity|residential_segregation|over_sixtyfives|       rural|    area|
+----+-----------+-----------+------------------+-----+----------+------------+------------+-------+-------------------+------------------+------------+-----------+------------+-------------+----------------+----------------------+---------------+-----------------------+---------------+------------+--------+
|1001|    Autauga|32.53952745|      -86.64408227|   AL|     55601|0.20

In [17]:
county_dim_df.count()

3140

Looks like we lost two counties. Honestly, that's pretty great for three data sets of 3000+ rows.

Now that we have columns for both the population and the area, let's combine these to calculate the population density:

In [18]:
county_dim_df = county_dim_df.withColumn('population_density', county_dim_df['population'] / county_dim_df['area'])
county_dim_df[['county_name', 'population', 'area', 'population_density']].limit(5).show()

+-----------+----------+--------+------------------+
|county_name|population|    area|population_density|
+-----------+----------+--------+------------------+
|    Autauga|     55601| 594.436| 93.53572125510567|
|    Baldwin|    218022|1589.784|137.13938497305293|
|    Barbour|     24881| 884.876|28.118063999927674|
|       Bibb|     22400| 622.582| 35.97919631470232|
|     Blount|     57840| 644.776| 89.70557216769856|
+-----------+----------+--------+------------------+



In [19]:
county_dim_df.agg({'population_density': 'min'}).show()

+-----------------------+
|min(population_density)|
+-----------------------+
|     0.0366104788482254|
+-----------------------+



In [20]:
county_dim_df.agg({'population_density': 'max'}).show()

+-----------------------+
|max(population_density)|
+-----------------------+
|      71343.51044723816|
+-----------------------+



Quick data quality check: the min and max values look reasonable (not negative, not infinite).

Now we're ready to write out the dimension table to parquet.

In [21]:
county_dim_df.write.partitionBy('state').mode('overwrite').parquet(output_path + "county_dim.parquet")