import os
import logging

import pandas as pd
import sciris as sc


class Scraper(sc.prettyobj):
    '''
    Standard methods for scrapers: load a CSV, transform it (rename fields,
    parse dates, key and group the data, number days, convert cumulative
    fields to per-record counts), test data quality, and write one CSV per
    entity.
    '''

    def __init__(self, parameters):
        assert parameters.get("load_path", False), "Must provide load_path"
        self.load_path = parameters["load_path"]
        self.output_folder = parameters.get("output_folder", "epi_data")
        self.renames = parameters.get("renames")
        self.fields_to_drop = parameters.get("fields_to_drop")
        self.cumulative_fields = parameters.get("cumulative_fields")
        self.df = None
        self.grouping = None
        self.log = logging.getLogger(__name__)
        logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
        if parameters.get("scrape_on_init", False):
            self.scrape()
    def scrape(self):
        # Run the full pipeline
        self.preload()
        self.load()
        self.transform()
        self.test_quality()
        self.output()

    ## PRELOAD

    def preload(self):
        # Hook: subclasses can fetch or prepare raw data here
        pass
    ## LOAD DATA

    def load(self):
        # Read into a dataframe
        self.log.info(f"Loading data from {self.load_path}")
        self.df = pd.read_csv(self.load_path)
        self.log.info(f"Loaded {len(self.df)} records.")
        self.log.info(f"Original columns: {', '.join(self.df.columns)}")
    ## TRANSFORM DATA

    def transform(self):
        self.rename_fields()
        self.create_date()
        self.create_key()
        self.group_data()
        self.create_day()
        self.convert_cumulative_fields()
        self.drop_fields()

    def rename_fields(self):
        if self.renames is not None:
            self.log.info(f"Renaming fields: {self.renames}")
            self.df = self.df.rename(columns=self.renames)

    def create_date(self):
        # Parse the 'date' column into datetimes
        self.df['date'] = pd.to_datetime(self.df.date)

    def create_key(self):
        # Hook: subclasses must create a 'key' column identifying each entity
        pass
    def group_data(self):
        assert 'key' in self.df.columns, 'No column named "key"; do you need to rename?'
        assert 'date' in self.df.columns, 'No column named "date"; do you need to define a create_date method?'
        self.df = self.df.sort_values(['key', 'date'])
        self.grouping = self.df.groupby('key')
    def create_day(self):
        # Number days within each entity, starting at 0 on its earliest date
        self.df['day'] = self.grouping['date'].transform(lambda x: x - x.min()).dt.days
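        # For example (hypothetical dates): an entity with records on
        # 2020-03-01, 2020-03-02, and 2020-03-04 gets day values 0, 1, and 3.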
    def convert_cumulative_fields(self):
        if self.cumulative_fields:
            for cum, num in self.cumulative_fields.items():
                self.convert_cum_to_num(cum, num)

    def convert_cum_to_num(self, cum_field, num_field):
        # Difference each entity's cumulative series to get per-record counts;
        # the first record has no lagged value, so it falls back to the
        # cumulative value itself
        lag_field = f"lagged_{cum_field}"
        self.df[lag_field] = self.grouping[cum_field].shift(1)
        self.df[num_field] = self.df[cum_field].sub(self.df[lag_field]).fillna(self.df[cum_field])
        self.df.drop([lag_field], inplace=True, axis=1)
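        # For example (hypothetical values): a cumulative series 3, 5, 9
        # becomes per-record counts 3, 2, 4.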
    def drop_fields(self):
        if self.fields_to_drop:
            self.log.info(f"Dropping fields {', '.join(self.fields_to_drop)}.")
            self.df.drop(self.fields_to_drop, inplace=True, axis=1)
    ## TEST DATA QUALITY

    def test_quality(self):
        self.run_general_data_quality_tests()
        self.run_additional_data_quality_tests()

    def run_general_data_quality_tests(self):
        # Are date and day present?
        assert 'date' in self.df.columns, f"Data must have a 'date' field. Current columns are {', '.join(self.df.columns)}"
        assert 'day' in self.df.columns, f"Data must have a 'day' field. Current columns are {', '.join(self.df.columns)}"
        # Are data in sequence?
        for key, dates in self.grouping['date']:
            number_of_days = (dates.max() - dates.min()).days + 1
            records = len(dates)
            if records != number_of_days:
                self.log.warning(f"Entity {key} does not have as many records ({records}) as days ({number_of_days}). Should be OK, though.")

    def run_additional_data_quality_tests(self):
        # Hook: subclasses can add dataset-specific quality checks
        pass
    ## OUTPUT DATA

    def output(self):
        self.log.info(f"Final columns: {', '.join(self.df.columns)}")
        self.log.info("First rows of data:")
        self.log.info(self.df.head())
        here = sc.thisdir(__file__)
        data_home = os.path.join(here, self.output_folder)
        # Write one CSV per entity, named after its sanitized key
        for key_value, _group in self.grouping:
            filename = f'{sc.sanitizefilename(key_value)}.csv'
            filepath = sc.makefilepath(filename=filename, folder=data_home)
            self.log.info(f'Creating {filepath}')
            mini_df = self.df[self.df.key == key_value]
            mini_df.to_csv(filepath)
        self.log.info(f"There are {len(self.grouping)} entities in this dataset.")
        self.log.info(f"Saved {len(self.df)} records.")