"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
from datetime import datetime
from os.path import join
from typing import Optional
import logging
from epiweeks import Week
import numpy as np
import pandas as pd
from .nancodes import Nans


def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them out, and log."""
    columns = ["val", "se", "sample_size"]
    # Get indices where the XNOR is true (i.e. both are true or both are false).
    masks = [
        ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
        for column in columns
    ]
    for mask in masks:
        if df.loc[mask].size > 0:
            if logger is not None:
                logger.info(
                    "Filtering contradictory missing code in "
                    f"{sensor}_{metric}_{date.strftime('%Y-%m-%d')}."
                )
            df = df.loc[~mask]
    return df
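

# A minimal sketch of the contradiction check above (toy values only, not from
# a real indicator; Nans.OTHER is assumed to be one of the non-NOT_MISSING
# codes defined in .nancodes). A row is contradictory when a value is present
# but coded as missing, or absent but coded as NOT_MISSING:
#
#     toy = pd.DataFrame({
#         "val": [1.0, np.nan, np.nan],
#         "se": [0.1, np.nan, np.nan],
#         "sample_size": [10, np.nan, np.nan],
#         "missing_val": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER],
#         "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER],
#         "missing_sample_size": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER],
#     })
#     filter_contradicting_missing_codes(toy, "new_counts", None, datetime(2020, 3, 1))
#
# Rows 0 (value present, coded NOT_MISSING) and 2 (value absent, coded OTHER)
# are kept; row 1 (values absent but coded NOT_MISSING) is dropped.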


def create_export_csv(
    df: pd.DataFrame,
    export_dir: str,
    geo_res: str,
    sensor: str,
    metric: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    remove_null_samples: bool = False,
    write_empty_days: bool = False,
    logger: Optional[logging.Logger] = None,
    weekly_dates: bool = False,
    sort_geos: bool = False,
):
"""Export data in the format expected by the Delphi API.
This function will round the signal and standard error values to 7 decimals places.
Parameters
----------
df: pd.DataFrame
Columns: geo_id, timestamp, val, se, sample_size
export_dir: str
Export directory
geo_res: str
Geographic resolution to which the data has been aggregated
sensor: str
Sensor that has been calculated (cumulative_counts vs new_counts)
metric: Optional[str]
Metric we are considering, if any.
start_date: Optional[datetime]
Earliest date to export or None if no minimum date restrictions should be applied.
end_date: Optional[datetime]
Latest date to export or None if no maximum date restrictions should be applied.
remove_null_samples: Optional[bool]
Whether to remove entries whose sample sizes are null.
write_empty_days: Optional[bool]
If true, every day in between start_date and end_date will have a CSV file written
even if there is no data for the day. If false, only the days present are written.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.
weekly_dates: Optional[bool]
Whether the output data are weekly or not. If True, will prefix files with
"weekly_YYYYWW" where WW is the epiweek instead of the usual YYYYMMDD for daily files.
sort_geos: bool
If True, the dataframe is sorted by geo before writing. Otherwise, the dataframe is
written as is.
Returns
---------
dates: pd.Series[datetime]
Series of dates for which CSV files were exported.
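
    Examples
    --------
    A minimal usage sketch (toy values; the directory, geo level, and sensor
    name are illustrative, not taken from a real indicator):

    >>> toy = pd.DataFrame({
    ...     "geo_id": ["ca", "ny"],
    ...     "timestamp": ["2020-03-01", "2020-03-01"],
    ...     "val": [1.0, 2.0],
    ...     "se": [0.1, 0.2],
    ...     "sample_size": [10, 20],
    ... })
    >>> create_export_csv(toy, "./receiving", "state", "new_counts")  # doctest: +SKIP

    This would write a single file, "20200301_state_new_counts.csv", under
    "./receiving".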
"""
    df = df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    if start_date is None:
        start_date = min(df["timestamp"])
    if end_date is None:
        end_date = max(df["timestamp"])

    if not write_empty_days:
        dates = pd.Series(
            df[np.logical_and(df["timestamp"] >= start_date,
                              df["timestamp"] <= end_date)]["timestamp"].unique()
        ).sort_values()
    else:
        dates = pd.date_range(start_date, end_date)

    expected_columns = [
        "geo_id",
        "val",
        "se",
        "sample_size",
        "missing_val",
        "missing_se",
        "missing_sample_size",
    ]
    for date in dates:
        if weekly_dates:
            t = Week.fromdate(pd.to_datetime(str(date)))
            date_str = "weekly_" + str(t.year) + str(t.week).zfill(2)
        else:
            date_str = date.strftime("%Y%m%d")
        if metric is None:
            export_filename = f"{date_str}_{geo_res}_{sensor}.csv"
        else:
            export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
        export_file = join(export_dir, export_filename)

        export_df = df[df["timestamp"] == date].filter(items=expected_columns)
        if "missing_val" in export_df.columns:
            export_df = filter_contradicting_missing_codes(
                export_df, sensor, metric, date, logger=logger
            )
        if remove_null_samples:
            export_df = export_df[export_df["sample_size"].notnull()]
        export_df = export_df.round({"val": 7, "se": 7})
        if sort_geos:
            export_df = export_df.sort_values(by="geo_id")
        export_df.to_csv(export_file, index=False, na_rep="NA")
    return dates
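

# A quick sketch of the weekly file naming above (illustrative values, not from
# a real indicator): datetime(2020, 3, 1) falls in epiweek 10 of 2020, so with
# weekly_dates=True the file for that week would be named
# "weekly_202010_state_new_counts.csv", assuming geo_res="state",
# sensor="new_counts", and no metric.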