# -*- coding: utf-8 -*-
"""Collect and process Quidel export files."""
from os.path import join
import os
import time
from datetime import datetime, timedelta
import boto3
import pandas as pd
import numpy as np
from .constants import AGE_GROUPS


def get_from_s3(start_date, end_date, bucket, logger):
    """
    Get raw data from the AWS S3 bucket.
    Args:
        start_date: datetime.datetime
            pull data from files tagged with dates on/after the start date
        end_date: datetime.datetime
            pull data from files tagged with dates on/before the end date
        bucket: s3.Bucket
            the AWS S3 bucket that stores Quidel data
        logger: logging.Logger
            The structured logger.
    output:
        df: pd.DataFrame
        time_flag: datetime.datetime
    """
    time_flag = None
    selected_columns = ['SofiaSerNum', 'TestDate', 'Facility', 'City',
                        'State', 'Zip', 'PatientAge', 'Result1',
                        'Result2', 'OverallResult', 'StorageDate',
                        'fname']
    s3_files = {}
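    # Index the available files by received date, parsed from the date
    # segment of each S3 key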
    for obj in bucket.objects.all():
        if "-sars" in obj.key:
            date_string = obj.key.split("/")[1]
            try:
                yy = int(date_string.split("_")[0])
                mm = int(date_string.split("_")[1])
                dd = int(date_string.split("_")[2])
                received_date = datetime(yy, mm, dd)
            except ValueError:
                continue
            s3_files.setdefault(received_date, []).append(obj.key)
    n_days = (end_date - start_date).days + 1
    df_list = []
    seen_files = set()
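    # Walk each day in [start_date, end_date] and ingest the files
    # received on that day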
    for search_date in [start_date + timedelta(days=x) for x in range(n_days)]:
        if search_date in s3_files:
            logger.info(f"Pulling data received on {search_date.date()}")
            # Fetch data received on the same day
            for fn in s3_files[search_date]:
                # Skip non-CSV files, such as directories
                if ".csv" not in fn:
                    continue
                # Avoid appending duplicate datasets
                if fn in seen_files:
                    continue
                obj = bucket.Object(key=fn)
                newdf = pd.read_csv(obj.get()["Body"],
                                    parse_dates=["StorageDate", "TestDate"],
                                    low_memory=False)
                seen_files.add(fn)
                newdf["fname"] = fn
                df_list.append(newdf[selected_columns])
            time_flag = search_date
    # Guard against an empty pull: pd.concat raises on an empty list
    if len(df_list) == 0:
        return pd.DataFrame(columns=selected_columns), None
    return pd.concat(df_list), time_flag


def fix_zipcode(df):
"""Fix zipcode that is 9 digit instead of 5 digit."""
zipcode5 = []
fixnum = 0
for zipcode in df['Zip'].values:
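        # ZIP+4 codes arrive as strings like '12345-6789'; keep only the
        # 5-digit prefix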
        if isinstance(zipcode, str) and '-' in zipcode:
            zipcode5.append(int(zipcode.split('-')[0]))
            fixnum += 1
        else:
            zipcode = int(float(zipcode))
            zipcode5.append(zipcode)
    df['zip'] = zipcode5
    return df


def fix_date(df, logger):
    """
    Remove invalid dates and select the correct test date to use.
    Quidel COVID tests are labeled with a test date and a storage date. In
    principle, the TestDate should reflect when the test was performed and
    the StorageDate when the test was logged in the MyVirena cloud storage
    device, so the test date should precede the storage date by several
    days. In the actual data, however, the test date can be far earlier
    than the storage date, and it can also occur after the storage date.
    - For most cases, use the test date as the timestamp
    - Remove tests whose storage date is earlier than the test date
    - If the storage date is more than 90 days later than the test date,
      use the storage date instead
    """
df.insert(2, "timestamp", df["TestDate"])
mask = df["TestDate"] <= df["StorageDate"]
logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f}% of unusual data")
df = df[mask]
mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90)
logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f}% of outdated data")
df["timestamp"].values[mask] = df["StorageDate"].values[mask]
return df


def preprocess_new_data(start_date, end_date, params, test_mode, logger):
    """
    Pull and pre-process Quidel COVID test data.
    Drop unnecessary columns. For now, consider only the positive-rate
    sensor, which is based on the number of total tests and the number
    of positive tests.
    Args:
        start_date: datetime.datetime
            pull data from files tagged with dates on/after the start date
        end_date: datetime.datetime
            pull data from files tagged with dates on/before the end date
        params: dict
            read from params.json
        test_mode: bool
            if True, read from local test data instead of pulling from S3
        logger: logging.Logger
            The structured logger.
    output:
        df: pd.DataFrame
        time_flag: datetime.datetime
            the last date on which data was successfully pulled
    """
    if test_mode:
        test_data_dir = "./test_data/test_data.csv"
        df, time_flag = pd.read_csv(
            test_data_dir,
            parse_dates=["StorageDate", "TestDate"]
        ), datetime(2020, 8, 17)
    else:
        # Connect to the AWS S3 bucket
        aws_access_key_id = params["aws_credentials"]["aws_access_key_id"]
        aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"]
        bucket_name = params["bucket_name"]
        s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
        bucket = s3.Bucket(bucket_name)
        # Get new data from S3
        df, time_flag = get_from_s3(start_date, end_date, bucket, logger)
    # No new data can be pulled
    if time_flag is None:
        return df, time_flag
    # Fix ZIP codes that are 9 digits instead of 5 digits
    df = fix_zipcode(df)
    # Create a timestamp column based on StorageDate and TestDate
    df = fix_date(df, logger)
    # Compute overallPositive
    overall_pos = df[df["OverallResult"] == "positive"].groupby(
        by=["timestamp", "zip"],
        as_index=False)['OverallResult'].count()
    overall_pos["positiveTest_total"] = overall_pos["OverallResult"]
    overall_pos.drop(labels="OverallResult", axis="columns", inplace=True)
    # Compute overallTotal
    overall_total = df.groupby(
        by=["timestamp", "zip"],
        as_index=False)['OverallResult'].count()
    overall_total["totalTest_total"] = overall_total["OverallResult"]
    overall_total.drop(labels="OverallResult", axis="columns", inplace=True)
    # Compute numUniqueDevices; use nunique() directly since passing a
    # renaming dict to .agg() on a SeriesGroupBy is no longer supported
    numUniqueDevices = df.groupby(
        by=["timestamp", "zip"],
        as_index=False)["SofiaSerNum"].nunique().rename(
        columns={"SofiaSerNum": "numUniqueDevices_total"}
    )
    df_merged = overall_total.merge(
        numUniqueDevices, on=["timestamp", "zip"], how="left"
    ).merge(
        overall_pos, on=["timestamp", "zip"], how="left"
    ).fillna(0).drop_duplicates()
    # Compute summary info for the age groups
    # Coerce PatientAge to float, mapping the open-ended "<1" and ">85"
    # buckets to in-range numeric values and missing ages to -1
    df["PatientAge"] = df["PatientAge"].fillna(-1)
    df.loc[df["PatientAge"] == "<1", "PatientAge"] = 0.5
    df.loc[df["PatientAge"] == ">85", "PatientAge"] = 100
    df["PatientAge"] = df["PatientAge"].astype(float)
    # Should match the suffixes of signal names
    df["label"] = None
    df.loc[df["PatientAge"] < 5, "label"] = "age_0_4"
    df.loc[(df["PatientAge"] >= 5) & (df["PatientAge"] < 18), "label"] = "age_5_17"
    df.loc[(df["PatientAge"] >= 18) & (df["PatientAge"] < 50), "label"] = "age_18_49"
    df.loc[(df["PatientAge"] >= 50) & (df["PatientAge"] < 65), "label"] = "age_50_64"
    df.loc[df["PatientAge"] >= 65, "label"] = "age_65plus"
    df.loc[df["PatientAge"] == -1, "label"] = "NA"
    for agegroup in AGE_GROUPS[1:]:  # Exclude total
        if agegroup == "age_0_17":
            ages = ["age_0_4", "age_5_17"]
        else:
            ages = [agegroup]
        # Compute overallPositive
        group_pos = df.loc[(df["OverallResult"] == "positive")
                           & (df["label"].isin(ages))].groupby(
            by=["timestamp", "zip"],
            as_index=False)['OverallResult'].count()
        group_pos[f"positiveTest_{agegroup}"] = group_pos["OverallResult"]
        group_pos.drop(labels="OverallResult", axis="columns", inplace=True)
        # Compute overallTotal
        group_total = df.loc[df["label"].isin(ages)].groupby(
            by=["timestamp", "zip"],
            as_index=False)['OverallResult'].count()
        group_total[f"totalTest_{agegroup}"] = group_total["OverallResult"]
        group_total.drop(labels="OverallResult", axis="columns", inplace=True)
        # Compute numUniqueDevices
        group_numUniqueDevices = df.loc[df["label"].isin(ages)].groupby(
            by=["timestamp", "zip"],
            as_index=False)["SofiaSerNum"].nunique().rename(
            columns={"SofiaSerNum": f"numUniqueDevices_{agegroup}"}
        )
        df_merged = df_merged.merge(
            group_numUniqueDevices, on=["timestamp", "zip"], how="left"
        ).merge(
            group_pos, on=["timestamp", "zip"], how="left"
        ).merge(
            group_total, on=["timestamp", "zip"], how="left"
        ).fillna(0).drop_duplicates()
    return df_merged, time_flag


def check_intermediate_file(cache_dir, pull_start_date):
    """Check whether there is a cache file containing historical data already."""
    for filename in os.listdir(cache_dir):
        if ".csv" in filename:
            pull_start_date = datetime.strptime(filename.split("_")[2].split(".")[0],
                                                '%Y%m%d') + timedelta(days=1)
            previous_df = pd.read_csv(os.path.join(cache_dir, filename),
                                      sep=",", parse_dates=["timestamp"])
            return previous_df, pull_start_date
    return None, pull_start_date


def pull_quidel_covidtest(params, logger):
    """Pull the Quidel COVID test data.
    Conditionally merge new data with historical data from ./cache.
    Parameters:
        params: dict
            including all the information read from params.json
        logger: logging.Logger
            The structured logger.
    Returns:
        DataFrame:
            A data frame containing the pre-processed data, with columns
            timestamp, zip, and the numUniqueDevices, positiveTest, and
            totalTest counts for each age group
        datetime.datetime
            the last date on which data was received
    """
cache_dir = params["input_cache_dir"]
test_mode = params["test_mode"]
# pull new data only that has not been ingested
previous_df, pull_start_date = check_intermediate_file(
cache_dir,
datetime.strptime(params["pull_start_date"], '%Y-%m-%d'))
if params["pull_end_date"] == "":
pull_end_date = datetime.today()
else:
pull_end_date = datetime.strptime(params["pull_end_date"], '%Y-%m-%d')
# Pull data from the file at 5 digit zipcode level
# Use _end_date to check the most recent date that we received data
df, _end_date = preprocess_new_data(
pull_start_date, pull_end_date, params, test_mode, logger)
# Utilize previously stored data
if previous_df is not None:
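        # Summing after concat merges the counts for any (timestamp, zip)
        # pairs present in both the cached and the newly pulled data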
        df = pd.concat(
            [previous_df, df]
        ).groupby(
            ["timestamp", "zip"]
        ).sum(
            numeric_only=True
        ).reset_index()
    return df, _end_date


def check_export_end_date(input_export_end_date, _end_date,
                          end_from_today_minus):
    """
    Update the export_end_date according to the data received.
    By default, set the export end date to the last pull date minus
    end_from_today_minus days (end_from_today_minus = 5). Otherwise, use
    the requested date if it is earlier than the default one.
    Parameters:
        input_export_end_date: str
            read from params
        _end_date: datetime.datetime
            updated according to the data received
        end_from_today_minus: int
            report data until X days before the last pull date
    Returns:
        datetime.datetime
            export data until which date
    """
    export_end_date = _end_date - timedelta(days=end_from_today_minus)
    if input_export_end_date != "":
        input_export_end_date = datetime.strptime(input_export_end_date, '%Y-%m-%d')
        if input_export_end_date < export_end_date:
            return input_export_end_date
    return export_end_date


def check_export_start_date(export_start_date, export_end_date,
                            export_day_range):
    """
    Ensure that the start date, end date, and day range are mutually consistent.
    Parameters:
        export_start_date: str
            Read from params
        export_end_date: datetime.datetime
            Calculated according to the data received
        export_day_range: int
            Number of days to report
    Returns:
        datetime.datetime
            export data from which date
    """
if export_start_date == "":
export_start_date = datetime(2020, 5, 26)
else:
export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d')
# Only export data from -50 days to -5 days
if (export_end_date - export_start_date).days > export_day_range:
export_start_date = export_end_date - timedelta(days=export_day_range)
if export_start_date < datetime(2020, 5, 26):
return datetime(2020, 5, 26)
return export_start_date


def update_cache_file(df, _end_date, cache_dir, logger):
    """
    Update the cache file: remove the old one, export the new one.
    Parameters:
        df: pd.DataFrame
            Pre-processed data at the ZIP code level
        _end_date: datetime.datetime
            The most recent date when the raw data was received
        cache_dir: str
            ./cache where the cache file is stored
        logger: logging.Logger
            Structured logger.
    """
    start_time = time.time()
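    # Replace any existing cache CSVs with a single fresh snapshot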
    for fn in os.listdir(cache_dir):
        if ".csv" in fn:
            os.remove(join(cache_dir, fn))
    df.to_csv(join(cache_dir, "pulled_until_%s.csv" % _end_date.strftime("%Y%m%d")),
              index=False)
logger.info("Completed cache file update",
end_date = _end_date.strftime('%Y-%m-%d'),
elapsed_time_in_seconds = round(time.time() - start_time, 2))
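

# A minimal usage sketch (hypothetical values; the params keys shown are the
# ones this module reads, and `logger` is assumed to be a structured logger
# instance). Run within the package so the relative import of AGE_GROUPS
# resolves:
#
#     params = {
#         "input_cache_dir": "./cache",
#         "test_mode": True,
#         "pull_start_date": "2020-05-26",
#         "pull_end_date": "",
#     }
#     df, _end_date = pull_quidel_covidtest(params, logger)
#     export_end_date = check_export_end_date(
#         "", _end_date, end_from_today_minus=5)
#     export_start_date = check_export_start_date(
#         "", export_end_date, export_day_range=40)  # 40 is an assumed window
#     update_cache_file(df, _end_date, params["input_cache_dir"], logger)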