
Commit

changes from Nat's code review
Ananya Ashish Joshi authored and Ananya Ashish Joshi committed Nov 7, 2020
1 parent f19cb97 commit 8cd7f89
Showing 2 changed files with 238 additions and 165 deletions.
250 changes: 115 additions & 135 deletions validator/delphi_validator/validate.py
@@ -10,7 +10,6 @@
from datetime import date, datetime, timedelta
import pandas as pd
from .errors import ValidationError, APIDataFetchError
import time
from .datafetcher import filename_regex, \
read_filenames, load_csv, get_geo_signal_combos, \
fetch_api_reference
@@ -609,137 +608,143 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date



def data_corrections(self, source_df, api_frames, geo, sig, checking_date):
def check_positive_negative_spikes(self, source_df, api_frames, geo, sig):
"""
Adapt Dan's/Balasubramanian's corrections package to Python (only consider spikes) : https://github.com/cmu-delphi/covidcast-forecast/tree/dev/corrections/data_corrections
Adapt Dan's corrections package to Python (only consider spikes):
https://github.com/cmu-delphi/covidcast-forecast/tree/dev/corrections/data_corrections
Arguments:
- df_to_test: pandas dataframe of CSV source data
- df_to_reference: pandas dataframe of reference data, either from the
Statistics for a right-shifted rolling window and a centered rolling window are used
to determine outliers for both positive and negative spikes.
As it is now, ststat will always be NaN for source frames.
Arguments:
- source_df: pandas dataframe of CSV source data
- api_frames: pandas dataframe of reference data, either from the
COVIDcast API or semirecent data
- checking_date: datetime date
- geo: str; geo type name (county, msa, hrr, state) as in the CSV name
- sig: str; signal name as in the CSV name
"""
# Check inputs:
if not (geo == "state" or geo =="county"):
self.increment_total_checks()
self.raised_errors.append(ValidationError(
("data_corrections_sig", geo, sig),
None,
"geo_type should be one of 'state' or 'county'!"))

return

source_df.to_csv("source" + str(checking_date.date()) + ".csv")
# Combine all possible frames so that the rolling window calculations make sense, even if the before or after source frame is None
all_frames = pd.concat([api_frames, source_df]).drop_duplicates().sort_values(by=['time_value'])

# Tuned Variables from Dan's Code
self.increment_total_checks()
# Combine all possible frames so that the rolling window calculations make sense.
source_frame_start = source_df["time_value"].min()
source_frame_end = source_df["time_value"].max()
api_frames_start = api_frames["time_value"].min()
api_frames_end = api_frames["time_value"].max()
all_frames = pd.concat([api_frames, source_df]).drop_duplicates(). \
sort_values(by=['time_value']).reset_index()

# Tuned variables from Dan's code for flagging outliers. size_cut is a
# threshold on the magnitude of the reported value, sig_cut is a threshold
# on the reported ftstat or ststat (t-statistics), and sig_consec
# is a lower threshold applied to values adjacent to an already-flagged outlier.
size_cut = 20
sig_cut = 3
sig_consec = 2.25

# A function mapped to each row to determine outliers based on ftstat and ststat values
# Functions mapped to rows to determine outliers based on ftstat and ststat values
def outlier_flag(frame):
if (abs(frame["val"]) > size_cut) and not (pd.isna(frame["ststat"])) and (frame["ststat"] > sig_cut):
if (abs(frame["val"]) > size_cut) and not (pd.isna(frame["ststat"])) \
and (frame["ststat"] > sig_cut):
return 1
if (abs(frame["val"]) > size_cut) and (pd.isna(frame["ststat"])) and \
not (pd.isna(frame["ftstat"])) and (frame["ftstat"] > sig_cut):
return 1
if (abs(frame["val"]) > size_cut) and (pd.isna(frame["ststat"])) and not (pd.isna(frame["ftstat"])) and (frame["ftstat"] > sig_cut):
if (frame["val"] < -size_cut) and not (pd.isna(frame["ststat"])) and \
not pd.isna(frame["ftstat"]):
return 1
if (frame["val"] < -size_cut) and not (pd.isna(frame["ststat"])) and not (pd.isna(frame["ftstat"])):
return 0

def outlier_nearby(frame):
if (not pd.isna(frame['ststat'])) and (frame['ststat'] > sig_consec):
return 1
if pd.isna(frame['ststat']) and (frame['ftstat'] > sig_consec):
return 1
return 0
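For example, a hypothetical row with val = 25 and ststat = 3.4 clears both size_cut (20) and sig_cut (3), so outlier_flag returns 1; the same value with ststat = 2.5 only clears the lower sig_consec threshold (2.25), so it is flagged only by outlier_nearby, i.e. only when it sits next to an already-flagged outlier.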

# Calculate ftstat and ststat values for the rolling windows, group frames by geo region


# Calculate ftstat and ststat values for the rolling windows, group frames by geo region
region_group = all_frames.groupby("geo_id")
window_size = 14
shift_val = 0

# Shift the window to match how R calculates rolling windows with even numbers
if (window_size%2 == 0):
if window_size%2 == 0:
shift_val = -1

# Calculate the t-statistics for the two rolling windows (centered window and right-aligned window)
all_full_frames = []
for cat, group in region_group:
for _, group in region_group:
rolling_windows = group["val"].rolling(window_size, min_periods=window_size)
center_windows = group["val"].rolling(window_size, min_periods=window_size, center=True)
fmean = rolling_windows.mean()
fmedian = rolling_windows.median()
smedian = center_windows.median().shift(shift_val)
fsd = rolling_windows.std()
ssd = center_windows.std().shift(shift_val)
vals_modified_f = group["val"] - fmedian.fillna(0)
vals_modified_s = group["val"] - smedian.fillna(0)
rolling_windows_f = vals_modified_f.rolling(window_size, min_periods=window_size)
center_windows_s = vals_modified_s.rolling(window_size, min_periods=window_size, center=True)
fmad = rolling_windows_f.median()
smad = center_windows_s.median().shift(shift_val)
ftstat = abs(vals_modified_f)/fsd
ststat = abs(vals_modified_s)/ssd
#print(vals_modified_f)
group['fmean'] = fmean
group['fmedian'] = fmedian
group['smedian'] = smedian
group['fsd'] = fsd
group['ssd'] = ssd
group['fmad'] = fmad
group['smad'] = smad
group['ftstat'] = ftstat
group['ststat'] = ststat
all_full_frames.append(group)
group['ststat'] = ststat
all_full_frames.append(group)

all_frames = pd.concat(all_full_frames)

# Determine outliers
outlier_source_df = all_frames.sort_values(by=['time_value']).copy()
outlier_source_df["flag"] = outlier_source_df.apply(outlier_flag, axis = 1)
outlier_group = outlier_source_df.groupby("geo_id")

outlier_append = pd.DataFrame()
for cat, group in outlier_group:
group = group.reset_index()
for index, row in group.iterrows():
if row["flag"] == 1:
try:
eval_next = group.iloc[index+1, :]
if (not pd.isna(eval_next['ststat'])) and (eval_next['ststat'] > sig_consec):
eval_next["flag"] == 1
outlier_append = outlier_append.append(eval_next, ignore_index=True)

if pd.isna(eval_next['ststat']) and (eval_next['ftstat'] > sig_consec):
eval_next["flag"] == 1
outlier_append = outlier_append.append(eval_next, ignore_index=True)

except:
continue
try:
eval_prev = group.iloc[index-1, :]
if (not pd.isna(eval_prev['ststat'])) and (eval_prev['ststat'] > sig_consec):
eval_prev["flag"] == 1
outlier_append = outlier_append.append(eval_prev, ignore_index=True)
if pd.isna(eval_prev['ststat']) and (eval_prev['ftstat'] > sig_consec):
eval_prev["flag"] == 1
outlier_append = outlier_append.append(eval_prev, ignore_index=True)
except:
continue

outlier_append["flag"] = 1
outliers = outlier_source_df[outlier_source_df["flag"] == 1]
all_o = pd.concat([outliers, outlier_append]).drop(columns=['index']).sort_values(by=['time_value','geo_id']).drop_duplicates()
all_o = all_o.reset_index().drop(columns=['index'])
all_o.to_csv(str(checking_date.date()) + sig + "outliers.csv")
all_frames.to_csv(str(checking_date.date()) + sig + "all_frames.csv")

if outliers.shape[0] > 0:
# Determine outliers in source frames only; we only need the reference
# data from just before the start of the source data,
# because the lead and lag outlier calculations look only one day out
outlier_df = all_frames.query \
('time_value >= @api_frames_end & time_value <= @source_frame_end')
outlier_df = outlier_df.sort_values(by=['geo_id', 'time_value']) \
.drop(columns=['index']).reset_index().copy()
outlier_df["flag"] = 0
outlier_df["flag"] = outlier_df.apply(outlier_flag, axis = 1)
outliers = outlier_df[outlier_df["flag"] == 1]
outliers_reset = outliers.copy().reset_index() \
.drop(columns=['index', 'level_0'])

# Find the lead outliers and the lag outliers. Check that the selected row
# is actually a leading or lagging row for the given geo_id
upper_index = list(filter(lambda x: x < outlier_df.shape[0], \
list(outliers.index+1)))
upper_df = outlier_df.iloc[upper_index, :].reset_index().drop(columns=['level_0', 'index'])
upper_compare = outliers_reset[:len(upper_index)]
sel_upper_df = upper_df[upper_compare["geo_id"] == upper_df["geo_id"]].copy()
lower_index = list(filter(lambda x: x >= 0, list(outliers.index-1)))
lower_df = outlier_df.iloc[lower_index, :].reset_index().drop(columns=['level_0', 'index'])
lower_compare = outliers_reset[-len(lower_index):].reset_index()
sel_lower_df = lower_df[lower_compare["geo_id"] == lower_df["geo_id"]].copy()

sel_upper_df["flag"] = 0
sel_lower_df["flag"] = 0

sel_upper_df["flag"] = sel_upper_df.apply(outlier_nearby, axis = 1)
sel_lower_df["flag"] = sel_lower_df.apply(outlier_nearby, axis = 1)

upper_outliers = sel_upper_df[sel_upper_df["flag"] == 1]
lower_outliers = sel_lower_df[sel_lower_df["flag"] == 1]

all_outliers = pd.concat([outliers, upper_outliers, lower_outliers]). \
sort_values(by=['time_value','geo_id']).drop(columns=['index']). \
drop_duplicates().reset_index()
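As an illustration of the lead/lag logic above: an outlier at positional index i of outlier_df has candidate neighbors at indices i+1 (lead) and i-1 (lag); the geo_id comparison discards a neighbor that belongs to a different region, and the surviving neighbors are then flagged only if they exceed the lower sig_consec threshold via outlier_nearby.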


source_outliers = all_outliers.query \
("time_value >= @source_frame_start & time_value <= @source_frame_end")

if source_outliers.shape[0] > 0:
self.raised_errors.append(ValidationError(
("data_corrections_range",
(checking_date.date()-timedelta(days=1), checking_date.date()+timedelta(days=1)), geo, sig),
(outliers),
'Dates with flagged outliers based on the previous 30 days of source data available'))
self.increment_total_checks()
("check_positive_negative_spikes",
source_frame_start, source_frame_end, geo, sig),
(source_outliers),
'Source dates with flagged outliers based on the \
previous 14 days of data available'))



def check_avg_val_vs_reference(self, df_to_test, df_to_reference, checking_date, geo_type,
signal_type):
"""
@@ -865,12 +870,11 @@ def validate(self, export_dir):
# Get relevant data file names and info.
export_files = read_filenames(export_dir)
date_filter = make_date_filter(self.start_date, self.end_date)


# Make list of tuples of CSV names and regex match objects.
validate_files = [(f, m) for (f, m) in export_files if date_filter(m)]
self.check_missing_date_files(validate_files)
self.check_missing_dates(validate_files)
self.check_settings()

all_frames = []
Expand Down Expand Up @@ -899,12 +903,6 @@ def validate(self, export_dir):

all_frames = pd.concat(all_frames)

# Get list of dates we expect to see in the source data.
date_slist = all_frames['date'].unique().tolist()
date_list = list(
map(lambda x: datetime.strptime(x, '%Y%m%d'), date_slist))
date_list.sort()

# recent_lookbehind: start from the check date and working backward in time,
# how many days at a time do we want to check for anomalies?
# Choosing 1 day checks just the daily data.
@@ -918,24 +916,20 @@ def validate(self, export_dir):
date_list = [self.start_date + timedelta(days=days)
for days in range(self.span_length.days + 1)]
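For example, with a start_date of 2020-11-01 and a span_length of six days, date_list runs from 2020-11-01 through 2020-11-07 inclusive.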

#get 30 days prior to the earliest list date
outlier_lookbehind = timedelta(days=30)
#get 14 days prior to the earliest list date
outlier_lookbehind = timedelta(days=14)

# Get all expected combinations of geo_type and signal.
geo_signal_combos = get_geo_signal_combos(self.data_source)

all_api_df = self.threaded_api_calls(
self.start_date - min(semirecent_lookbehind,
self.max_check_lookbehind),
self.start_date - outlier_lookbehind,
self.end_date, geo_signal_combos)

# Keeps script from checking all files in a test run.
if self.test_mode:
kroc = 0


prev_df = None
next_df = None
# Comparison checks
# Run checks for recent dates in each geo-sig combo vs semirecent (previous
# week) API data.
@@ -954,7 +948,7 @@
"file with geo_type-signal combo does not exist"))
continue


max_date = geo_sig_df["time_value"].max()
self.check_min_allowed_max_date(max_date, geo_type, signal_type)
self.check_max_allowed_max_date(max_date, geo_type, signal_type)
@@ -965,15 +959,25 @@
if geo_sig_api_df is None:
continue

min_date = min(date_list)
max_date = max(date_list)
# Source frame with the day before and after
source_df = geo_sig_df.query(
'time_value <= @max_date & time_value >= @min_date')
earliest_available_date = source_df["time_value"].min()

# Outlier dataframe
outlier_start_date = earliest_available_date - outlier_lookbehind
outlier_end_date = earliest_available_date - timedelta(days=1)
outlier_api_df = geo_sig_api_df.query \
('time_value <= @outlier_end_date & time_value >= @outlier_start_date')
self.check_positive_negative_spikes(source_df, outlier_api_df, geo_type, signal_type)
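For instance, if the earliest date available in source_df is 2020-11-01, the outlier reference frame drawn from the API covers 2020-10-18 through 2020-10-31, i.e. the 14 days ending the day before the source data begins.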

# Check data from a group of dates against recent (previous 7 days,
# by default) data from the API.
for checking_date in date_list:
recent_cutoff_date = checking_date - \
recent_lookbehind + timedelta(days=1)
# Check data from a group of dates against recent (previous 7 days, by default) and against all
# data from the API.
for index, checking_date in enumerate(date_list):
recent_cutoff_date = checking_date - recent_lookbehind
recent_df = geo_sig_df.query(
'time_value <= @checking_date & time_value >= @recent_cutoff_date')

@@ -1008,29 +1012,6 @@ def validate(self, export_dir):
"reference data is empty; comparative checks could not be performed"))
continue




# Source frame with the day before and after
next_cutoff_date = checking_date + recent_lookbehind
source_prev_next_df = geo_sig_df.query(
'time_value <= @next_cutoff_date & time_value >= @recent_cutoff_date')


earliest_available_date = source_prev_next_df["time_value"].min()
# Outlier dataframe runs backwards from the checking date, in the future we should reduce the number of api calls
outlier_start_date = recent_cutoff_date - outlier_lookbehind
outlier_end_date = earliest_available_date - timedelta(days=1)
outlier_api_df = fetch_api_reference(
self.data_source, outlier_start_date, outlier_end_date, geo, sig)

print(outlier_start_date, outlier_end_date, recent_cutoff_date, next_cutoff_date, earliest_available_date)
self.data_corrections(source_prev_next_df, outlier_api_df, geo, sig, checking_date)
prev_df = recent_df




self.check_max_date_vs_reference(
recent_df, reference_api_df, checking_date, geo_type, signal_type)

Expand All @@ -1042,9 +1023,8 @@ def validate(self, export_dir):
self.check_avg_val_vs_reference(
recent_df, reference_api_df, checking_date, geo_type, signal_type)



# Keeps script from checking all files in a test run.
# Keeps script from checking all files in a test run.
if self.test_mode:
kroc += 1
if kroc == 2:

