-
Notifications
You must be signed in to change notification settings - Fork 38
/
setup_data_structures.py
417 lines (302 loc) · 15.3 KB
/
setup_data_structures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
from __future__ import absolute_import
# PopulationSim
# See full license in LICENSE.txt.
from builtins import zip
import logging
import os
import pandas as pd
import numpy as np
from activitysim.core import inject
from activitysim.core import pipeline
from ..assign import assign_variable
from .helper import control_table_name
from .helper import get_control_table
from .helper import get_control_data_table
from populationsim.util import setting
logger = logging.getLogger(__name__)
def read_control_spec(data_filename, configs_dir):
    """
    Load the control specification csv and validate its geography column.

    Parameters
    ----------
    data_filename : str
        name of the control spec csv file, relative to configs_dir
    configs_dir : str
        directory in which to look for data_filename

    Returns
    -------
    control_spec : pandas.DataFrame
        contents of the csv file (read with '#' as the comment character)

    Raises
    ------
    RuntimeError
        if the file does not exist, if it has no geography column, or if it names a
        geography that is not listed in the 'geographies' setting
    """
    spec_path = os.path.join(configs_dir, data_filename)

    if not os.path.exists(spec_path):
        message = "initial_seed_balancing - control file not found: %s" % (spec_path,)
        raise RuntimeError(message)

    logger.info("Reading control file %s" % spec_path)
    spec_df = pd.read_csv(spec_path, comment='#')

    known_geographies = setting('geographies')

    if 'geography' not in spec_df.columns:
        raise RuntimeError("missing geography column in controls file")

    # fail on the first geography in the file that the settings do not list
    for geography in spec_df.geography.unique():
        if geography not in known_geographies:
            raise RuntimeError("unknown geography column '%s' in control file" % geography)

    return spec_df
def build_incidence_table(control_spec, households_df, persons_df, crosswalk_df):
    """
    Build a household-level incidence table with one column per control target.

    Each row of control_spec is evaluated with assign_variable against the seed table
    it names ('households' or 'persons'). Results computed on the persons table are
    summed by household id before being stored.

    Parameters
    ----------
    control_spec : pandas.DataFrame
        rows are read for their target, seed_table and expression attributes
    households_df : pandas.DataFrame
        its index becomes the index of the incidence table
    persons_df : pandas.DataFrame
        must contain the column named by the 'household_id_col' setting
    crosswalk_df : pandas.DataFrame
        not used by this function

    Returns
    -------
    incidence_table : pandas.DataFrame
        indexed like households_df, one column per control target
    """
    household_id_col = setting('household_id_col')

    result = pd.DataFrame(index=households_df.index)
    tables_by_name = {'households': households_df, 'persons': persons_df}

    for row in control_spec.itertuples():

        target = row.target
        logger.info("control target %s" % target)
        seed_table = row.seed_table
        logger.debug("control_row.seed_table %s" % seed_table)
        expression = row.expression
        logger.debug("control_row.expression %s" % expression)

        values, _ = assign_variable(
            target=target,
            expression=expression,
            df=tables_by_name[seed_table],
            locals_dict={'np': np},
            df_alias=seed_table,
            trace_rows=None)

        # multiplying by one turns boolean True/False results into 1/0
        values = values * 1

        if seed_table == 'persons':
            # roll person-level incidence up to one total per household
            per_person = pd.DataFrame({
                household_id_col: persons_df[household_id_col],
                'incidence': values,
            })
            values = per_person.groupby([household_id_col], as_index=True).sum()

        result[target] = values

    return result
def add_geography_columns(incidence_table, households_df, crosswalk_df):
    """
    Add seed and meta geography columns to incidence_table.

    The meta geography is the first entry of the 'geographies' setting. The columns
    are added to incidence_table in place, and the same object is returned.

    Parameters
    ----------
    incidence_table : pandas.DataFrame
        table to receive the two geography columns
    households_df : pandas.DataFrame
        supplies the seed geography of each household
    crosswalk_df : pandas.DataFrame
        supplies the mapping from seed geography ids to meta geography ids

    Returns
    -------
    incidence_table : pandas.DataFrame
    """
    meta_geography = setting('geographies')[0]
    seed_geography = setting('seed_geography')

    # seed zone of each household
    incidence_table[seed_geography] = households_df[seed_geography]

    # one meta id per seed id (the smallest, should the crosswalk list several)
    seed_meta_pairs = crosswalk_df[[seed_geography, meta_geography]]
    meta_by_seed = seed_meta_pairs.groupby(seed_geography, as_index=True).min()[meta_geography]

    # meta zone of each household, looked up from its seed zone
    incidence_table[meta_geography] = incidence_table[seed_geography].map(meta_by_seed)

    return incidence_table
