/
rowparser.py
executable file
·396 lines (360 loc) · 14.3 KB
/
rowparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
import copy
import logging
import re
from datetime import datetime
from operator import itemgetter
from typing import Dict, Generator, Iterator, List, Optional, Tuple

import hxl
from dateutil.relativedelta import relativedelta  # noqa: F401
from hdx.location.adminlevel import AdminLevel
from hdx.location.country import Country
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import dict_of_lists_add

from ..utilities import match_template
logger = logging.getLogger(__name__)
class RowParser:
    """RowParser class for parsing each row.

    Args:
        name (str): Name of scraper
        countryiso3s (List[str]): List of ISO3 country codes to process
        adminlevel (Optional[AdminLevel]): AdminLevel object from HDX Python Country library
        level (str): Can be national, subnational or single
        datelevel (str): Can be global, regional, national, subnational
        today (datetime): Date today
        datasetinfo (Dict): Dictionary of information about dataset
        headers (List[str]): Row headers
        header_to_hxltag (Optional[Dict[str, str]]): Mapping from headers to HXL hashtags or None
        subsets (List[Dict]): List of subset definitions
        maxdateonly (bool): Whether to only take the most recent date. Defaults to True.
    """

    def __init__(
        self,
        name: str,
        countryiso3s: List[str],
        adminlevel: Optional[AdminLevel],
        level: str,
        datelevel: str,
        today: datetime,
        datasetinfo: Dict,
        headers: List[str],
        header_to_hxltag: Optional[Dict[str, str]],
        subsets: List[Dict],
        maxdateonly: bool = True,
    ) -> None:
        def get_level(level: str) -> Optional[int]:
            """Get the level_name as a number. "Single" valued outputs are typically
            regional or global

            Args:
                level (str): Can be national, subnational or single (for a single value)

            Returns:
                Optional[int]: Level as a number
            """
            if level == "single":
                return None
            elif level == "national":
                return 0
            else:
                return 1

        self.name = name
        # level/datelevel as numbers: None (single), 0 (national), 1 (subnational)
        self.level = get_level(level)
        self.datelevel = get_level(datelevel)
        self.today = today
        self.sort = datasetinfo.get("sort")
        self.stop_row = datasetinfo.get("stop_row")
        self.datecol = datasetinfo.get("date")
        self.datetype = datasetinfo.get("date_type")
        # Baseline for "most recent date" comparisons: an early sentinel date
        # for "date" type, otherwise 0 for numeric (e.g. year) comparisons
        if self.datetype:
            if self.datetype == "date":
                date = parse_date("1900-01-01")
            else:
                date = 0
        else:
            date = 0
        self.maxdate = date
        self.single_maxdate = datasetinfo.get("single_maxdate", False)
        self.ignore_future_date = datasetinfo.get("ignore_future_date", True)
        self.adminlevel = adminlevel
        self.admcols = datasetinfo.get("admin", [])
        self.admexact = datasetinfo.get("admin_exact", False)
        self.admsingle = datasetinfo.get("admin_single", None)
        if self.admsingle:
            # A single fixed admin unit implies no per-admin date tracking
            self.datelevel = None
        self.subsets = subsets
        self.filter_cols = datasetinfo.get("filter_cols", [])
        prefilter = datasetinfo.get("prefilter")
        if prefilter is not None:
            # Rewrite the configured expression so column names index row[...]
            prefilter = self.get_filter_str_for_eval(prefilter)
        self.prefilter = prefilter
        # Optional admin_filter restricts which admin units are accepted;
        # otherwise fall back to the supplied countries (and pcodes if subnational)
        adms = datasetinfo.get("admin_filter")
        if adminlevel:
            if adms is None:
                self.adms = [countryiso3s, adminlevel.pcodes]
            else:
                if self.datelevel == 1:
                    self.adms = adms
                else:
                    self.adms = [adms, adminlevel.pcodes]
        else:
            if adms is None:
                self.adms = [countryiso3s]
            else:
                if self.datelevel == 1:
                    self.adms = adms
                else:
                    self.adms = [adms]
        # maxdates tracks the most recent date seen per subset index and,
        # when datelevel is set, per admin unit within each subset
        if self.datelevel is None:
            self.maxdates = {i: date for i, _ in enumerate(subsets)}
        else:
            if self.datelevel > len(self.admcols):
                raise ValueError(
                    "No admin columns specified for required level_type!"
                )
            self.maxdates = {
                i: {adm: date for adm in self.adms[self.datelevel]}
                for i, _ in enumerate(subsets)
            }
        self.maxdateonly = maxdateonly
        self.flatteninfo = datasetinfo.get("flatten")
        self.headers = headers
        self.header_to_hxltag: Optional[Dict[str, str]] = header_to_hxltag
        # header (or hxltag) -> list of allowed values, populated from any
        # external filter file by read_external_filter
        self.filters = {}
        self.read_external_filter(datasetinfo.get("external_filter"))
