# Predicting the Effects of COVID-19 Restrictions Using Historical Traffic Data
## By: Andrew LaFortune, May 6th 2022

### Scenario
Two years into the COVID-19 pandemic and starting to see the light at the end of the tunnel, the Minnesota Governor's Office has started to reflect on the actions they took early on in the pandemic. They have asked LFDS (LaFortune Data Solutions) to find a way to numerically evaluate the effectiveness of stay-at-home and reopening policies not just in terms of the number of COVID cases, but in other less obvious numbers that might be found in other datasets online. These other data points are up to LFDS to find and examine. The final product to be delivered is a model that might be used in future pandemic situations to determine which restrictions and relaxations on businesses to deploy and in what time frame to minimize the spread of disease. 

### Approach
The primary dataset that will bridge the gap between the Governor's policies and COVID-19 cases is the Minnesota Department of Transportation's (MNDOT) Automatic Traffic Recorder (ATR) and Weigh in Motion (WIM) [Hourly Volume Data](http://www.dot.state.mn.us/traffic/data/data-products.html). Some work has already been done with this data to visualize the percent difference in traffic volume from the average daily traffic volume across all days 2016-2019 ([PDF Download](https://edocs-public.dot.state.mn.us/edocs_public/DMResultSet/download?docId=12227832)). LFDS believes this is a good first step, but would like to produce a model that adjusts for nuances in typical daily or seasonal traffic patterns. This goal is summarized by Deliverable 1:
> Deliverable 1: Graph Daily Volume Change for each MNDOT District and statewide compared to daily baseline.
__Note:__ The 2016 volume data is no longer available, so the baseline to compare will be from 2017-2019

The goal of this report is to see how key policy decisions affected COVID cases. To see this, LFDS gathered several COVID case count datasets from the [New York Times COVID-19 Data GitHub](https://github.com/nytimes/covid-19-data). These datasets provide counts from January 21st, 2020 to the present day (May 3rd at the latest update of this notebook) at country-wide, state-wide, and county-wide levels. Compiling these into graphs for comparison with the traffic volume data will be the goal of Deliverable 2:
> Deliverable 2: Graph Daily COVID-19 case and death counts in Minnesota.

These graphs will likely show some clear trends in traffic and COVID cases that may have some visual relation and indicate when major policy changes occured. To verify these instances and work towards a model for gauging effectiveness of policy changes, the events from BallotPedia's [Documenting Minnesota's path to recovery from the coronavirus (COVID-19) pandemic, 2020-2021"](https://ballotpedia.org/Documenting_Minnesota%27s_path_to_recovery_from_the_coronavirus_(COVID-19)_pandemic,_2020-2021) article have been compiled into a spreadsheet with dates and classifications for each notable event. The events can be added to the plots of traffic volume and COVID-19 cases to paint a clearer picture of what happened immediately following policy changes for Deliverable 3:
> Deliverable 3: Deliverable 1 and 2 graphs with vertical lines indicating when policy changes went into effect.

Deliverables 1, 2, and 3 will give some idea of __what happened__. The last piece of this project is to condense that data into a useful model that can be used to predict __what will happen__ if similar restriction/relaxation measures are taken in a future pandemic situation. LFDS has decided that the best model for the job is a Time Series Forecasing model that takes as inputs the proportion of expected traffic volume for 30 days prior to an event predict the target variable of daily traffic volume percent difference from baseline for next 10 days.

Deliverable 4 will consist of testing the Facebook Prophet time series prediction model for this task with the goal of minimizing the error between model prediction and real value for traffic volume percent difference. This will include the test scenarios:
- a baseline prediction on a non-COVID year
- a prediction immediately following each type of COVID policy change

Based on the results of these scenarios, a recommendation to use Facebook's Prophet model or to look for other options:
> Deliverable 4: A recommendation of Time Series Forecasting model, or a recommendation to look for other prediction methods.

In [0]:
# imports
from pyspark.sql.types import *
from pyspark.sql.functions import mean, lit, min as colmin, max as colmax, sum as colsum, col, when, count, datediff, lag, countDistinct, to_date, regexp_replace, udf, date_format,desc
from pyspark.sql.window import Window
from operator import add
from functools import reduce

## Data Loading

### Available Data Sources

In [0]:
%sh
rm -r /dbfs/lafor038;
rm -r lafor038;

rm: cannot remove '/dbfs/lafor038': No such file or directory
rm: cannot remove 'lafor038': No such file or directory


In [0]:
%sh
mkdir lafor038;
cd lafor038;
pwd;
mkdir wim_volume;
cd wim_volume;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2017_wim_atr_volume.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2018_wim_atr_volume.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2019_wim_atr_volume.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2020_wim_atr_volume.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2021_wim_atr_volume.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/2022_wim_atr_volume.csv;

/databricks/driver/lafor038
--2022-05-08 18:18:35--  https://github.com/andrew-lafortune/SparkData/raw/main/data/2017_wim_atr_volume.csv
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/2017_wim_atr_volume.csv [following]
--2022-05-08 18:18:36--  https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/2017_wim_atr_volume.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5117525 (4.9M) [text/plain]
Saving to: ‘2017_wim_atr_volume.csv’

     0K .......... .......... .......... .......... ..........  1% 6.15M 1s
    50K .......... .......... 

In [0]:
%sh
cd lafor038;
mkdir covid_cases;
cd covid_cases;
wget https://github.com/nytimes/covid-19-data/raw/master/us-states.csv;
wget https://github.com/nytimes/covid-19-data/raw/master/us.csv;
mkdir us_counties;
cd us_counties;
wget https://github.com/nytimes/covid-19-data/raw/master/us-counties-2020.csv;
wget https://github.com/nytimes/covid-19-data/raw/master/us-counties-2021.csv;
wget https://github.com/nytimes/covid-19-data/raw/master/us-counties-2022.csv;


--2022-05-08 18:18:45--  https://github.com/nytimes/covid-19-data/raw/master/us-states.csv
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv [following]
--2022-05-08 18:18:45--  https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1555167 (1.5M) [text/plain]
Saving to: ‘us-states.csv’

     0K .......... .......... .......... .......... ..........  3%  819K 2s
    50K .......... .......... .......... .......... ..........  6% 1003K 2s
   100K .......... .......... .......... .....

In [0]:
%sh
cd lafor038;
mkdir stations_counties_districts;
cd stations_counties_districts;

wget https://github.com/andrew-lafortune/SparkData/raw/main/data/Current_CC_StationList.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/Retired_CC_StationList.csv;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/MN_Counties_Districts.csv;

--2022-05-08 18:18:52--  https://github.com/andrew-lafortune/SparkData/raw/main/data/Current_CC_StationList.csv
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/Current_CC_StationList.csv [following]
--2022-05-08 18:18:52--  https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/Current_CC_StationList.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12371 (12K) [text/plain]
Saving to: ‘Current_CC_StationList.csv’

     0K .......... ..                                         100% 36.3M=0s

2022-05-08 18:18:52 (36.3 MB/s) - ‘Current_CC_St

In [0]:
%sh
cd lafor038;
wget https://github.com/andrew-lafortune/SparkData/raw/main/data/MN_COVID_Timeline.csv;

--2022-05-08 18:18:54--  https://github.com/andrew-lafortune/SparkData/raw/main/data/MN_COVID_Timeline.csv
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/MN_COVID_Timeline.csv [following]
--2022-05-08 18:18:54--  https://raw.githubusercontent.com/andrew-lafortune/SparkData/main/data/MN_COVID_Timeline.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4517 (4.4K) [text/plain]
Saving to: ‘MN_COVID_Timeline.csv’

     0K ....                                                  100% 17.9M=0s

2022-05-08 18:18:54 (17.9 MB/s) - ‘MN_COVID_Timeline.csv’ saved [451

In [0]:
%sh ls lafor038

MN_COVID_Timeline.csv
covid_cases
stations_counties_districts
wim_volume


In [0]:
%sh cp -r lafor038 /dbfs/lafor038

In [0]:
%fs ls file:/dbfs/lafor038

path,name,size,modificationTime
file:/dbfs/lafor038/wim_volume/,wim_volume/,4096,1652033934751
file:/dbfs/lafor038/covid_cases/,covid_cases/,4096,1652033934755
file:/dbfs/lafor038/stations_counties_districts/,stations_counties_districts/,4096,1652033934895
file:/dbfs/lafor038/MN_COVID_Timeline.csv,MN_COVID_Timeline.csv,4517,1652033934895


### WIM ATR Volume Data

In [0]:
%sh ls /dbfs/lafor038/wim_volume

2017_wim_atr_volume.csv
2018_wim_atr_volume.csv
2019_wim_atr_volume.csv
2020_wim_atr_volume.csv
2021_wim_atr_volume.csv
2022_wim_atr_volume.csv


#### Define Schema
Columns identify station, direction, and lane of observation

Each Numbered column represents an hour of the day (e.g. 1 = 1AM)

In [0]:
wim_volume_schema = StructType([StructField('station_id',IntegerType(),True),
                               StructField('dir_of_travel',IntegerType(),True),
                               StructField('lane_of_travel',IntegerType(),True),
                               StructField('date',StringType(),True),
                               StructField('1',IntegerType(),True),
                               StructField('2',IntegerType(),True),
                               StructField('3',IntegerType(),True),
                               StructField('4',IntegerType(),True),
                               StructField('5',IntegerType(),True),
                               StructField('6',IntegerType(),True),
                               StructField('7',IntegerType(),True),
                               StructField('8',IntegerType(),True),
                               StructField('9',IntegerType(),True),
                               StructField('10',IntegerType(),True),
                               StructField('11',IntegerType(),True),
                               StructField('12',IntegerType(),True),
                               StructField('13',IntegerType(),True),
                               StructField('14',IntegerType(),True),
                               StructField('15',IntegerType(),True),
                               StructField('16',IntegerType(),True),
                               StructField('17',IntegerType(),True),
                               StructField('18',IntegerType(),True),
                               StructField('19',IntegerType(),True),
                               StructField('20',IntegerType(),True),
                               StructField('21',IntegerType(),True),
                               StructField('22',IntegerType(),True),
                               StructField('23',IntegerType(),True),
                               StructField('24',IntegerType(),True)])

#### Load Each File and Save as Parquet Table

In [0]:
%fs ls file:/dbfs/lafor038

path,name,size,modificationTime
file:/dbfs/lafor038/wim_volume/,wim_volume/,4096,1652033934751
file:/dbfs/lafor038/covid_cases/,covid_cases/,4096,1652033934755
file:/dbfs/lafor038/stations_counties_districts/,stations_counties_districts/,4096,1652033934895
file:/dbfs/lafor038/MN_COVID_Timeline.csv,MN_COVID_Timeline.csv,4517,1652033934895


In [0]:
file_dir = "file:/dbfs/lafor038/wim_volume/"
file_type = "csv"
f_name_suffix = "_wim_atr_volume.csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

wim_volume = {}

for year in range(2017,2023):
    file_location = file_dir + str(year) + f_name_suffix
    wim_volume[year] = spark.read.format(file_type) \
      .schema(wim_volume_schema) \
      .option("header", first_row_is_header) \
      .option("sep", delimiter) \
      .load(file_location)
    
    table_name = str(year)+"_wim_atr_volume"

    wim_volume[year].createOrReplaceTempView(table_name)

### COVID Case Data

In [0]:
%sh ls lafor038/covid_cases

us-states.csv
us.csv
us_counties


In [0]:
%sh ls lafor038/covid_cases/us_counties

us-counties-2020.csv
us-counties-2021.csv
us-counties-2022.csv


In [0]:
#define schemas for each level of case and death data
us_covid_schema = StructType([StructField('date',StringType(),True),
                             StructField('cases',IntegerType(),True),
                             StructField('deaths',IntegerType(),True)])

states_covid_schema = StructType([StructField('date',StringType(),True),
                                  StructField('state',StringType(),True),
                                  StructField('fips',IntegerType(),True),
                                  StructField('cases',IntegerType(),True),
                                  StructField('deaths',IntegerType(),True)])

counties_covid_schema = StructType([StructField('date',StringType(),True),
                                    StructField('county',StringType(),True),
                                    StructField('state',StringType(),True),
                                    StructField('fips',IntegerType(),True),
                                    StructField('cases',IntegerType(),True),
                                    StructField('deaths',IntegerType(),True)])

#file location and type
covid_dir = "file:/dbfs/lafor038/covid_cases/"
file_type = "csv"

# CSV options
first_row_is_header = "true"
delimiter = ","

# read files for country and state wide case counts and save to Parquet
us_cases = spark.read.format(file_type).schema(us_covid_schema).option("header",first_row_is_header).option("sep",delimiter).load(covid_dir+"us.csv")
state_cases = spark.read.format(file_type).schema(states_covid_schema).option("header",first_row_is_header).option("sep",delimiter).load(covid_dir+"us-states.csv")

us_cases.createOrReplaceTempView("us_covid_cases")
state_cases.createOrReplaceTempView("state_covid_cases")                                                             
                                                             
# read and write for each year of county data                                                             
counties = {}                                                             
for year in range(2020,2023):    
    file_location = covid_dir+"us_counties/us-counties-"+str(year)+".csv"                                                            
    counties[year] = spark.read.format(file_type) \
      .schema(counties_covid_schema) \
      .option("header", first_row_is_header) \
      .option("sep", delimiter) \
      .load(file_location)
    
    table_name = "county_cases_"+str(year)                                                             
    counties[year].createOrReplaceTempView(table_name)   

### Station Info
Each station has columns indicating location, type, number of lanes, etc. Most important is the "County Name" column which can be used to connect WIM recordings to specific counties for comparison with COVID case counts.

The MN_Counties_Districts data connects county names to district numbers to mimic the graphs from MNDOT which are being recreated for Deliverable 1

In [0]:
%sh ls lafor038/stations_counties_districts

Current_CC_StationList.csv
MN_Counties_Districts.csv
Retired_CC_StationList.csv


In [0]:
current_station_schema = StructType([StructField("Continuous Number",IntegerType(),True),
                             StructField("Sequence Number",IntegerType(),True),
                             StructField("Collection Type",StringType(),True),
                             StructField("Route",StringType(),True),
                             StructField("Pos Dir Dir",StringType(),True),
                             StructField("Pos Lanes",IntegerType(),True),
                             StructField("Neg Lanes",IntegerType(),True),
                             StructField("Urban/Rural",StringType(),True),
                             StructField("Functional Class",StringType(),True),
                             StructField("County Name",StringType(),True),
                             StructField("Location Text",StringType(),True)])

retired_station_schema = StructType([StructField("Continuous Number",IntegerType(),True),
                                     StructField("Collection Type",StringType(),True),
                                     StructField("Measurement",StringType(),True),
                                     StructField("Route",StringType(),True),
                                     StructField("Pos Lanes",IntegerType(),True),
                                     StructField("Pos Dir Dir",StringType(),True),
                                     StructField("Ref Post",StringType(),True),
                                     StructField("True Mile",FloatType(),True),
                                     StructField("Location",StringType(),True),
                                     StructField("County",StringType(),True),
                                     StructField("Closest City",StringType(),True),
                                     StructField("Status",StringType(),True)])

county_district_schema = StructType([StructField("County",StringType(),True),
                                    StructField("MnDOT district(s)",StringType(),True)])

In [0]:
file_location = "file:/dbfs/lafor038/stations_counties_districts/"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

current_stations = spark.read.format(file_type).schema(current_station_schema).option("header",first_row_is_header).option("sep",delimiter).load(file_location+"Current_CC_StationList.csv")
retired_stations = spark.read.format(file_type).schema(retired_station_schema).option("header",first_row_is_header).option("sep",delimiter).load(file_location+"Retired_CC_StationList.csv")
county_district = spark.read.format(file_type).schema(county_district_schema).option("header",first_row_is_header).option("sep",delimiter).load(file_location+"MN_Counties_Districts.csv")

current_stations.createOrReplaceTempView("current_stations")
retired_stations.createOrReplaceTempView("retired_stations")
county_district.createOrReplaceTempView("county_district")

## COVID Timeline
This data was made by hand based on data in an article from [BallotPedia](https://ballotpedia.org/Documenting_Minnesota%27s_path_to_recovery_from_the_coronavirus_%28COVID-19%29_pandemic,_2020-2021). It classifies each significant policy event in Minnesota throughout the pandemic by date, type, and category of society affected to be aligned with traffic volume and COVID data later on.

In [0]:
timeline_schema = StructType([StructField("Date",StringType(),True),
                              StructField("Category",StringType(),True),
                              StructField("Order Type",StringType(),True),
                              StructField("Event Type",StringType(),True),
                              StructField("Description",StringType(),True)])

file_location = "file:/dbfs/lafor038/"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

timeline = spark.read.format(file_type).schema(timeline_schema).option("header",first_row_is_header).option("sep",delimiter).load(file_location+"MN_COVID_Timeline.csv")
timeline.createOrReplaceTempView("MN_COVID_Timeline")

In [0]:
%sql
show views;

namespace,viewName,isTemporary
,2017_wim_atr_volume,True
,2018_wim_atr_volume,True
,2019_wim_atr_volume,True
,2020_wim_atr_volume,True
,2021_wim_atr_volume,True
,2022_wim_atr_volume,True
,county_cases_2020,True
,county_cases_2021,True
,county_cases_2022,True
,county_district,True


### Data Analysis (exploration)

### Timeline
This data was made by hand for this project, so the contents shouldn't have anything unexpected. Exploration will consist of checking the different categories available.

In [0]:
timeline.groupBy("Category", "Order Type", "Event Type").count().orderBy(desc("count")).display()

Category,Order Type,Event Type,count
Business,Relaxation,Effective,8
Business,Restriction,Effective,5
Emergency,Policy,Effective,4
Business,Restriction,Announcement,3
Business,Relaxation,Announcement,3
Vaccination,Relaxation,Effective,3
Emergency,Policy,Extension,3
Business,Policy,Effective,2
Education,Restriction,Extension,2
Education,Relaxation,Announcement,2


There are 23 different groupings of Category, Order Type, and Event Type with Business regulations being the most common by far. The most useful points in the policy timeline will probably be when Business Relaxations or Restrictions go into effect.

#### COVID Data
Test for: 
- Completeness: no null values, complete time series
- Uniqueness: how many unique values for each dataset?
- Validity: does the data make sense? (e.g. no negative case counts)

In [0]:
us_cases.limit(100).display()

date,cases,deaths
2020-01-21,1,0
2020-01-22,1,0
2020-01-23,1,0
2020-01-24,2,0
2020-01-25,3,0
2020-01-26,5,0
2020-01-27,5,0
2020-01-28,5,0
2020-01-29,5,0
2020-01-30,6,0


Looks like the counts per date are a running total.

In [0]:
us_cases.select([count(when(col(c).isNotNull() , c)).alias("notNull"+c.capitalize()) for c in us_cases.columns]
   ).withColumn("expected",lit(us_cases.count())).show()

us_cases.select([count(when(col(c) >= 0 , c)).alias("pos"+c.capitalize()) for c in ["cases","deaths"]]
   ).withColumn("expected",lit(us_cases.count())).show()

us_cases.select(mean((us_cases["deaths"] <= us_cases["cases"]).cast("int")).alias("isValidDeathCount")).show()

windowSpec = Window.orderBy("date")
us_cases.orderBy("date") \
        .withColumn("date_diff",datediff(us_cases.date,lag("date",1).over(windowSpec))) \
        .select(mean("date_diff").alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()

us_cases.select("date").distinct().count() == us_cases.count()

+-----------+------------+-------------+--------+
|notNullDate|notNullCases|notNullDeaths|expected|
+-----------+------------+-------------+--------+
|        838|         838|          838|     838|
+-----------+------------+-------------+--------+

+--------+---------+--------+
|posCases|posDeaths|expected|
+--------+---------+--------+
|     838|      838|     838|
+--------+---------+--------+

+-----------------+
|isValidDeathCount|
+-----------------+
|              1.0|
+-----------------+

+---------------+--------+
|isValidTimeStep|expected|
+---------------+--------+
|            1.0|     1.0|
+---------------+--------+

Out[21]: True

In [0]:
state_cases.limit(100).display()
state_cases.filter(state_cases["state"] == "Washington").limit(100).display()

date,state,fips,cases,deaths
2020-01-21,Washington,53,1,0
2020-01-22,Washington,53,1,0
2020-01-23,Washington,53,1,0
2020-01-24,Illinois,17,1,0
2020-01-24,Washington,53,1,0
2020-01-25,California,6,1,0
2020-01-25,Illinois,17,1,0
2020-01-25,Washington,53,1,0
2020-01-26,Arizona,4,1,0
2020-01-26,California,6,2,0


date,state,fips,cases,deaths
2020-01-21,Washington,53,1,0
2020-01-22,Washington,53,1,0
2020-01-23,Washington,53,1,0
2020-01-24,Washington,53,1,0
2020-01-25,Washington,53,1,0
2020-01-26,Washington,53,1,0
2020-01-27,Washington,53,1,0
2020-01-28,Washington,53,1,0
2020-01-29,Washington,53,1,0
2020-01-30,Washington,53,1,0


Again, running total of cases and deaths.

In [0]:
state_cases.select([count(when(col(c).isNotNull() , c)).alias("notNull"+c.capitalize()) for c in state_cases.columns]
   ).withColumn("expected",lit(state_cases.count())).show()

state_cases.select([count(when(col(c) >= 0 , c)).alias("pos"+c.capitalize()) for c in ["cases","deaths"]]
   ).withColumn("expected",lit(state_cases.count())).show()

state_cases.select(mean((state_cases["deaths"] <= state_cases["cases"]).cast("int")).alias("isValidDeathCount")).show()

windowSpec = Window.partitionBy("state").orderBy("date")
state_cases.orderBy("date") \
        .withColumn("date_diff",datediff(state_cases.date,lag("date",1).over(windowSpec))) \
        .select(mean("date_diff").alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()

state_cases.groupBy("state").agg(countDistinct("date").alias("distinct"),count(lit(1)).alias("expected")) \
           .withColumn("isUniqueDates",(col("distinct") == col("expected")).cast("int")).select(mean("isUniqueDates").alias("pctUnique")).withColumn("expected",lit(1.0)).show()

+-----------+------------+-----------+------------+-------------+--------+
|notNullDate|notNullState|notNullFips|notNullCases|notNullDeaths|expected|
+-----------+------------+-----------+------------+-------------+--------+
|      44022|       44022|      44022|       44022|        44022|   44022|
+-----------+------------+-----------+------------+-------------+--------+

+--------+---------+--------+
|posCases|posDeaths|expected|
+--------+---------+--------+
|   44022|    44022|   44022|
+--------+---------+--------+

+-----------------+
|isValidDeathCount|
+-----------------+
|              1.0|
+-----------------+

+---------------+--------+
|isValidTimeStep|expected|
+---------------+--------+
|            1.0|     1.0|
+---------------+--------+

+---------+--------+
|pctUnique|expected|
+---------+--------+
|      1.0|     1.0|
+---------+--------+



For states, check how many are accounted.

In [0]:
state_cases.select("state").distinct().orderBy("state").display()

state
Alabama
Alaska
American Samoa
Arizona
Arkansas
California
Colorado
Connecticut
Delaware
District of Columbia


Looks like all 50 states are there (verified by singing the "50 Nifty" song from elementary school) plus 6 territories like American Samoa and Puerto Rico. Leave those in for now.

In [0]:
counties[2020].filter(col("county") == lit("Hennepin")).limit(20).show()

+----------+--------+---------+-----+-----+------+
|      date|  county|    state| fips|cases|deaths|
+----------+--------+---------+-----+-----+------+
|2020-03-12|Hennepin|Minnesota|27053|    1|     0|
|2020-03-13|Hennepin|Minnesota|27053|    3|     0|
|2020-03-14|Hennepin|Minnesota|27053|    6|     0|
|2020-03-15|Hennepin|Minnesota|27053|   12|     0|
|2020-03-16|Hennepin|Minnesota|27053|   26|     0|
|2020-03-17|Hennepin|Minnesota|27053|   26|     0|
|2020-03-18|Hennepin|Minnesota|27053|   33|     0|
|2020-03-19|Hennepin|Minnesota|27053|   33|     0|
|2020-03-20|Hennepin|Minnesota|27053|   44|     0|
|2020-03-21|Hennepin|Minnesota|27053|   52|     0|
|2020-03-22|Hennepin|Minnesota|27053|   57|     0|
|2020-03-23|Hennepin|Minnesota|27053|   89|     0|
|2020-03-24|Hennepin|Minnesota|27053|  103|     0|
|2020-03-25|Hennepin|Minnesota|27053|  111|     0|
|2020-03-26|Hennepin|Minnesota|27053|  128|     0|
|2020-03-27|Hennepin|Minnesota|27053|  141|     1|
|2020-03-28|Hennepin|Minnesota|

In [0]:
for year in counties:
    df = counties[year]
    print(year)
    df.select([count(when(col(c).isNotNull() , c)).alias("notNull"+c.capitalize()) for c in df.columns]
       ).withColumn("expected",lit(df.count())).show()

    df.select([count(when(col(c) >= 0 , c)).alias("pos"+c.capitalize()) for c in ["cases","deaths"]]
       ).withColumn("expected",lit(df.count())).show()

    df.select(mean((df["deaths"] <= df["cases"]).cast("int")).alias("isValidDeathCount")).show()

    windowSpec = Window.partitionBy("county").orderBy("date")
    df.orderBy("date") \
            .withColumn("date_diff",datediff(df.date,lag("date",1).over(windowSpec))) \
            .select(mean((col("date_diff") == lit(1)).cast("int")).alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()
    
    df.groupBy("state","county").agg(countDistinct("date").alias("distinct"),count(lit(1)).alias("expected")) \
           .withColumn("isUniqueDates",(col("distinct") == col("expected")).cast("int")).select(mean("isUniqueDates").alias("pctUnique")).withColumn("expected",lit(1.0)).show()
    break

2020
+-----------+-------------+------------+-----------+------------+-------------+--------+
|notNullDate|notNullCounty|notNullState|notNullFips|notNullCases|notNullDeaths|expected|
+-----------+-------------+------------+-----------+------------+-------------+--------+
|     884737|       884737|      884737|     876471|      884737|       865976|  884737|
+-----------+-------------+------------+-----------+------------+-------------+--------+

+--------+---------+--------+
|posCases|posDeaths|expected|
+--------+---------+--------+
|  884737|   865976|  884737|
+--------+---------+--------+

+------------------+
| isValidDeathCount|
+------------------+
|0.9990854250002309|
+------------------+

+-----------------+--------+
|  isValidTimeStep|expected|
+-----------------+--------+
|0.597892857668777|     1.0|
+-----------------+--------+

+---------+--------+
|pctUnique|expected|
+---------+--------+
|      1.0|     1.0|
+---------+--------+



Null counts in fips and deaths don't match the expected. Missing fips is ok since that column won't be used, but the missing death counts need to be identified.

In [0]:
for year in counties:
    print(year)
    df = counties[year]
    df.filter(df["deaths"].isNull()).limit(10).show()
    df.filter(df["deaths"].isNull()).select(df["state"]).distinct().show()
    

2020
+----------+------------+-----------+-----+-----+------+
|      date|      county|      state| fips|cases|deaths|
+----------+------------+-----------+-----+-----+------+
|2020-05-05|    Adjuntas|Puerto Rico|72001|    3|  null|
|2020-05-05|      Aguada|Puerto Rico|72003|    7|  null|
|2020-05-05|   Aguadilla|Puerto Rico|72005|   11|  null|
|2020-05-05|Aguas Buenas|Puerto Rico|72007|   22|  null|
|2020-05-05|    Aibonito|Puerto Rico|72009|   13|  null|
|2020-05-05|      Anasco|Puerto Rico|72011|    5|  null|
|2020-05-05|     Arecibo|Puerto Rico|72013|   43|  null|
|2020-05-05|      Arroyo|Puerto Rico|72015|    5|  null|
|2020-05-05| Barceloneta|Puerto Rico|72017|    3|  null|
|2020-05-05|Barranquitas|Puerto Rico|72019|   15|  null|
+----------+------------+-----------+-----+-----+------+

+-----------+
|      state|
+-----------+
|Puerto Rico|
+-----------+

2021
+----------+------------+-----------+-----+-----+------+
|      date|      county|      state| fips|cases|deaths|
+-----

Null death counts are only in Puerto Rico. That's a US territory, not a state, and the county-level data isn't needed, so those values get dropped.

In [0]:
for year in counties:
    df = counties[year]
    df = df.filter(df["state"] != "Puerto Rico")
    print(year)
    df.select([count(when(col(c).isNotNull() , c)).alias("notNull"+c.capitalize()) for c in df.columns]
       ).withColumn("expected",lit(df.count())).show()

    df.select([count(when(col(c) >= 0 , c)).alias("pos"+c.capitalize()) for c in ["cases","deaths"]]
       ).withColumn("expected",lit(df.count())).show()

    df.select(mean((df["deaths"] <= df["cases"]).cast("int")).alias("isValidDeathCount")).show()

    windowSpec = Window.partitionBy("county","state").orderBy("date")
    df.orderBy("date") \
            .withColumn("date_diff",datediff(df.date,lag("date",1).over(windowSpec))) \
            .select(mean((col("date_diff") == lit(1)).cast("int")).alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()
    
    df.groupBy("state","county").agg(countDistinct("date").alias("distinct"),count(lit(1)).alias("expected")) \
           .withColumn("isUniqueDates",(col("distinct") == col("expected")).cast("int")).select(mean("isUniqueDates").alias("pctUnique")).withColumn("expected",lit(1.0)).show()

2020
+-----------+-------------+------------+-----------+------------+-------------+--------+
|notNullDate|notNullCounty|notNullState|notNullFips|notNullCases|notNullDeaths|expected|
+-----------+-------------+------------+-----------+------------+-------------+--------+
|     865682|       865682|      865682|     857710|      865682|       865682|  865682|
+-----------+-------------+------------+-----------+------------+-------------+--------+

+--------+---------+--------+
|posCases|posDeaths|expected|
+--------+---------+--------+
|  865682|   865682|  865682|
+--------+---------+--------+

+------------------+
| isValidDeathCount|
+------------------+
|0.9991047520914146|
+------------------+

+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.9997194163860831|     1.0|
+------------------+--------+

+---------+--------+
|pctUnique|expected|
+---------+--------+
|      1.0|     1.0|
+---------+--------+

2021
+-----------+-------------+---

Some rows have more deaths than cases. Filter to see what's going on.

In [0]:
df.filter(col("deaths") > col("cases")).limit(20).show()
df.filter((col("deaths") > col("cases")) & (col("county") != lit("Unknown"))).show()

+----------+-------+--------+----+-----+------+
|      date| county|   state|fips|cases|deaths|
+----------+-------+--------+----+-----+------+
|2022-01-01|Unknown| Florida|null|    5|   162|
|2022-01-01|Unknown|  Hawaii|null|    0|     8|
|2022-01-01|Unknown|Illinois|null|    0|  1167|
|2022-01-01|Unknown|    Ohio|null|    0|     7|
|2022-01-02|Unknown| Florida|null|    5|   162|
|2022-01-02|Unknown|  Hawaii|null|    0|     8|
|2022-01-02|Unknown|Illinois|null|    0|  1167|
|2022-01-02|Unknown|    Ohio|null|    0|     7|
|2022-01-03|Unknown| Florida|null|    5|   162|
|2022-01-03|Unknown|  Hawaii|null|    0|     8|
|2022-01-03|Unknown|Illinois|null|    0|  1186|
|2022-01-03|Unknown|    Ohio|null|    0|     7|
|2022-01-04|Unknown|  Hawaii|null|    0|     8|
|2022-01-04|Unknown|Illinois|null|    0|  1208|
|2022-01-04|Unknown|    Ohio|null|    0|     7|
|2022-01-05|Unknown|  Hawaii|null|    0|     8|
|2022-01-05|Unknown|Illinois|null|    0|  1224|
|2022-01-05|Unknown|    Ohio|null|    0|

This is only a problem in Unknown counties, wich can be removed later.

Next, it seems like some time steps might be missing. Filter by date difference to see what's going on.

In [0]:
windowSpec = Window.partitionBy("county","state").orderBy("date")
withDiff = df.orderBy("date") \
        .withColumn("date_diff",datediff(df.date,lag("date",1).over(windowSpec)))
withDiff.filter(withDiff["date_diff"] != 1).show()
withDiff.filter(withDiff["date_diff"] != 1).filter(col("county") != lit("Unknown")).show()

+----------+-------+--------------------+----+-----+------+---------+
|      date| county|               state|fips|cases|deaths|date_diff|
+----------+-------+--------------------+----+-----+------+---------+
|2022-04-09|Unknown|             Indiana|null|    0|     5|        2|
|2022-02-04|Unknown|               Maine|null|    1|     0|        2|
|2022-01-07|Unknown|Northern Mariana ...|null|   43|     1|        2|
|2022-01-21|Unknown|Northern Mariana ...|null|   47|     0|        2|
|2022-02-02|Unknown|Northern Mariana ...|null|   34|     0|        2|
|2022-03-02|Unknown|Northern Mariana ...|null|  132|     0|        2|
|2022-04-05|Unknown|Northern Mariana ...|null|    5|     0|        5|
|2022-02-05|Unknown|               Texas|null| 2792|     0|        4|
|2022-03-22|Unknown|            Virginia|null|  656|    23|       58|
|2022-02-16|Unknown|              Alaska|null|  821|     1|        3|
|2022-01-19|Unknown|             Arizona|null|    1|     0|        5|
|2022-01-22|Unknown|

The missing dates is only happening in Unknown county reports. These can be filtered out when finding average cases by county.

In [0]:
df.filter(df["county"] != "Unknown").display()

date,county,state,fips,cases,deaths
2022-01-01,Autauga,Alabama,1001,11018,160
2022-01-01,Baldwin,Alabama,1003,39911,593
2022-01-01,Barbour,Alabama,1005,3860,81
2022-01-01,Bibb,Alabama,1007,4533,95
2022-01-01,Blount,Alabama,1009,11256,198
2022-01-01,Bullock,Alabama,1011,1676,46
2022-01-01,Butler,Alabama,1013,3613,102
2022-01-01,Calhoun,Alabama,1015,23411,532
2022-01-01,Chambers,Alabama,1017,6171,147
2022-01-01,Cherokee,Alabama,1019,3385,65


In [0]:
df.filter((df["county"] != "Unknown") & (df["state"] == "Minnesota")).display()

date,county,state,fips,cases,deaths
2022-01-01,Aitkin,Minnesota,27001,2424,54
2022-01-01,Anoka,Minnesota,27003,71618,676
2022-01-01,Becker,Minnesota,27005,6618,81
2022-01-01,Beltrami,Minnesota,27007,8803,111
2022-01-01,Benton,Minnesota,27009,10048,140
2022-01-01,Big Stone,Minnesota,27011,1014,7
2022-01-01,Blue Earth,Minnesota,27013,12732,82
2022-01-01,Brown,Minnesota,27015,5031,66
2022-01-01,Carlton,Minnesota,27017,6115,80
2022-01-01,Carver,Minnesota,27019,18778,90


#### Results
- Completeness: 
  - State and Countrywide Data is fully complete
  - County data was missing death counts in Puerto Rico, so that "state" will be excluded from further analysis. "fips" values were missing for all unknown counties, but those aren't used for analysis or joining so that column can be dropped.
- Uniqueness: 
  - There are 56 unique "state" identifiers including all 50 US states and 6 territories.
  - No date is repeated for any unique state/county combination or in the countrywide DataFrame
- Validity: 
  - With each row being a running total of COVID cases and deaths, there should never be more deaths than cases. This does occur in many of the "Unknown" district entries, which will be filtered out.
  - All counts of cases and deaths are greater than or equal to 0.
  - When sorted by date, each row in the countrywide data is one day apart indicating a continuous time series.
  - When sorted by date and state, each row in the statewide data is one data apart indicating continuous time series.
  - County data has some gaps. These will be filled in with repeated values in the Data Preparation step.

### Station Data
Test for: 
- Completeness: no null values, coverage of all districts
- Uniqueness: how many unique values for each dataset?
  - Unique station #'s
  - Urban/Rural split
  - Number of counties
- Validity: does the data make sense?
  - Does each station belong to a county?
  - Does each county have a corresponding district?

In [0]:
current_stations.limit(100).display()
retired_stations.limit(100).display()

Continuous Number,Sequence Number,Collection Type,Route,Pos Dir Dir,Pos Lanes,Neg Lanes,Urban/Rural,Functional Class,County Name,Location Text
26,5707,WIM,I-35,North,2,2,Rural,Interstate,Steele,"3.5 MI N OF TH30, N OF ELLENDALE"
27,9830,WIM,TH 60,East,2,2,Rural,Principal Arterial - Other,Watonwan,"0.7 MI W OF W JCT OF TH4, SW OF ST JAMES"
28,22993,"ATR Volume, Speed, Class",MSAS 114,East,1,1,Urban,Minor Arterial,Stearns,W OF 20TH AVE N IN ST CLOUD
29,69377,WIM,TH 53,North,2,2,Rural,Principal Arterial - Other,Saint Louis,"3 MI S OF CSAH59 (MELRUDE RD), S OF EVELETH"
30,6757,WIM,TH 61,North,2,2,Rural,Principal Arterial - Other,Lake,"SW OF CSAH25, SW OF TWO HARBORS"
31,4332,WIM,TH 2,East,2,2,Rural,Principal Arterial - Other,Polk,"NW OF CSAH63 (100TH AVE SW), SE OF E GR FORKS"
32,67740,WIM,TH 52,North,2,2,Rural,Principal Arterial - Other Freeways and Expressways,Olmsted,".25 MI N OF CSAH12 (100TH ST NW), S OF ORONOCO"
33,4669,WIM,TH 212,East,1,1,Rural,Principal Arterial - Other,Renville,".7 MI E OF CSAH14 (1ST ST N), E OF OLIVIA"
34,797,WIM,TH 23,North,1,1,Rural,Principal Arterial - Other,Chippewa,"1.1 MI NE OF CR3 (80TH AVE SE), SW OF CLARA CITY"
35,1066,WIM,TH 2,East,2,2,Rural,Principal Arterial - Other,Clearwater,".5 MI W OF CSAH30, E OF BAGLEY"


Continuous Number,Collection Type,Measurement,Route,Pos Lanes,Pos Dir Dir,Ref Post,True Mile,Location,County,Closest City,Status
1,ATR,VOLUME,CSAH 1,2,N/S,000+00.170,0.17,N OF CSAH 15 (4TH ST NE) IN AITKIN,Aitkin,Aitkin,RETIRED 1/1/2001
2,ATR,VOLUME,CSAH 71,2,E/W,000+00.180,0.18,W OF 3RD AVE NE IN MAPLETON,Blue Earth,Mapleton,RETIRED 1/1/2001
3,ATR,VOLUME,MSAS 104,2,E/W,000+00.380,0.38,W OF TH197 (BEMIDJI AVE) IN BEMIDJI,Beltrami,Bemidji,RETIRED 1/1/2001
4,ATR,VOLUME,MSAS 110,2,E/W,016+00.730,16.73,NE OF CR68 (MAIN ST) IN MARSHALL,Lyon,Marshall,RETIRED 1/1/1995
6,ATR,VOLUME,CSAH 43,2,E/W,000+00.040,0.04,E OF TH4 (S MILKY WAY) IN COSMOS,Meeker,Cosmos,RETIRED 1/1/2001
7,ATR,VOLUME,CSAH 32,2,N/S,004+00.080,4.08,N OF 23RD ST IN SLAYTON,Murray,Slayton,RETIRED 1/1/2001
8,ATR,VOLUME/SPEED/CLASS,CSAH 50,2,E/W,000+00.070,0.07,E OF TH75 (BRANDT ST) IN HALSTAD,Norman,Halstad,RETIRED 1/1/2018
9,ATR,VOLUME,CSAH 3,2,N/S,010+00.240,10.24,S OF CSAH22 (MAIN ST) IN EAGLE BEND,Todd,Eagle Bend,RETIRED 1/1/2001
10,ATR,VOLUME,CSAH 30,2,E/W,005+00.990,5.99,W OF CSAH47 (MAIN ST) IN BLACKDUCK,Beltrami,Blackduck,RETIRED 1/1/2001
13,ATR,VOLUME,CSAH 38,2,E/W,014+00.820,14.82,E OF 2ND ST SW IN LONG PRAIRIE,Todd,Long Prairie,RETIRED 1/1/2001


In [0]:
case_counties = None

In [0]:
print("Non Null counts")
current_stations.select([count(when(col(c).isNotNull() , c)).alias(c) for c in current_stations.columns]).withColumn("rowCount",lit(current_stations.count())).show()
retired_stations.select([count(when(col(c).isNotNull() , c)).alias(c) for c in retired_stations.columns]).withColumn("rowCount",lit(retired_stations.count())).show()
print("Unique value counts")
current_stations.select([countDistinct(c).alias(c) for c in current_stations.columns]).withColumn("rowCount",lit(current_stations.count())).show()
retired_stations.select([countDistinct(c).alias(c) for c in retired_stations.columns]).withColumn("rowCount",lit(retired_stations.count())).show()
print("Number of unique stations retired and active")
print(current_stations.select("Continuous Number").unionByName(retired_stations.select("Continuous Number")).distinct().count())
print("Urban / Rural Split")
current_stations.groupBy().pivot("Urban/Rural").count().show()

# all counties that have or have had a measurement station in them
station_counties = current_stations.select(col("County Name").alias("stationCounties")) \
                    .unionByName(retired_stations.select(col("COUNTY").alias("stationCounties"))).distinct()

for year in counties:
    if case_counties is None:
        case_counties = counties[year].filter(col("state") == "Minesota").select("county").distinct()
    
    case_counties = case_counties.union(counties[year].filter(col("state") == "Minnesota").select("county")).distinct()
    
print("Station List Counties Matched with COVID Case Counties")
station_counties = station_counties.join(case_counties,case_counties.county == station_counties.stationCounties,"outer").distinct().orderBy("stationCounties")
station_counties.display()

Non Null counts
+-----------------+---------------+---------------+-----+-----------+---------+---------+-----------+----------------+-----------+-------------+--------+
|Continuous Number|Sequence Number|Collection Type|Route|Pos Dir Dir|Pos Lanes|Neg Lanes|Urban/Rural|Functional Class|County Name|Location Text|rowCount|
+-----------------+---------------+---------------+-----+-----------+---------+---------+-----------+----------------+-----------+-------------+--------+
|              104|            104|            104|  104|        104|      104|      104|        104|             104|        104|          104|     104|
+-----------------+---------------+---------------+-----+-----------+---------+---------+-----------+----------------+-----------+-------------+--------+

+-----------------+---------------+-----------+-----+---------+-----------+--------+---------+--------+------+------------+------+--------+
|Continuous Number|Collection Type|Measurement|Route|Pos Lanes|Pos Dir Di

stationCounties,county
,Cook
,Stevens
,Traverse
,Swift
,Yellow Medicine
,Koochiching
,Lincoln
,Kandiyohi
,Redwood
,St. Louis


There are 28 counties that reported COVID cases, but don't have any WIM stations. That should be ok as long as each county can be mapped to a district to aggregate and predict at the district level.

In [0]:
station_counties.columns

Out[36]: ['stationCounties', 'county']

In [0]:
county_district.select([countDistinct(c).alias(c) for c in county_district.columns]).withColumn("rowCount",lit(county_district.count())).show()

county_district.select("MnDOT district(s)").distinct().show()

station_counties.join(county_district,"county","outer").distinct().orderBy("MnDOT district(s)").display()

+------+-----------------+--------+
|County|MnDOT district(s)|rowCount|
+------+-----------------+--------+
|    87|               10|      87|
+------+-----------------+--------+

+-----------------+
|MnDOT district(s)|
+-----------------+
|                7|
|                3|
|                8|
|          2 and 3|
|                6|
|          1 and 2|
|                1|
|                4|
|                2|
|            Metro|
+-----------------+



county,stationCounties,MnDOT district(s)
,Saint Louis,
Unknown,,
Carlton,,
Aitkin,Aitkin,
Olmsted,Olmsted,
Pennington,,
,St Louis,
Aikin,,1
Pine,Pine,1
Carlson,,1


There are a few cases that can't be aligned to MnDOT districts here:
- Unknown: These will be dropped for per-district predictions.
- "St Louis"/"Saint Louis": There is a "St. Louis" assigned to District 1. "Saint Louis" is the closest match to other county names, so all instances of "St Louis" and "St. Louis" will be replaced and matched to district 1.
- Olmsted: a quick Google search reveals that this is in district 6
- Aitkin: There is an "Aikin" entry in the county_districts table. No "Aikin" county exists in Minnesota, and "Aikin" is assigned to District 1 which is Aitkin's actual district. "Aikin" can be replaced with "Aitkin" and connected to district 1.
- Pennington: There is a "Penningham" entry in the country_districts table, but again that is not a real county. The real Pennington county is ni District 2, which "Penningham" was assigned to. Like "Aikin", replace "Penningham" with "Pennington" and associate with district 2.
- Carlton: "Carlson" is entered in the county_districts table. This is another mispelling, and entries labeled "Carlson" should be replaced with Carlton and assigned to district 1. 
- Cass: This is assigned to district 2 and 3. Looking at the district map it is mostly in district 3, so it can be relabeled accordingly.
- Koochiching: This is assigned to district 1 and 2. The district map shows a majority in district 1, so it can be relabeled to district 1.
- Itasca: This is the same as Koochiching. Relabel to district 1.

### Results
- Completeness:
  - All districts are covered and there are no null values in columns
  - Some counties that reported COVID cases do not have WIM stations, but those case counts will be aggregated to district counts later on.
- Uniqueness:
  - 62 different counties have WIM stations, covering all 8 MNDOT Districts.
  - There are 104 active stations and 119 retired stations for a total of 223 potential stations to view WIM data from.
  - The current stations show Urban / Rural classification, which is split 61 / 43. This likely means that a large portion of rural traffic is not measured, but relative changes in activity should still be discernable.
- Validity:
  - Some counties were assigned to multiple districts. There are few enough that they can be reassigned manually to the district covering a majority of the district.
  - Some county names were not immediately matched to a MNDOT District because of spelling variations or mispellings. Reviewing each case showed that all counties that are not "Unknown" can be matched to a real county in Minnesota. 
  - There are no null values for counties in the station tables, so each station is assigned to a county, and the county name validation confirms that the county for each station is a real county.

### WIM Volume Data
Test for:
- Completeness:
  - No null values
  - Full time series for each station
- Uniqueness:
  - No repeat entries for station, dir, lane, date groupings
- Validity:
  - No negative values in columns
  - Each station has a corresponding value in either current or retired station tables

In [0]:
for year in wim_volume:
    wim_volume[year].limit(20).show()
    break

+----------+-------------+--------------+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|station_id|dir_of_travel|lane_of_travel|      date|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24|
+----------+-------------+--------------+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|         8|            3|             0|2017-01-01|  0|  3|  1|  1|  0|  0|  2|  2|  2|  1|  0| 12|  9|  8|  4|  8|  5|  4|  6|  6|  3|  2|  1|  0|
|         8|            7|             0|2017-01-01|  2|  0|  0|  1|  1|  2|  0|  3|  3|  4|  8|  1|  6|  8|  3|  4|  2|  2|  5|  3|  5|  0|  1|  0|
|         8|            3|             0|2017-01-02|  0|  0|  1|  0|  0|  0|  2|  7|  8|  5|  7| 11|  8|  6| 15|  7|  8|  6| 10|  4|  4|  5|  1|  0|
|         8|            7|             0|2017-01-02|  0|  0|  0|  0|  0|  0|  2| 11| 10|  4|  5|  7| 11|  

In [0]:
print("Null value check")
for year in wim_volume:
    tmp = wim_volume[year]
    print(year, ":", tmp.count(), "Rows")
    tmp.select([count(when(col(c).isNull() , c)).alias(c) for c in tmp.columns]).withColumn("rowCount",lit(tmp.count())).show()

Null value check
2017 : 47850 Rows
+----------+-------------+--------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|station_id|dir_of_travel|lane_of_travel|date|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24|rowCount|
+----------+-------------+--------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|         0|            0|             0|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   47850|
+----------+-------------+--------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+

2018 : 45496 Rows
+----------+-------------+--------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|station_id|dir_of_travel|lane_of_

Note that the number of rows for yearly WIM data doubled from 2020 to 2021. Did the number of stations increase?

In [0]:
for year in wim_volume:
    tmp = wim_volume[year]
    print(year, "unique stations:",tmp.select("station_id").distinct().count())

2017 unique stations: 68
2018 unique stations: 65
2019 unique stations: 78
2020 unique stations: 77
2021 unique stations: 159
2022 unique stations: 156


Yes, there are more than twice as many stations for 2021 than 2020.

In [0]:
# check if each station across all sheets is in the active or retired list
all_stations = current_stations.select("Continuous Number").unionByName(retired_stations.select("Continuous Number")).distinct()

for year in wim_volume:
    tmp = wim_volume[year]
    print(year)
    tmp.select("station_id").distinct().join(all_stations,col("station_id") == col("Continuous Number"),"left").select(count(when(col("station_id").isNull(),"station_id")).alias("unmatched")).show()

2017
+---------+
|unmatched|
+---------+
|        0|
+---------+

2018
+---------+
|unmatched|
+---------+
|        0|
+---------+

2019
+---------+
|unmatched|
+---------+
|        0|
+---------+

2020
+---------+
|unmatched|
+---------+
|        0|
+---------+

2021
+---------+
|unmatched|
+---------+
|        0|
+---------+

2022
+---------+
|unmatched|
+---------+
|        0|
+---------+



In [0]:
print("Unique station, direction, lane, date entries")
for year in wim_volume:
    tmp = wim_volume[year]
    print(year)
    print("Unique:",tmp.select("station_id","dir_of_travel","lane_of_travel","date").distinct().count(),"| Expected:",tmp.count())

Unique station, direction, lane, date entries
2017
Unique: 47850 | Expected: 47850
2018
Unique: 45496 | Expected: 45496
2019
Unique: 64466 | Expected: 64466
2020
Unique: 61078 | Expected: 61078
2021
Unique: 139374 | Expected: 139374
2022
Unique: 26311 | Expected: 26311


In [0]:
print("No Negative values check")
for year in wim_volume:
    tmp = wim_volume[year].drop("date")
    print(year, ":", tmp.count(), "Rows")
    tmp.select([count(when(col(c) < 0 , c)).alias(c) for c in tmp.columns]).withColumn("rowCount",lit(tmp.count())).show()

No Negative values check
2017 : 47850 Rows
+----------+-------------+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|station_id|dir_of_travel|lane_of_travel|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24|rowCount|
+----------+-------------+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|         0|            0|             0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   47850|
+----------+-------------+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+

2018 : 45496 Rows
+----------+-------------+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------+
|station_id|dir_of_travel|lane_of_travel|  1|  2|  3|  4

In [0]:
print("Time series check")
windowSpec = Window.partitionBy("station_id","dir_of_travel","lane_of_travel").orderBy("date")
for year in wim_volume:
    tmp = wim_volume[year]
    print(year)
    tmp.orderBy("date") \
        .withColumn("date_diff",datediff(tmp.date,lag("date",1).over(windowSpec))) \
        .select(mean((col("date_diff") == lit(1)).cast("int")).alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()


Time series check
2017
+---------------+--------+
|isValidTimeStep|expected|
+---------------+--------+
|            1.0|     1.0|
+---------------+--------+

2018
+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.9923930830126929|     1.0|
+------------------+--------+

2019
+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.9563545202578556|     1.0|
+------------------+--------+

2020
+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.9253378045172108|     1.0|
+------------------+--------+

2021
+---------------+--------+
|isValidTimeStep|expected|
+---------------+--------+
|           null|     1.0|
+---------------+--------+

2022
+---------------+--------+
|isValidTimeStep|expected|
+---------------+--------+
|           null|     1.0|
+---------------+--------+



Some stations like 29 seem to have gaps in recordings. Those can be filled in with repeat values after aggregating by date.

In [0]:
wim_volume[2021].limit(10).show()

+----------+-------------+--------------+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|station_id|dir_of_travel|lane_of_travel|    date|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24|
+----------+-------------+--------------+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|        26|            1|             1|1/1/2021| 88| 75| 50| 50| 62| 55| 66| 79|114|205|285|334|360|391|354|304|331|320|245|200|188|138|136| 86|
|        26|            1|             2|1/1/2021|  9| 11|  7|  3|  9|  3| 13|  8| 16| 29| 58|106| 95|136|105|112|116| 95| 72| 55| 51| 31| 25| 18|
|        26|            5|             1|1/1/2021| 58| 39| 46| 28| 50| 68|122|189|251|354|422|425|430|424|373|361|296|270|217|169|145|108| 94| 66|
|        26|            5|             2|1/1/2021|  8|  2|  4|  1|  7|  7| 19| 39| 61|111|163|179|182|189|118|139|102|

The date format changed from 2020 to 2021. That can be transformed quickly and the time series check re-run.

In [0]:
windowSpec = Window.partitionBy("station_id","dir_of_travel","lane_of_travel").orderBy("new_date")
for year in [2021,2022]:
    tmp = wim_volume[year]
    print(year)
    tmp.orderBy("date") \
        .withColumn("new_date",to_date(col("date"),"M/D/yyyy")) \
        .withColumn("date_diff",datediff(col("new_date"),lag("new_date",1).over(windowSpec))) \
        .select(mean((col("date_diff") == lit(1)).cast("int")).alias("isValidTimeStep")).withColumn("expected",lit(1.0)).show()


2021
+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.9215109437514709|     1.0|
+------------------+--------+

2022
+------------------+--------+
|   isValidTimeStep|expected|
+------------------+--------+
|0.8786654960491659|     1.0|
+------------------+--------+



### Results
- Completeness:
  - There are no null values in the volume data.
  - Some stations have gaps in their time series, which will be filled in with an average value of several days before and after the gap.
- Uniqueness: 
  - There are no repeated combinations of station, direction, lane, and date.
- Validity:
  - The date format changed from 2020 to 2021 and 2022. That will need to be fixed when aggregating.
  - No numeric values were negative.
  - Each station matches with one listed in the current or retired station list tables.

In [0]:
county_district.display()

County,MnDOT district(s)
Aikin,1
Anoka,Metro
Becker,4
Beltrami,2
Benton,3
Big Stone,4
Blue Earth,7
Brown,7
Carlson,1
Carver,Metro


## Data Conditioning
The available data for ATR WIM stations (current and retired), COVID cases (county, state, and country), and traffic volume has been examined and several necessary changes identified. In order of discovery those are:
1. Remove Puerto Rico from county level COVID case reports because of null death count values
2. Remove Unknown counties from county level COVID case reports because of non-increasing case/death counts and incomplete time series
3. Minnesota Counties:
    - Rename all instances of "St Louis"/"Saint Louis"/"St. Louis" to "Saint Louis" in COVID case data and rename "St. Louis" to "Saint Louis" in county_district view. (Use again after combining retired and current stations)
    - Remove mysterious whitespace character from Olmsted
    - Replace "Aikin" in county_districts with "Aitkin"
    - Replace "Penningham" in county_districts with "Pennington"
    - Replace "Carlson" with "Carlton" in county_districts
    - Assign Cass county to district 3 only.
    - Assign Koochiching county to district 1 only.
    - Assign Itasca county to district 1 only.
4. Reformat dates for wim_volume data in 2021 and 2022

In [0]:
# Remove Puerto Rico and Unknown counties from county level COVID reports
for year in counties:
    tmp = counties[year]
    tmp = tmp.filter(col("state") != "Puerto Rico")
    #verify no more Null death counts
    tmp.select(count(when(col("deaths").isNull(), "deaths")).alias("nullDeathCount")).show()
    tmp = tmp.filter(col("county") != "Unknown")
    
    counties[year] = tmp

+--------------+
|nullDeathCount|
+--------------+
|             0|
+--------------+

+--------------+
|nullDeathCount|
+--------------+
|             0|
+--------------+

+--------------+
|nullDeathCount|
+--------------+
|             0|
+--------------+



In [0]:
mn_counties = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema=counties_covid_schema)

for year in counties:
    mn_counties = mn_counties.unionByName(counties[year].filter(col("state") == "Minnesota"))
mn_counties.orderBy("county","date").limit(10).show()

+----------+------+---------+-----+-----+------+
|      date|county|    state| fips|cases|deaths|
+----------+------+---------+-----+-----+------+
|2020-04-17|Aitkin|Minnesota|27001|    1|     0|
|2020-04-18|Aitkin|Minnesota|27001|    1|     0|
|2020-04-19|Aitkin|Minnesota|27001|    1|     0|
|2020-04-20|Aitkin|Minnesota|27001|    1|     0|
|2020-04-21|Aitkin|Minnesota|27001|    1|     0|
|2020-04-22|Aitkin|Minnesota|27001|    1|     0|
|2020-04-23|Aitkin|Minnesota|27001|    1|     0|
|2020-04-24|Aitkin|Minnesota|27001|    1|     0|
|2020-04-25|Aitkin|Minnesota|27001|    1|     0|
|2020-04-26|Aitkin|Minnesota|27001|    1|     0|
+----------+------+---------+-----+-----+------+



In [0]:
# replace St. Louis with Saint Louis
mn_counties = mn_counties.withColumn('county', regexp_replace('county', 'St. Louis', 'Saint Louis'))

In [0]:
def fix_spelling(str):
    corrections = {'Aikin':'Aitkin',
                   'Carlson':'Carlton',
                  'Penningham':'Pennington',
                  'St Louis':'Saint Louis',
                  'St. Louis':'Saint Louis'
                  }
    if str.startswith('Olmsted'):
        return 'Olmsted'
    if str in corrections:
        return corrections[str]
    else:
        return str
    
def remap_district(str):
    corrections = {'Cass':'3',
                   'Koochiching':'1',
                   'Itasca':'1'}
    if str in corrections:
        return corrections[str]
    else:
        return '0'
    
fix_spellingUDF = udf(lambda s: fix_spelling(s),StringType())
remap_districtUDF = udf(lambda s: remap_district(s),StringType())
    
county_district = county_district.withColumn("county", fix_spellingUDF(col("county"))) \
                                 .withColumn("MnDOT district(s)", when(remap_districtUDF(col("county")) != "0", remap_districtUDF(col("county"))) \
                                                                    .otherwise(col("MnDOT district(s)")))
county_district.display()

county,MnDOT district(s)
Aitkin,1
Anoka,Metro
Becker,4
Beltrami,2
Benton,3
Big Stone,4
Blue Earth,7
Brown,7
Carlton,1
Carver,Metro


In [0]:
# reformat dates for wim_volume in 2021 and 2022
windowSpec = Window.partitionBy("station_id","dir_of_travel","lane_of_travel").orderBy("new_date")
test = {}
for year in [2021,2022]:
    tmp = wim_volume[year]
    tmp = tmp.orderBy("date") \
        .withColumn("date", to_date(col("date"),"M/d/yyyy"))
    wim_volume[year] = tmp

### Aggregation
Now that data has been prepped, it needs to be aggregated so that there are singular datasets for WIM volumes by date, county, and district. The steps to do that are:
1. Combine all years of WIM volume into a single DataFrame
2. Combine all lanes and directions for each station and date so there is one row per station per date
3. Sum across all hours of each day for each station and date

### Joining
1. Join current and retired stations and apply fix_spelling function to make joining with county_district frame possible
2. Join each station / date row to the appropriate county in the joined station frame
3. Join each station / date / county row to the appropriate district in the county_distric frame

In [0]:
# combine all years of WIM volume
all_wim = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema=wim_volume_schema)

for year in wim_volume:
    all_wim = all_wim.unionByName(wim_volume[year])

In [0]:
all_wim.limit(10).show()

+----------+-------------+--------------+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|station_id|dir_of_travel|lane_of_travel|      date|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24|
+----------+-------------+--------------+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|         8|            3|             0|2017-01-01|  0|  3|  1|  1|  0|  0|  2|  2|  2|  1|  0| 12|  9|  8|  4|  8|  5|  4|  6|  6|  3|  2|  1|  0|
|         8|            7|             0|2017-01-01|  2|  0|  0|  1|  1|  2|  0|  3|  3|  4|  8|  1|  6|  8|  3|  4|  2|  2|  5|  3|  5|  0|  1|  0|
|         8|            3|             0|2017-01-02|  0|  0|  1|  0|  0|  0|  2|  7|  8|  5|  7| 11|  8|  6| 15|  7|  8|  6| 10|  4|  4|  5|  1|  0|
|         8|            7|             0|2017-01-02|  0|  0|  0|  0|  0|  0|  2| 11| 10|  4|  5|  7| 11|  

In [0]:
# Aggregate by station
exprs = {str(i): "sum" for i in range(1,25)}
all_wim = all_wim.groupBy("station_id","date").agg(exprs)
all_wim.filter(col("station_id") == 51).limit(10).show()

+----------+----------+-------+------+-------+-------+------+-------+-------+------+-------+-------+-------+-------+------+-------+-------+------+------+-------+-------+-------+------+-------+------+------+
|station_id|      date|sum(12)|sum(8)|sum(19)|sum(23)|sum(4)|sum(15)|sum(11)|sum(9)|sum(22)|sum(13)|sum(24)|sum(16)|sum(5)|sum(10)|sum(21)|sum(6)|sum(1)|sum(17)|sum(14)|sum(20)|sum(2)|sum(18)|sum(7)|sum(3)|
+----------+----------+-------+------+-------+-------+------+-------+-------+------+-------+-------+-------+-------+------+-------+-------+------+------+-------+-------+-------+------+-------+------+------+
|        51|2017-06-22|     59|    32|     53|     16|     0|     65|     68|    44|     30|     64|      5|     81|     0|     52|     33|     5|     1|     75|     62|     37|     1|     58|    16|     2|
|        51|2017-08-02|     62|    22|     57|      8|     4|     55|     52|    51|     20|     68|     10|     58|     0|     54|     35|     4|     2|     68|     62|   

In [0]:
# Aggregate days
aggregated_wim = all_wim.withColumn("totalVolume", reduce(add, [col("sum("+str(i)+")") for i in range(1,25)])).select("station_id","date","totalVolume")
aggregated_wim.limit(10).show()

+----------+----------+-----------+
|station_id|      date|totalVolume|
+----------+----------+-----------+
|        51|2017-06-22|        859|
|        51|2017-08-02|        834|
|        51|2017-09-29|        922|
|       103|2017-05-03|      50930|
|       103|2017-08-05|      49951|
|       103|2017-11-30|      49811|
|       110|2017-05-20|      25010|
|       110|2017-06-17|      25817|
|       164|2017-01-10|       6222|
|       164|2017-03-27|       7634|
+----------+----------+-----------+



In [0]:
aggregated_wim.filter(col("station_id").isNull()).display()

station_id,date,totalVolume


In [0]:
# Join current and retired stations only keeping one row for each id
# To keep one row, order by Status on retired stations, which will filter by retired date
# Keep only the retired stations with ids not in use by current stations, and union the two
filtered_retired_stations = retired_stations.orderBy("Status").dropDuplicates(["Continuous Number"]) \
                            .join(retired_stations.select("Continuous Number").subtract(current_stations.select("Continuous Number")),"Continuous Number","leftouter")
all_stations = current_stations.select("Continuous Number",col("County Name").alias("County")) \
                .unionByName(filtered_retired_stations.select("Continuous Number","County")) \
                .withColumn("County", fix_spellingUDF(col("County")))
all_stations.display()

Continuous Number,County
26,Steele
27,Watonwan
28,Stearns
29,Saint Louis
30,Lake
31,Polk
32,Olmsted
33,Renville
34,Chippewa
35,Clearwater


In [0]:
county_district.display()

county,MnDOT district(s)
Aitkin,1
Anoka,Metro
Becker,4
Beltrami,2
Benton,3
Big Stone,4
Blue Earth,7
Brown,7
Carlton,1
Carver,Metro


In [0]:
# join station number and county to district
station_county_district = all_stations.join(county_district,all_stations["County"] == county_district["county"],"left").select("Continuous Number",county_district["County"],"MnDOT district(s)")
station_county_district.limit(10).show()

+-----------------+-----------+-----------------+
|Continuous Number|     County|MnDOT district(s)|
+-----------------+-----------+-----------------+
|               26|     Steele|                6|
|               27|   Watonwan|                7|
|               28|    Stearns|                3|
|               29|Saint Louis|                1|
|               30|       Lake|                1|
|               31|       Polk|                2|
|               32|    Olmsted|                6|
|               33|   Renville|                8|
|               34|   Chippewa|                8|
|               35| Clearwater|                2|
+-----------------+-----------+-----------------+



At this point there are some stations that have volume data that are not listed among the current or retired stations. Some of the stations listed as active also do not show up in any of the traffic volume reports. Unfortunately the missing data is not readily available, so an inner join will be used keeping only rows that have a station id, county, district, date, and volume data.

In [0]:
# join aggregated WIM to county
wim_joined = station_county_district.join(aggregated_wim,col("Continuous Number") == col("station_id"),"inner")
wim_joined.limit(10).show()

+-----------------+------+-----------------+----------+----------+-----------+
|Continuous Number|County|MnDOT district(s)|station_id|      date|totalVolume|
+-----------------+------+-----------------+----------+----------+-----------+
|               31|  Polk|                2|        31|2022-02-17|       4996|
|               31|  Polk|                2|        31|2022-02-09|       3708|
|               31|  Polk|                2|        31|2022-01-26|       3826|
|               31|  Polk|                2|        31|2022-01-21|       3073|
|               31|  Polk|                2|        31|2021-04-18|       3550|
|               31|  Polk|                2|        31|2021-04-03|       3872|
|               31|  Polk|                2|        31|2021-04-01|       5219|
|               31|  Polk|                2|        31|2021-02-25|       4633|
|               31|  Polk|                2|        31|2021-06-25|       6632|
|               31|  Polk|                2|        

In [0]:
# confirm there are no null values at this point
wim_joined.select([count(when(col(c).isNull() , c)).alias(c) for c in wim_joined.columns]).show()

+-----------------+------+-----------------+----------+----+-----------+
|Continuous Number|County|MnDOT district(s)|station_id|date|totalVolume|
+-----------------+------+-----------------+----------+----+-----------+
|                0|     0|                0|         0|   0|          0|
+-----------------+------+-----------------+----------+----+-----------+



In [0]:
# rename and drop unnecessary column
wim_joined = wim_joined.drop("Continuous Number").withColumnRenamed("MnDOT district(s)","district").withColumnRenamed("County","county")
print(wim_joined.columns)

['county', 'district', 'station_id', 'date', 'totalVolume']


In [0]:
print("Total rows of Traffic Volume data:",wim_joined.count())

Total rows of Traffic Volume data: 111832


#### State Volume Data
A much simpler task is to aggregate the county data per date into a statewide Traffic Volume total.

In [0]:
state_wim = wim_joined.groupBy("date").agg(colsum(col("totalVolume")).alias("totalVolume")).select("date", "totalVolume")
state_wim.limit(20).show()

+----------+-----------+
|      date|totalVolume|
+----------+-----------+
|2020-02-26|     704910|
|2019-08-23|     851141|
|2020-04-13|     391162|
|2019-08-08|     742763|
|2019-08-22|     781921|
|2021-11-03|    1049892|
|2017-12-05|    1806043|
|2017-05-14|    2010971|
|2019-08-31|     654566|
|2019-04-25|     962722|
|2019-09-29|     548622|
|2020-06-24|     426759|
|2018-12-31|     439755|
|2017-12-22|    2475663|
|2017-10-20|    2565656|
|2017-05-11|    2415528|
|2017-02-24|    2026335|
|2020-06-08|     610815|
|2020-09-12|     557259|
|2021-04-06|    1204418|
+----------+-----------+



### MN County COVID Data
Back to the `mn_counties` DataFrame: now that all of the county and district mappings are complete that can be joined and written out to a table.

In [0]:
mn_counties_districts_covid = mn_counties.join(county_district,mn_counties["county"] == county_district["County"],"left").drop(county_district["county"]).drop("fips").withColumnRenamed("MnDOT district(s)","district")
print("Check Null Counts")
mn_counties_districts_covid.select([count(when(col(c).isNull() , c)).alias(c) for c in mn_counties_districts_covid.columns]).show()

Check Null Counts
+----+------+-----+-----+------+--------+
|date|county|state|cases|deaths|district|
+----+------+-----+-----+------+--------+
|   0|     0|    0|    0|     0|       0|
+----+------+-----+-----+------+--------+



## All County COVID Data
Lastly, the COVID case and death totals by county that were cleaned up earlier can be aggregated into average counts by county for each day. There is too much data to look at each county individually, but it may be useful to compare counties in Minnesota to the average of all counties in the US.

In [0]:
average_county_covid_schema = StructType([StructField("date",StringType(),True),
                                          StructField("cases",FloatType(),True),
                                          StructField("deaths",FloatType(),True)])

average_county_covid = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema=average_county_covid_schema)

for year in counties:
    tmp = counties[year].drop('fips')
    average_county_covid = average_county_covid.unionByName(tmp.groupBy("date").agg(mean(col("cases")).alias("cases"), mean(col("deaths")).alias("deaths")).select("date","cases","deaths"))

average_county_covid.limit(20).show()

+----------+------------------+-------------------+
|      date|             cases|             deaths|
+----------+------------------+-------------------+
|2020-02-26|               3.0|                0.0|
|2020-04-13|216.66403903903904|   9.91891891891892|
|2020-04-12| 208.0392304790645|  9.294983025273481|
|2020-02-13|1.3636363636363635|                0.0|
|2020-04-20| 283.4368861404147| 15.140778464896325|
|2020-01-22|               1.0|                0.0|
|2020-04-22| 300.9598698481562| 16.849240780911064|
|2020-03-16| 9.537313432835822|0.19402985074626866|
|2020-04-19| 274.2048851622311| 14.511483776886621|
|2020-03-03|3.6176470588235294|0.29411764705882354|
|2020-03-06| 4.219178082191781| 0.2054794520547945|
|2020-03-26| 50.71804281345566| 0.8042813455657493|
|2020-02-04|             1.375|                0.0|
|2020-02-05|1.3333333333333333|                0.0|
|2020-03-05| 4.708333333333333|               0.25|
|2020-03-02|            3.1875|             0.1875|
|2020-04-25|

## Export
At the end of this notebook, the data sources available for modeling are:
* Minnesota Traffic Volume by Date / WIM Station (wim_joined, columns={Date, County, District, StationID, TotalVolume})
* Minnesota Total Traffic Volume by Date (state_wim, columns={Date,TotalVolume})
* Minnesota COVID cases / deaths by county by date (mn_counties_districts_covid, columns={Date, County, State, District, Cases, Deaths})
* Average COVID cases per county by date (average_county_covid, columns={date,cases,deaths})
* Total US cases by date (unmodified us_cases, columns={date,cases,deaths})
* Total cases per state by date (unmodified state_cases, columns={date,state,fips,cases,deaths})

Each will be written to a parquet table for use in the Modeling notebook.

In [0]:
# check schemas
print(wim_joined.schema)
print(state_wim.schema)
print(mn_counties_districts_covid.schema)
print(average_county_covid.schema)
print(us_cases.schema)
print(state_cases.schema)

StructType(List(StructField(county,StringType,true),StructField(district,StringType,true),StructField(station_id,IntegerType,true),StructField(date,StringType,true),StructField(totalVolume,LongType,true)))
StructType(List(StructField(date,StringType,true),StructField(totalVolume,LongType,true)))
StructType(List(StructField(date,StringType,true),StructField(county,StringType,true),StructField(state,StringType,true),StructField(cases,IntegerType,true),StructField(deaths,IntegerType,true),StructField(district,StringType,true)))
StructType(List(StructField(date,StringType,true),StructField(cases,DoubleType,true),StructField(deaths,DoubleType,true)))
StructType(List(StructField(date,StringType,true),StructField(cases,IntegerType,true),StructField(deaths,IntegerType,true)))
StructType(List(StructField(date,StringType,true),StructField(state,StringType,true),StructField(fips,IntegerType,true),StructField(cases,IntegerType,true),StructField(deaths,IntegerType,true)))


In [0]:
%sql
drop table if exists county_wim_2017_2022;
drop table if exists state_wim_2017_2022;
drop table if exists mn_county_covid_2020_2022;
drop table if exists average_county_covid_2020_2022;
drop table if exists total_us_covid_2020_2022;
drop table if exists total_state_covid_2020_2022;
drop table if exists mn_covid_timeline;

In [0]:
dbutils.fs.rm("dbfs:/user/hive/warehouse/", True)
wim_joined.write.format("parquet").saveAsTable("county_wim_2017_2022")
state_wim.write.format("parquet").saveAsTable("state_wim_2017_2022")
mn_counties_districts_covid.write.format("parquet").saveAsTable("mn_county_covid_2020_2022")
average_county_covid.write.format("parquet").saveAsTable("average_county_covid_2020_2022")
us_cases.write.format("parquet").saveAsTable("total_us_covid_2020_2022")
state_cases.write.format("parquet").saveAsTable("total_state_covid_2020_2022")
timeline.write.format("parquet").saveAsTable("mn_covid_timeline")

In [0]:
%sql
show tables;

database,tableName,isTemporary
default,average_county_covid_2020_2022,False
default,county_wim_2017_2022,False
default,mn_county_covid_2020_2022,False
default,mn_covid_timeline,False
default,state_wim_2017_2022,False
default,total_state_covid_2020_2022,False
default,total_us_covid_2020_2022,False
,2017_wim_atr_volume,True
,2018_wim_atr_volume,True
,2019_wim_atr_volume,True