def build_control_table(geo, control_spec, crosswalk_df):
    """
    Build the table of control totals for one geography.

    Controls specified for geo itself are taken directly from its control data table;
    controls specified for geographies listed after geo in the 'geographies' setting are
    summed up to the geo level. Rows whose total_hh_control is zero are dropped.

    Parameters
    ----------
    geo : str
        target geography, must be one of the 'geographies' setting
    control_spec : pandas.DataFrame
        read for its geography, control_field and target columns
    crosswalk_df : pandas.DataFrame
        used to map sub-geography ids to geo ids when the control data lacks a geo column

    Returns
    -------
    controls : pandas.DataFrame
        indexed by geo id, one column per control target, in control_spec row order
    """
    all_geographies = setting('geographies')
    assert geo in all_geographies

    # the target geography followed by every geography listed after it
    geos_at_or_below = all_geographies[all_geographies.index(geo):]

    # discard spec rows belonging to any other geography
    control_spec = control_spec[control_spec['geography'].isin(geos_at_or_below)]

    per_geography_controls = []
    for sub_geo in geos_at_or_below:

        sub_spec = control_spec[control_spec['geography'] == sub_geo]

        # some geographies may have no controls at all (e.g. seed)
        if not len(sub_spec.index):
            continue

        data_df = get_control_data_table(sub_geo)
        wanted_columns = [geo] + sub_spec.control_field.tolist()

        if sub_geo != geo:
            # control data is for a geography beneath geo: sum it up to the geo level
            if geo not in data_df.columns:
                # attach the geo id of each sub_geo zone using the crosswalk
                sub_geo_pairs = crosswalk_df[[sub_geo, geo]]
                geo_by_sub_geo = sub_geo_pairs.groupby(sub_geo, as_index=True).min()[geo]
                data_df[geo] = data_df[sub_geo].map(geo_by_sub_geo)

            geo_controls = data_df[wanted_columns].groupby(geo, as_index=True).sum()
        else:
            # control data is already at the geo level: just index it by geo id
            assert geo in data_df.columns
            geo_controls = data_df[wanted_columns].set_index(geo)

        per_geography_controls.append(geo_controls)

    # put the columns from the different geographies side by side
    controls = pd.concat(per_geography_controls, axis=1)

    # control data columns are named by control_field, callers expect target names
    field_to_target = dict(zip(control_spec.control_field, control_spec.target))
    controls = controls.rename(columns=field_to_target)

    # column order follows the order of the control_spec rows
    controls = controls[control_spec.target]

    # zones with no households are of no use to the balancer
    total_hh_control_col = setting('total_hh_control')
    is_empty = (controls[total_hh_control_col] == 0)
    if is_empty.any():
        controls = controls[~is_empty]
        logger.info("dropping %s %s control rows with empty total_hh_control"
                    % (is_empty.sum(), geo))

    return controls
def build_crosswalk_table():
    """
    Build the crosswalk table restricted to zones that have lowest-geography controls.

    Reads the 'geo_cross_walk' table, keeps only the columns named in the 'geographies'
    setting, and drops rows whose lowest-geography id does not appear in the control
    data table of the lowest geography.

    Returns
    -------
    crosswalk : pandas.DataFrame
    """
    geographies = setting('geographies')

    full_crosswalk = inject.get_table('geo_cross_walk').to_frame()

    # columns for anything other than the configured geographies are not needed
    crosswalk = full_crosswalk[geographies]

    # the crosswalk may list low-level zones for which no controls were supplied
    low_geography = geographies[-1]
    controlled_low_ids = get_control_data_table(low_geography)[low_geography]
    has_low_controls = crosswalk[low_geography].isin(controlled_low_ids)

    return crosswalk[has_low_controls]
