In [3]:
import logging
import os
import sys

from dotenv import load_dotenv

In [2]:
# Set up logging
logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)

In [3]:
# Load credentials from .env file, if needed
if not os.getenv('PG_CONN_STRING'):
    load_dotenv()
PG_CONN_STRING = os.getenv('PG_CONN_STRING')

In [4]:
PG_CONN_STRING

In [5]:
# Parse command line options and arguments
logger.info('Parsing command...')

2023-05-20 22:16:42 __main__ INFO     Parsing command...


In [6]:

list_ = [ '-name',
          'nyt_cases_counties',
          'https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv',
          'bitdotio/simple_pipeline.cases_counties']

opts = [opt[1:] for opt in list_ if opt.startswith("-")]

local_source = 'local_source' in opts
validate_data = 'validate_data' in opts

In [7]:

opts = [opt for opt in opts if opt not in ['local_source', 'validate_data']]
args = [arg for arg in list_ if not arg.startswith("-")]

In [8]:
if len(args) != len(opts) + 2:
    raise ValueError("At least one argument is missing, check the README.")

In [9]:
args[-2:]

['https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv',
 'bitdotio/simple_pipeline.cases_counties']

In [10]:
# Set up local variables
source, destination = args[-2:]

In [11]:
source

'https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv'

In [12]:
destination

'bitdotio/simple_pipeline.cases_counties'

In [13]:
opt_args = args[:-2]

In [14]:
opt_args

['nyt_cases_counties']

In [15]:
option_args = dict(zip(opts, opt_args))

In [16]:
option_args

{'name': 'nyt_cases_counties'}

In [17]:
# Execute ETL
logger.info('Starting ETL...')

2023-05-20 22:16:43 __main__ INFO     Starting ETL...


In [18]:
logger.info('Starting extract...')

2023-05-20 22:16:43 __main__ INFO     Starting extract...


In [19]:
src = source
dest = destination
local_src = local_source
validate_data = local_source
options = option_args

In [20]:
print("1:",src)
print("2:",dest)
print("3:",local_src)
print("4:",validate_data)
print("5:",options)

1: https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv
2: bitdotio/simple_pipeline.cases_counties
3: False
4: False
5: {'name': 'nyt_cases_counties'}


In [21]:
if local_src:
    df = extract.csv_from_local(src)
else:
    df = extract.csv_from_get_request(src)    

NameError: name 'extract' is not defined

In [None]:
# EXTRACT data

In [4]:
import io
import logging

import pandas as pd
import requests

In [5]:
#url = src
url='https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv'
r = requests.get(url, timeout=5)
data = r.content.decode('utf-8')
df =  pd.read_csv(io.StringIO(data), low_memory=False)

In [6]:
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1,0.0
3,2020-01-24,Cook,Illinois,17031.0,1,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1,0.0


In [25]:
ls

