# etl_us_data.ipynb

Acquire a copy of the latest COVID-19 time series data and write that
data out into local CSV files in a format amenable to analysis with 
Pandas dataframes.

Input data sources:
* [2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository by Johns Hopkins CSSE](https://github.com/CSSEGISandData/COVID-19).

Output files produced:
* `data/us_counties.csv`: County-level time series data for the United States
* `data/us_counties_meta.json`: Column type metadata for reading `data/us_counties.csv` with `pd.read_csv()`

To read these files back in, use `pd.read_csv()`:
```python
with open("data/us_counties_meta.json") as f:
    cases_meta = json.load(f)
cases_meta["Date"] = "object"  # Workaround for pd.read_csv() not supporting parsing datetime64
cases_raw = pd.read_csv("../data/us_counties.csv", dtype=cases_meta, parse_dates=["Date"])
cases = cases_raw.set_index(["FIPS", "Date"], verify_integrity=True)
```


In [1]:
# Initialization boilerplate

# Ensure a consistent Python environment.
import sys
sys.path.append("..")  # Local libraries are in the directory above "notebooks"
import env
env.maybe_install_libs()

# Import Python packages that this notebook uses.
import numpy as np
import pandas as pd
from urllib.request import urlopen
import json
from datetime import datetime, date

# URLs for downloading the time series data directly from Github
_JH_BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/" + \
               "csse_covid_19_data/csse_covid_19_time_series/"
_JH_CONFIRMED_URL = _JH_BASE_URL + "time_series_covid19_confirmed_US.csv"
_JH_DEATHS_URL = _JH_BASE_URL + "time_series_covid19_deaths_US.csv"

# Currently there are no data on recovered patients for the US.
# _JH_RECOVERED_URL = _JH_BASE_URL + "time_series_covid19_recovered_US.csv"


# First date present in the data set, and the format of these dates. 
# Hopefully this won't change as new data are added.
# NOTE: One file uses "01/22/20" and the other uses "1/22/20"
# so you need to filter with endswith() to find matches.
_FIRST_DATE_SUFFIX = "1/22/20"
_DATE_FORMAT = "%m/%d/%y"

In [23]:
# Read the raw data from Github
raw_confirmed = pd.read_csv(_JH_CONFIRMED_URL)
raw_deaths = pd.read_csv(_JH_DEATHS_URL)

# No "recovered" time series at the moment. Generate an empty
# time series the schema from the deaths
raw_recovered = raw_deaths.copy(deep=True)
for i in range(len(raw_recovered.columns)):
    if str(raw_recovered.columns[i]).endswith(_FIRST_DATE_SUFFIX):
        ts_start_index = i
        break
for c in raw_recovered.columns[ts_start_index:]:
    raw_recovered[c] = 0

raw_confirmed

Unnamed: 0,UID,iso2,iso3,code3,FIPS,Admin2,Province_State,Country_Region,Lat,Long_,...,4/18/20,4/19/20,4/20/20,4/21/20,4/22/20,4/23/20,4/24/20,4/25/20,4/26/20,4/27/20
0,16.0,AS,ASM,16,60.0,,American Samoa,US,-14.271000,-170.132000,...,0,0,0,0,0,0,0,0,0,0
1,316.0,GU,GUM,316,66.0,,Guam,US,13.444300,144.793700,...,136,136,136,136,136,139,141,141,141,141
2,580.0,MP,MNP,580,69.0,,Northern Mariana Islands,US,15.097900,145.673900,...,14,14,14,14,14,14,14,14,14,14
3,630.0,PR,PRI,630,72.0,,Puerto Rico,US,18.220800,-66.590100,...,1118,1213,1252,1298,1252,1416,1276,1307,1371,1389
4,850.0,VI,VIR,850,78.0,,Virgin Islands,US,18.335800,-64.896300,...,53,53,53,53,54,54,54,55,57,57
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3257,84070017.0,US,USA,840,,Southeast Utah,Utah,US,38.996171,-110.701396,...,0,6,7,7,7,7,8,11,12,13
3258,84070018.0,US,USA,840,,Southwest Utah,Utah,US,37.854472,-111.441876,...,0,66,70,70,70,76,81,83,87,89
3259,84070019.0,US,USA,840,,TriCounty,Utah,US,40.124915,-109.517442,...,0,10,10,10,9,9,9,9,10,11
3260,84070020.0,US,USA,840,,Weber-Morgan,Utah,US,41.271160,-111.914512,...,0,119,124,126,130,136,140,143,145,148


In [24]:
# Some of the FIPS codes contain NA's. Remove those data points. TODO fred review (romeo)
raw_confirmed = raw_confirmed[~raw_confirmed["FIPS"].isna()]
raw_deaths = raw_deaths[~raw_deaths["FIPS"].isna()]
raw_recovered = raw_recovered[~raw_recovered["FIPS"].isna()]


# For some reason, the FIPS codes are encoded as floats. Fix that.
for df in [raw_confirmed, raw_deaths, raw_recovered]:
    df["FIPS"] = df["FIPS"].astype("Int64")
    
raw_confirmed

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if __name__ == '__main__':


Unnamed: 0,UID,iso2,iso3,code3,FIPS,Admin2,Province_State,Country_Region,Lat,Long_,...,4/18/20,4/19/20,4/20/20,4/21/20,4/22/20,4/23/20,4/24/20,4/25/20,4/26/20,4/27/20
0,16.0,AS,ASM,16,60,,American Samoa,US,-14.2710,-170.1320,...,0,0,0,0,0,0,0,0,0,0
1,316.0,GU,GUM,316,66,,Guam,US,13.4443,144.7937,...,136,136,136,136,136,139,141,141,141,141
2,580.0,MP,MNP,580,69,,Northern Mariana Islands,US,15.0979,145.6739,...,14,14,14,14,14,14,14,14,14,14
3,630.0,PR,PRI,630,72,,Puerto Rico,US,18.2208,-66.5901,...,1118,1213,1252,1298,1252,1416,1276,1307,1371,1389
4,850.0,VI,VIR,850,78,,Virgin Islands,US,18.3358,-64.8963,...,53,53,53,53,54,54,54,55,57,57
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3249,84090054.0,US,USA,840,90054,Unassigned,West Virginia,US,0.0000,0.0000,...,0,0,0,0,0,0,0,0,0,0
3250,84090055.0,US,USA,840,90055,Unassigned,Wisconsin,US,0.0000,0.0000,...,0,0,0,0,0,0,0,0,0,0
3251,84090056.0,US,USA,840,90056,Unassigned,Wyoming,US,0.0000,0.0000,...,0,0,0,0,0,0,0,0,0,0
3252,84099999.0,US,USA,840,99999,,Grand Princess,US,0.0000,0.0000,...,103,103,103,103,103,103,103,103,103,103


In [None]:
# Filter down to just U.S. counties
def counties_df(df):
    return df[(df["FIPS"] >= 1000)  # Territories have FIPS codes < 1000
              & (~df["Admin2"].isna())  # Countries don't have the "Admin2" field set
              & (df["Admin2"] != "Unassigned")  # States have Admin2 set to "Unassigned"
              & (df["FIPS"] <= 60000)  # Expatriates are coded by state in values > 80k
              ].copy()

county_confirmed = counties_df(raw_confirmed)
county_deaths = counties_df(raw_deaths)
county_recovered = counties_df(raw_recovered)

county_confirmed

In [None]:
# The time series in the raw data are spread across multiple columns.
# Rotate them by 90 degrees so that they are spread across rows.

def shred_time_series(df: pd.DataFrame, colname: str):
    """
    Turn a time series encoded as a range of columns into a time series
    encoded as a range of rows.
    
    This function hard-codes the column name mapping for Johns Hopkins 
    data, so it will only work on that data.
    
    :param df: Dataframe with a time series across the columns of each row,
     with an additional outer join indicator column at the very end.
    :param colname: Name of the new column where the time series should go
    
    :returns: A dataframe with one time series element per row.
     The returned dataframe will have a column called "Date" with the date
     of each time series element, and a column with the name `colname` with
     the associated value for each date.
    """
    for i in range(len(df.columns)):
        if str(df.columns[i]).endswith(_FIRST_DATE_SUFFIX):
            ts_start_index = i
            break
    
    # Note the -1 index to strip off the outer join indicator variable
    ts_matrix = df[df.columns[ts_start_index:-1]].values
    ts_lists = ts_matrix.tolist()

    date_list = [datetime.strptime(s, _DATE_FORMAT) for s in df.columns[ts_start_index:-1]]

    # Create a new dataframe where the time series is a list
    nested_df = df[df.columns[:ts_start_index]].copy()
    nested_df[colname] = ts_lists

    # Expand out the list and add the dates back.
    flat_df = nested_df.explode(colname)
    flat_df["Date"] = date_list * len(nested_df.index)
    return flat_df

shredded_confirmed = shred_time_series(county_confirmed, "Confirmed")
shredded_deaths = shred_time_series(county_deaths, "Deaths")
shredded_recovered = shred_time_series(county_recovered, "Recovered")
shredded_confirmed

In [None]:
# Combine the three time series into a single table.

# Sort by FIPS code and Date and clean up some columns that don't match
# perfectly.
sorted_deaths = shredded_deaths.sort_values(["FIPS", "Date"])
sorted_confirmed = shredded_confirmed.sort_values(["FIPS", "Date"])
sorted_recovered = shredded_recovered.sort_values(["FIPS", "Date"])

# The "confirmed" time series is missing the "population" column that is
# present in the "deaths" and "recovered" time series.
# Add it back in.
sorted_confirmed["Population"] = sorted_deaths["Population"]

# The floating point numbers in the "Lat" and "Long_" fields also have
# some discrepancies due to rounding error. Use the values in the 
# "confirmed" time series as the gold standard.
sorted_deaths["Lat"] = sorted_confirmed["Lat"]
sorted_deaths["Long_"] = sorted_confirmed["Long_"]
sorted_recovered["Lat"] = sorted_confirmed["Lat"]
sorted_recovered["Long_"] = sorted_confirmed["Long_"]

# Now we can combine the three time series into a single table
combined = (
    sorted_confirmed
    .merge(sorted_deaths, how="outer")
    .merge(sorted_recovered, how="outer"))

# Check for missing data
missing_rows = combined[combined["Confirmed"].isna()]
if len(missing_rows.index) > 0:
    raise ValueError(f"Missing 'Confirmed' time series data for the following rows:\n{missing_rows}")

combined  

In [None]:
# The outer joins in the previous cell convert some of the integer
# columns to object types. Fix that.

# Data types before:
combined.dtypes

In [None]:
combined["iso2"] = combined["iso2"].astype("string")
combined["iso3"] = combined["iso3"].astype("string")
combined["Admin2"] = combined["Admin2"].astype("string")
combined["Province_State"] = combined["Province_State"].astype("string")
combined["Country_Region"] = combined["Country_Region"].astype("string")
combined["Combined_Key"] = combined["Combined_Key"].astype("string")

combined["Confirmed"] = combined["Confirmed"].astype(np.int64)
combined["Deaths"] = combined["Deaths"].astype(np.int64)
combined["Recovered"] = combined["Recovered"].astype(np.int64)

# Data types after:
combined.dtypes

In [None]:
# Massage the column names a bit and drop unnecessary columns
to_retain = combined[["Date", "FIPS", "Province_State", "Admin2", 
                      "Population",
                      "Confirmed", "Deaths", "Recovered"]]
to_write = to_retain.rename(columns={
    "Province_State": "State",
    "Admin2": "County"
})
to_write

In [None]:
# Write the data out to a CSV file + a JSON file of type info.
to_write.to_csv("../data/us_counties.csv", index=False)
col_type_mapping = {
    key: str(value) for key, value in to_write.dtypes.iteritems()
}
with open("../data/us_counties_meta.json", "w") as f:
    json.dump(col_type_mapping, f)

In [None]:
!ls -lh ../data/us_counties*