def build_grouped_incidence_table(incidence_table, control_spec, seed_geography):
    """
    Collapse households that share an incidence signature into household groups.

    Households are grouped on all control target columns plus seed_geography. Each group
    becomes one row of the grouped incidence table, carrying the summed sample_weight of
    its households and the number of households in it (group_size).

    Note that a 'group_id' column is added to incidence_table in place.

    Parameters
    ----------
    incidence_table : pandas.DataFrame
        household incidence table with a 'sample_weight' column
    control_spec : pandas.DataFrame
        its target column lists the incidence columns to group on
    seed_geography : str
        name of the seed geography column in incidence_table

    Returns
    -------
    group_incidence_table : pandas.DataFrame
        one row per household group, including group_id, sample_weight and group_size
    household_groups : pandas.DataFrame
        indexed like incidence_table, with group_id, sample_weight and a column
        (named by the 'household_id_col' setting) holding the index values as int
    """
    hh_incidence_table = incidence_table
    household_id_col = setting('household_id_col')

    hh_groupby_cols = list(control_spec.target) + [seed_geography]
    hh_grouper = hh_incidence_table.groupby(hh_groupby_cols)

    # columns that are not grouping keys take their per-group max
    group_incidence_table = hh_grouper.max()

    # aggregate only the sample_weight column instead of summing and counting every
    # column of the table just to pick one result out
    weight_grouper = hh_grouper['sample_weight']
    group_incidence_table['sample_weight'] = weight_grouper.sum()
    group_incidence_table['group_size'] = weight_grouper.count()
    group_incidence_table = group_incidence_table.reset_index()

    logger.info("grouped incidence table has %s entries, ungrouped has %s"
                % (len(group_incidence_table.index), len(hh_incidence_table.index)))

    # add group_id of each hh to hh_incidence_table
    group_incidence_table['group_id'] = group_incidence_table.index
    hh_incidence_table['group_id'] = hh_incidence_table[hh_groupby_cols].merge(
        group_incidence_table[hh_groupby_cols + ['group_id']],
        on=hh_groupby_cols,
        how='left').group_id.astype(int).values

    # it doesn't really matter what the incidence_table index is until we create population
    # when we need to expand each group to constituent households
    # but incidence_table should have the same name whether grouped or ungrouped
    # so that the rest of the steps can handle them interchangeably
    group_incidence_table.index.name = hh_incidence_table.index.name

    # create table mapping household_groups to households and their sample_weights
    # explicitly provide hh_id as a column to make it easier for use when expanding population
    household_groups = hh_incidence_table[['group_id', 'sample_weight']].copy()
    household_groups[household_id_col] = household_groups.index.astype(int)

    return group_incidence_table, household_groups
def filter_households(households_df, persons_df, crosswalk_df):
    """
    Filter the households table, removing zero weight households
    and any households not in seed zones.

    Parameters
    ----------
    households_df : pandas.DataFrame
        must contain the columns named by the 'household_weight_col' and
        'seed_geography' settings
    persons_df : pandas.DataFrame
        passed through untouched (persons are not filtered here)
    crosswalk_df : pandas.DataFrame
        its seed geography column defines the set of valid seed zones

    Returns
    -------
    households_df : pandas.DataFrame
        the filtered households
    persons_df : pandas.DataFrame
        the persons table exactly as it was passed in
    """
    # drop any zero weight households (there are some in calm data)
    hh_weight_col = setting('household_weight_col')
    households_df = households_df[households_df[hh_weight_col] > 0]

    # remove any households not in seed zones
    seed_geography = setting('seed_geography')
    seed_ids = crosswalk_df[seed_geography].unique()

    # always apply the mask: guarding it with rows_in_seed_zones.any() skipped the filter
    # (keeping every out-of-zone household) precisely when no household was in a seed zone
    rows_in_seed_zones = households_df[seed_geography].isin(seed_ids)
    households_df = households_df[rows_in_seed_zones]

    logger.info("dropped %s households not in seed zones" % (~rows_in_seed_zones).sum())
    logger.info("kept %s households in seed zones" % len(households_df))

    return households_df, persons_df