Covid-ETL.ipynb  [31mextract.py[m[m*      main.py          transform.py
[34m__pycache__[m[m/     load.py          [34msimple-pipeline[m[m/ validate.py


In [26]:
path = 'simple-pipeline/simple_pipeline/acs_5yr_population_data.csv'
df_2 = pd.read_csv(path, low_memory=False)
df_2.head()

Unnamed: 0,GEO_ID,NAME,S0101_C01_001E,S0101_C01_001M,S0101_C01_002E,S0101_C01_002M,S0101_C01_003E,S0101_C01_003M,S0101_C01_004E,S0101_C01_004M,...,S0101_C06_034E,S0101_C06_034M,S0101_C06_035E,S0101_C06_035M,S0101_C06_036E,S0101_C06_036M,S0101_C06_037E,S0101_C06_037M,S0101_C06_038E,S0101_C06_038M
0,id,Geographic Area Name,Estimate!!Total!!Total population,Margin of Error!!Total!!Total population,Estimate!!Total!!Total population!!AGE!!Under ...,Margin of Error!!Total!!Total population!!AGE!...,Estimate!!Total!!Total population!!AGE!!5 to 9...,Margin of Error!!Total!!Total population!!AGE!...,Estimate!!Total!!Total population!!AGE!!10 to ...,Margin of Error!!Total!!Total population!!AGE!...,...,Estimate!!Percent Female!!Total population!!SU...,Margin of Error!!Percent Female!!Total populat...,Estimate!!Percent Female!!Total population!!SU...,Margin of Error!!Percent Female!!Total populat...,Estimate!!Percent Female!!Total population!!SU...,Margin of Error!!Percent Female!!Total populat...,Estimate!!Percent Female!!Total population!!PE...,Margin of Error!!Percent Female!!Total populat...,Estimate!!Percent Female!!Total population!!PE...,Margin of Error!!Percent Female!!Total populat...
1,0500000US01001,"Autauga County, Alabama",55380,*****,3217,107,3814,352,3600,350,...,(X),(X),(X),(X),(X),(X),(X),(X),(X),(X)
2,0500000US01003,"Baldwin County, Alabama",212830,*****,11689,30,12058,831,14262,859,...,(X),(X),(X),(X),(X),(X),(X),(X),(X),(X)
3,0500000US01005,"Barbour County, Alabama",25361,*****,1349,26,1622,171,1422,170,...,(X),(X),(X),(X),(X),(X),(X),(X),(X),(X)
4,0500000US01007,"Bibb County, Alabama",22493,*****,1315,170,1219,270,1132,263,...,(X),(X),(X),(X),(X),(X),(X),(X),(X),(X)


In [27]:
# TRANSFORM data

In [28]:
options

{'name': 'nyt_cases_counties'}

In [29]:
if 'name' in options:
    print( options['name'])

nyt_cases_counties


In [30]:
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1,0.0
3,2020-01-24,Cook,Illinois,17031.0,1,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1,0.0


In [31]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2502832 entries, 0 to 2502831
Data columns (total 6 columns):
 #   Column  Dtype  
---  ------  -----  
 0   date    object 
 1   county  object 
 2   state   object 
 3   fips    float64
 4   cases   int64  
 5   deaths  float64
dtypes: float64(2), int64(1), object(3)
memory usage: 114.6+ MB


In [32]:
df['date'] = pd.to_datetime(df['date'])

In [33]:
df['county'].value_counts()

Washington                   24215
Unknown                      21401
Jefferson                    20291
Franklin                     19457
Jackson                      18567
                             ...  
Esmeralda                      547
Loving                         543
Kalawao                        520
Rota                           114
Pending County Assignment        5
Name: county, Length: 1932, dtype: int64

In [42]:
min(df['date'])

Timestamp('2020-01-21 00:00:00')

In [35]:
max(df['date'])

Timestamp('2022-05-13 00:00:00')

In [36]:
df[df['county'] == 'Unknown']

Unnamed: 0,date,county,state,fips,cases,deaths
418,2020-03-01,Unknown,Rhode Island,,2,0.0
450,2020-03-02,Unknown,Rhode Island,,2,0.0
485,2020-03-03,Unknown,Rhode Island,,2,0.0
522,2020-03-04,Unknown,Rhode Island,,2,0.0
569,2020-03-05,Unknown,Rhode Island,,2,0.0
...,...,...,...,...,...,...
2501996,2022-05-13,Unknown,Rhode Island,,31231,17.0
2502197,2022-05-13,Unknown,Tennessee,,19465,306.0
2502485,2022-05-13,Unknown,Utah,,4044,92.0
2502502,2022-05-13,Unknown,Vermont,,1496,2.0


In [37]:
df = df.loc[(df['county'] != 'Unknown') & (df['fips'].notnull())].copy()

In [38]:
#check if partial string 'Eas' exists in conference column
#df['fips'].str.contains("[A-Za-z]").any()

In [43]:
df['fips'].head()

0    53061
1    53061
2    53061
3    17031
4    53061
Name: fips, dtype: object

In [44]:
df['fips'] = df['fips'].astype(str).str.extract('(^[^/.]*).*', expand=False).str.zfill(5)

In [45]:
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061,1,0.0
1,2020-01-22,Snohomish,Washington,53061,1,0.0
2,2020-01-23,Snohomish,Washington,53061,1,0.0
3,2020-01-24,Cook,Illinois,17031,1,0.0
4,2020-01-24,Snohomish,Washington,53061,1,0.0


In [46]:
df[df['deaths'] == 1].head()

Unnamed: 0,date,county,state,fips,cases,deaths
394,2020-02-29,King,Washington,53033,4,1.0
501,2020-03-04,Placer,California,6061,2,1.0
540,2020-03-05,Placer,California,6061,2,1.0
590,2020-03-06,Placer,California,6061,5,1.0
606,2020-03-06,Lee,Florida,12071,1,1.0


In [114]:
df = df[df['fips'].str.slice(0,2) <= '56'].copy()
df['deaths'] = df['deaths'].astype(int)

In [50]:
# cases_vs_deaths
"""Checks that death count is no more than case count."""
(df['deaths'] <= df['cases']).all()

False

In [133]:
# unique_records
"""Checks that each date and FIPs combination is unique."""
df[['date', 'fips']].drop_duplicates().shape[0] == df.shape[0]

False

In [134]:
# no_nulls_test
"""Checks that each date and FIPs combination is unique."""
df.isnull().values.sum() == 0

True

In [136]:
def range_test(series, min, max):
    """Checks that all values in a series are within a range, inclusive"""
    return (series >= min).all() and (series <= max).all() 

In [139]:
# cases_range_test
"""Checks that all cases are non-negative and <= 10M"""

range_test(df['cases'], 0, 10e6)

True

In [140]:
# deaths_range_test(df):
"""Checks that all deaths are non-negative and <= 100K"""

range_test(df['deaths'], 0, 1e5)

True

In [143]:
def cases_vs_deaths(df):
    """Checks that death count is no more than case count."""
    return (df['deaths'] <= df['cases']).all()


def unique_records(df):
    """Checks that each date and FIPs combination is unique."""
    return df[['date', 'fips']].drop_duplicates().shape[0] == df.shape[0]


def no_nulls_test(df):
    """Checks that all elements are not null"""
    return df.isnull().values.sum() == 0


def range_test(series, min, max):
    """Checks that all values in a series are within a range, inclusive"""
    return (series >= min).all() and (series <= max).all() 


def cases_range_test(df):
    """Checks that all cases are non-negative and <= 10M"""
    return range_test(df['cases'], 0, 10e6)


def deaths_range_test(df):
    """Checks that all deaths are non-negative and <= 100K"""
    return range_test(df['deaths'], 0, 1e5)

In [144]:
nyt_cases_counties = [
    (cases_vs_deaths, "Death counts cannot exceed case counts."),
    (unique_records, "Only one record per FIPs, per date allowed."),
    (no_nulls_test, "All values are expected to be non-null."),
    (cases_range_test, "Cases must be non-negative and <= 10M."),
    (deaths_range_test, "Deaths must be non-negative and <= 100K.")
]

In [145]:
nyt_cases_counties

[(<function __main__.cases_vs_deaths(df)>,
  'Death counts cannot exceed case counts.'),
 (<function __main__.unique_records(df)>,
  'Only one record per FIPs, per date allowed.'),
 (<function __main__.no_nulls_test(df)>,
  'All values are expected to be non-null.'),
 (<function __main__.cases_range_test(df)>,
  'Cases must be non-negative and <= 10M.'),
 (<function __main__.deaths_range_test(df)>,
  'Deaths must be non-negative and <= 100K.')]

In [146]:
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061,1,0
1,2020-01-22,Snohomish,Washington,53061,1,0
2,2020-01-23,Snohomish,Washington,53061,1,0
3,2020-01-24,Cook,Illinois,17031,1,0
4,2020-01-24,Snohomish,Washington,53061,1,0


In [150]:
tests = options['name']
tests

'nyt_cases_counties'

In [162]:
results = []
for test_func, failure_message in nyt_cases_counties:
    print(test_func,"->",failure_message)
    results.append(test_func(df.copy()))
    print(results[-1])
    if results[-1]:
        logger.info(f'Data test {test_func.__name__} passed.')
    else:
         logger.error(f'Data test {test_func.__name__} failed. {failure_message}')

2023-05-20 18:21:48 __main__ ERROR    Data test cases_vs_deaths failed. Death counts cannot exceed case counts.


<function cases_vs_deaths at 0x7fcf237513a0> -> Death counts cannot exceed case counts.
False
<function unique_records at 0x7fcf23751040> -> Only one record per FIPs, per date allowed.


2023-05-20 18:21:49 __main__ ERROR    Data test unique_records failed. Only one record per FIPs, per date allowed.


False
<function no_nulls_test at 0x7fcf237511f0> -> All values are expected to be non-null.


2023-05-20 18:21:50 __main__ INFO     Data test no_nulls_test passed.
2023-05-20 18:21:50 __main__ INFO     Data test cases_range_test passed.
2023-05-20 18:21:50 __main__ INFO     Data test deaths_range_test passed.


True
<function cases_range_test at 0x7fcf23751940> -> Cases must be non-negative and <= 10M.
True
<function deaths_range_test at 0x7fcf237518b0> -> Deaths must be non-negative and <= 100K.
True


In [160]:
results[-1]

True

In [163]:
# LOAD data 

In [None]:
logger.info(f"Loading data to bit.io...")