def read_external_filter(self, external_filter: Optional[Dict]) -> None:
"""Read filter list from external url pointing to a HXLated file
Args:
external_filter (Optional[Dict]): External filter information in dictionary
Returns:
None
"""
if not external_filter:
return
hxltags = external_filter["hxl"]
data = hxl.data(external_filter["url"])
for row in data:
for hxltag in data.columns:
if hxltag.display_tag in hxltags:
if self.header_to_hxltag:
header = hxltag.display_tag
else:
header = hxltag.header
dict_of_lists_add(
self.filters, header, row.get("#country+code")
)
def get_filter_str_for_eval(self, filter: str) -> str:
"""Replace filter string variables with columns in row of data
Args:
filter (str): Filter string
Returns:
str: Filter string with variables replaced
"""
if self.filter_cols:
for col in self.filter_cols:
filter = filter.replace(col, f"row['{col}']")
else:
if self.datecol:
filter = filter.replace(self.datecol, f"row['{self.datecol}']")
for subset in self.subsets:
for col in subset["input"]:
filter = filter.replace(col, f"row['{col}']")
return filter
def filter_sort_rows(self, iterator: Iterator[Dict]) -> Iterator[Dict]:
"""Apply prefilter and sort the input data before processing. If date_col is
specified along with any of sum or process, and sorting is not specified, then
apply a sort by date to ensure correct results.
Args:
iterator (Iterator[Dict]): Input data
Returns:
Iterator[Dict]: Input data with prefilter applied if specified and sorted if specified or deemed necessary
"""
rows = []
for row in iterator:
if self.header_to_hxltag:
newrow = {}
for header in row:
newrow[self.header_to_hxltag[header]] = row[header]
row = newrow
if self.stop_row:
if all(
row[key] == value for key, value in self.stop_row.items()
):
break
for newrow in self.flatten(row):
rows.append(newrow)
if not self.sort:
if self.datecol:
for subset in self.subsets:
apply_sort = subset.get(
"sum",
subset.get("process", subset.get("input_append")),
)
if apply_sort:
logger.warning(
"sum or process used without sorting. Applying sort by date to ensure correct results!"
)
self.sort = {"keys": [self.datecol], "reverse": True}
break
if self.prefilter:
rows = [row for row in rows if eval(self.prefilter)]
if self.sort:
keys = self.sort["keys"]
reverse = self.sort.get("reverse", False)
rows = sorted(rows, key=itemgetter(*keys), reverse=reverse)
return rows
def flatten(self, row: Dict) -> Generator[Dict, None, None]:
"""Flatten a wide spreadsheet format into a long one
Args:
row (Dict): Row to flatten
Returns:
Generator[Dict]: Flattened row(s)
"""
if not self.flatteninfo:
yield row
return
counters = [-1 for _ in self.flatteninfo]
while True:
newrow = copy.deepcopy(row)
for i, flatten in enumerate(self.flatteninfo):
colname = flatten["original"]
template_string, replace_string = match_template(colname)
if not template_string:
raise ValueError(
"Column name for flattening lacks an incrementing number!"
)
if counters[i] == -1:
counters[i] = int(replace_string)
else:
replace_string = f"{counters[i]}"
colname = colname.replace(template_string, replace_string)
if colname not in row:
return
newrow[flatten["new"]] = row[colname]
extracol = flatten.get("extracol")
if extracol:
newrow[extracol] = colname
counters[i] += 1
yield newrow
def get_maxdate(self) -> datetime:
"""Get the most recent date of the rows so far
Returns:
datetime: Most recent date in processed rows
"""
return self.maxdate
def filtered(self, row: Dict) -> bool:
"""Check if the row should be filtered out
Args:
row (Dict): Row to check for filters
Returns:
bool: Whether row is filtered out or not
"""
for header in self.filters:
if header not in row:
continue
if row[header] not in self.filters[header]:
return True
return False
    def parse(self, row: Dict) -> Tuple[Optional[str], Optional[List[bool]]]:
        """Parse row checking for valid admin information and if the row should be filtered out in each subset given
        its definition.

        Args:
            row (Dict): Row to parse

        Returns:
            Tuple[Optional[str], Optional[List[bool]]]: (admin name, should process subset list) or (None, None)
        """
        # Drop the row entirely if it fails any external filter
        if self.filtered(row):
            return None, None

        # One resolved admin name/code per configured admin column
        adms = [None for _ in range(len(self.admcols))]

        def get_adm(admcol, i):
            # Resolve the admin unit for level i from column admcol, writing
            # the result into adms[i] (closure). Returns True only on an
            # exact match; fuzzy lookups may still populate adms[i] inexactly.
            template_string, match_string = match_template(admcol)
            if template_string and self.headers:
                # Column given as a template referencing a header by index
                admcol = self.headers[int(match_string)]
            adm = row[admcol]
            if not adm:
                return False
            adm = adm.strip()
            adms[i] = adm
            if adm in self.adms[i]:
                return True
            exact = False
            if self.admexact:
                # Exact matching requested: reject anything not found above
                adms[i] = None
            else:
                if i == 0:
                    # Country level: fall back to fuzzy ISO3 lookup
                    adms[i], exact = Country.get_iso3_country_code_fuzzy(adm)
                elif i == 1:
                    # Subnational level: fall back to pcode lookup within the
                    # country resolved at level 0
                    adms[i], exact = self.adminlevel.get_pcode(
                        adms[0], adm, self.name
                    )
                if adms[i] not in self.adms[i]:
                    adms[i] = None
            return exact

        for i, admcol in enumerate(self.admcols):
            if admcol is None:
                continue
            if isinstance(admcol, str):
                admcol = [admcol]
            elif isinstance(admcol, dict):
                # Fixed value supplied in configuration rather than read from the row
                value = admcol.get("value")
                if not value:
                    continue
                adms[i] = value
                continue
            # Try each candidate column in turn, stopping at the first exact match
            for admcl in admcol:
                exact = get_adm(admcl, i)
                if adms[i] and exact:
                    break
            # Row is unusable if any required admin level could not be resolved
            if not adms[i]:
                return None, None
        should_process_subset = []
        for subset in self.subsets:
            filter = subset["filter"]
            process = True
            if filter:
                # NOTE: subset filters come from dataset configuration and are
                # run through eval() - never source them from untrusted input
                filter = self.get_filter_str_for_eval(filter)
                if not eval(filter):
                    process = False
            should_process_subset.append(process)
        if self.datecol:
            if isinstance(self.datecol, list):
                # Multiple date columns are concatenated into a single string
                dates = [str(row[x]) for x in self.datecol]
                date = "".join(dates)
            else:
                date = row[self.datecol]
            if self.datetype == "date":
                if not isinstance(date, datetime):
                    date = parse_date(date)
                # Optionally drop rows dated after today
                if date > self.today and self.ignore_future_date:
                    return None, None
            elif self.datetype == "year":
                date = int(date)
                if date > self.today.year and self.ignore_future_date:
                    return None, None
            else:
                # Fallback: presumably a numeric date-like value - TODO confirm
                date = int(date)
            for i, process in enumerate(should_process_subset):
                if not process:
                    continue
                if date < self.maxdate:
                    if self.single_maxdate:
                        # Only rows carrying the most recent date overall
                        # should be processed
                        should_process_subset[i] = False
                else:
                    # date >= maxdate: record the new most recent date
                    self.maxdate = date
                if self.datelevel is None:
                    if self.maxdateonly:
                        if date < self.maxdates[i]:
                            should_process_subset[i] = False
                        else:
                            self.maxdates[i] = date
                    else:
                        self.maxdates[i] = date
                else:
                    # Track most recent date per admin unit at the date level
                    if self.maxdateonly:
                        if date < self.maxdates[i][adms[self.datelevel]]:
                            should_process_subset[i] = False
                        else:
                            self.maxdates[i][adms[self.datelevel]] = date
                    else:
                        self.maxdates[i][adms[self.datelevel]] = date
        if self.level is None:
            # "single" level scrapers output one value rather than per-admin values
            return "value", should_process_subset
        if self.admsingle:
            return self.admsingle, should_process_subset
        return adms[self.level], should_process_subset