@inject.step()
def setup_data_structures(settings, configs_dir, households, persons):
    """
    Set up geographic correspondence (crosswalk), control sets, and incidence tables.

    The control data tables for the target geographies should already have been read in
    by running input_pre_processor. Each zone control table contains one row per zone,
    with columns holding the control totals for that zone.

    This step reads the global control file, which specifies which control fields of the
    control tables should be used for balancing, along with their importance and the
    recipe (seed table and expression) for determining household incidence for each control.

    If the GROUP_BY_INCIDENCE_SIGNATURE setting is enabled (and NO_INTEGERIZATION_EVER
    is not), the incidence table rows are household groups and an additional
    household_groups table is created mapping group ids to actual household ids.

    Parameters
    ----------
    settings : dict
        contents of settings.yaml as dict
    configs_dir : str
    households : pipeline table
    persons : pipeline table

    creates pipeline tables:
        crosswalk
        control_spec
        geography-specific controls
        incidence_table
        household_groups (only when grouping by incidence signature)

    replaces pipeline tables:
        households
        persons
    """
    seed_geography = setting('seed_geography')

    hh_df = households.to_frame()
    per_df = persons.to_frame()

    crosswalk_df = build_crosswalk_table()
    inject.add_table('crosswalk', crosswalk_df)

    control_file_name = setting('control_file_name', 'controls.csv')
    control_spec = read_control_spec(control_file_name, configs_dir)
    inject.add_table('control_spec', control_spec)

    # one control table per configured geography
    for geography in settings['geographies']:
        geography_controls = build_control_table(geography, control_spec, crosswalk_df)
        inject.add_table(control_table_name(geography), geography_controls)

    hh_df, per_df = filter_households(hh_df, per_df, crosswalk_df)
    pipeline.replace_table('households', hh_df)
    pipeline.replace_table('persons', per_df)

    incidence_table = build_incidence_table(control_spec, hh_df, per_df, crosswalk_df)
    incidence_table = add_geography_columns(incidence_table, hh_df, crosswalk_df)

    # each household's initial weight travels with its incidence row
    weight_col = setting('household_weight_col')
    incidence_table['sample_weight'] = hh_df[weight_col]

    group_by_signature = \
        setting('GROUP_BY_INCIDENCE_SIGNATURE') and not setting('NO_INTEGERIZATION_EVER', False)

    if not group_by_signature:
        inject.add_table('incidence_table', incidence_table)
        return

    grouped_incidence_table, household_groups = \
        build_grouped_incidence_table(incidence_table, control_spec, seed_geography)

    inject.add_table('household_groups', household_groups)
    inject.add_table('incidence_table', grouped_incidence_table)
@inject.step()
def repop_setup_data_structures(configs_dir, households, persons):
    """
    Set up geographic correspondence (crosswalk), control sets, and incidence tables for
    a repop run.

    A new lowest-level geography control table should already have been read in by
    rerunning input_pre_processor with a table_list override. The control table contains
    one row for each zone, with columns specifying the control totals for that zone.

    This step reads the repop control file, which specifies which control fields of the
    control table should be used for balancing, along with their importance and the
    recipe (seed table and expression) for determining household incidence for each control.

    Parameters
    ----------
    configs_dir : str
    households : pipeline table
    persons : pipeline table

    replaces pipeline tables:
        crosswalk
        control_spec
        geography-specific controls
        incidence_table
        household_groups (only when grouping by incidence signature)

    Raises
    ------
    AssertionError
        if the repop control file specifies controls for anything other than the
        lowest geography
    """
    seed_geography = setting('seed_geography')
    geographies = setting('geographies')
    low_geography = geographies[-1]

    # replace crosswalk table
    crosswalk_df = build_crosswalk_table()
    pipeline.replace_table('crosswalk', crosswalk_df)

    # replace control_spec
    control_file_name = setting('repop_control_file_name', 'repop_controls.csv')
    control_spec = read_control_spec(control_file_name, configs_dir)

    # repop control spec should only specify controls for lowest level geography
    # (compare plain lists: comparing the ndarray returned by unique() with a list gives
    # an elementwise result whose truth value is ambiguous when several geographies appear)
    spec_geographies = control_spec.geography.unique().tolist()
    assert spec_geographies == [low_geography], \
        "repop control spec should only specify controls for %s, found %s" \
        % (low_geography, spec_geographies)

    pipeline.replace_table('control_spec', control_spec)

    # build incidence_table with repop controls and households in repop zones
    # filter households (dropping any not in crosswalk) in order to build incidence_table
    # We DO NOT REPLACE households and persons as we need full tables to synthesize population
    # (There is no problem, however, with overwriting the incidence_table and household_groups
    # because the expand_households step has ALREADY created the expanded_household_ids table
    # for the original simulated population. )
    households_df = households.to_frame()
    persons_df = persons.to_frame()
    households_df, persons_df = filter_households(households_df, persons_df, crosswalk_df)

    incidence_table = build_incidence_table(control_spec, households_df, persons_df, crosswalk_df)
    incidence_table = add_geography_columns(incidence_table, households_df, crosswalk_df)

    # add sample_weight col to incidence table
    hh_weight_col = setting('household_weight_col')
    incidence_table['sample_weight'] = households_df[hh_weight_col]

    # rebuild control tables with only the low level controls (aggregated at higher levels)
    for g in geographies:
        controls = build_control_table(g, control_spec, crosswalk_df)
        pipeline.replace_table(control_table_name(g), controls)

    if setting('GROUP_BY_INCIDENCE_SIGNATURE') and not setting('NO_INTEGERIZATION_EVER', False):
        group_incidence_table, household_groups \
            = build_grouped_incidence_table(incidence_table, control_spec, seed_geography)

        pipeline.replace_table('household_groups', household_groups)
        pipeline.replace_table('incidence_table', group_incidence_table)
    else:
        pipeline.replace_table('incidence_table', incidence_table)