
Commit

chunk physical activity
toliwaga committed Aug 25, 2016
1 parent 0681440 commit 5f09638
Showing 6 changed files with 132 additions and 68 deletions.
47 changes: 28 additions & 19 deletions bca4abm/bca4abm.py
@@ -264,6 +264,9 @@ def to_series(x, target=None):
     if trace_results is not None:
         trace_results = undupe_column_names(pd.DataFrame.from_items(trace_results))
 
+        # add df columns to trace_results
+        trace_results = pd.concat([df[trace_rows], trace_results], axis=1)
+
     return variables, trace_results
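
The added concat relies on pandas index alignment: trace_results is built from the rows selected by the boolean trace_rows mask, so df[trace_rows] and trace_results share the same index and line up row-wise. A minimal sketch of the idea (toy frame; the column names are illustrative, not from the repo):

    import pandas as pd

    df = pd.DataFrame({'hh_id': [1, 2, 3], 'income': [40, 55, 70]})
    trace_rows = df['hh_id'] == 2                     # boolean mask over df rows
    trace_results = pd.DataFrame({'benefit': [9.5]},  # computed for traced rows only
                                 index=df.index[trace_rows])

    # index alignment stitches the source columns onto the traced results
    traced = pd.concat([df[trace_rows], trace_results], axis=1)
    # traced has a single row: hh_id=2, income=55, benefit=9.5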


@@ -281,7 +284,8 @@ def assign_variables_locals(settings, locals_tag=None):
     return locals_dict
 
 
-def chunked_df(df, trace_rows, chunk_size):
+# generator for chunked iteration over dataframe by chunk_size
+def size_chunked_df(df, trace_rows, chunk_size):
     # generator to iterate over choosers in chunk_size chunks
     if chunk_size == 0:
         yield 0, df, trace_rows
@@ -296,38 +300,43 @@ def chunked_df(df, trace_rows, chunk_size):
             i += chunk_size
 
 
-# FIXME - need this for chunking physical activity processor
-# def hh_chunked_choosers(df, trace_rows):
-#     # generator to iterate over chooses in chunk_size chunks
-#     last_chooser = df['chunk_id'].max()
-#     i = 0
-#     while i <= last_chooser:
-#         chunk_me = (df['chunk_id'] == i)
-#         if trace_rows is not None:
-#             yield i, df[chunk_me]
-#         else:
-#             yield i, df[chunk_me], trace_rows[chunk_me]
-#         i += 1
+# generator for chunked iteration over dataframe by chunk_id
+def id_chunked_df(df, trace_rows, chunk_id_col):
+    # generator to iterate over choosers in chunk_id chunks
+    last_chooser = df[chunk_id_col].max()
+    i = 0
+    while i <= last_chooser:
+        chunk_me = (df[chunk_id_col] == i)
+        if trace_rows is None:
+            yield i, df[chunk_me], None
+        else:
+            yield i, df[chunk_me], trace_rows[chunk_me]
+        i += 1
+
+
+# return the appropriate generator for iterating over dataframe by either chunk_size or chunk_id
+def chunked_df(df, trace_rows, chunk_size=None, chunk_id_col='chunk_id'):
+    if chunk_size is None:
+        return id_chunked_df(df, trace_rows, chunk_id_col)
+    else:
+        return size_chunked_df(df, trace_rows, chunk_size)


 def eval_group_and_sum(assignment_expressions, df, locals_dict, group_by_column_names,
                        df_alias=None, chunk_size=0, trace_rows=None):
 
     if group_by_column_names == [None]:
-        raise RuntimeError("calculate_benefits: group_by_column_names not initialized")
+        raise RuntimeError("eval_group_and_sum: group_by_column_names not initialized")
 
-    summary = trace_results = trace_rows_chunk = None
+    summary = trace_results = None
     chunks = 0
 
-    for i, df_chunk, trace_rows_chunk in chunked_df(df, trace_rows_chunk, chunk_size):
+    for i, df_chunk, trace_rows_chunk in chunked_df(df, trace_rows, chunk_size):
 
         chunks += 1
 
         # print "eval_and_sum chunk %s i: %s" % (chunks, i)
 
-        if trace_rows is not None:
-            trace_rows_chunk = trace_rows[i: i+chunk_size]
-
         assigned_chunk, trace_chunk = assign_variables(assignment_expressions,
                                                        df_chunk,
                                                        locals_dict=locals_dict,
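
Taken together, the two generators give chunked_df a single calling convention: pass a positive chunk_size for fixed-width row slices, or leave it as None (the default) to get one chunk per distinct value of the chunk_id column. Note that id_chunked_df walks i from 0 to df[chunk_id_col].max(), so it assumes chunk ids are consecutive integers starting at 0; a missing id simply yields an empty chunk. A usage sketch (toy frame; assumes the generators above are in scope and that size_chunked_df's hidden body slices positionally, df[i:i+chunk_size]):

    import pandas as pd

    df = pd.DataFrame({'chunk_id': [0, 0, 1, 1, 1], 'x': range(5)})

    # size-based: fixed-width slices -> rows 0-1, then 2-3, then 4
    sizes = [len(chunk) for i, chunk, trace in chunked_df(df, None, chunk_size=2)]
    assert sizes == [2, 2, 1]

    # id-based: one chunk per chunk_id value -> ids 0 and 1
    ids = [i for i, chunk, trace in chunked_df(df, None)]
    assert ids == [0, 1]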
4 changes: 1 addition & 3 deletions bca4abm/processors/auto_ownership.py
@@ -56,9 +56,7 @@ def auto_ownership_processor(persons_merged,

     if trace_results is not None:
 
-        # add persons_df columns to trace_results
-        df = pd.concat([persons_df[trace_rows], trace_results], axis=1)
-        tracing.write_csv(df,
+        tracing.write_csv(trace_results,
                           file_name="auto_ownership_processor",
                           index_label='person_id',
                           columns=None,
4 changes: 1 addition & 3 deletions bca4abm/processors/demographics.py
@@ -63,9 +63,7 @@ def demographics_processor(persons_merged, demographics_spec, settings,

     if trace_results is not None:
 
-        # add trips_df columns to trace_results
-        df = pd.concat([persons_df[trace_rows], trace_results], axis=1)
-        tracing.write_csv(df,
+        tracing.write_csv(trace_results,
                           file_name="demographics_processor",
                           index_label='person_idx',
                           columns=None,
6 changes: 4 additions & 2 deletions bca4abm/processors/person_trips.py
@@ -57,9 +57,11 @@ def person_trips_processor(trips_with_demographics,

     if trace_results is not None:
 
+        # FIXME - moved this into assign_variables
         # add trips_df columns to trace_results
-        df = pd.concat([trips_df[trace_rows], trace_results], axis=1)
-        tracing.write_csv(df,
+        # trace_results = pd.concat([trips_df[trace_rows], trace_results], axis=1)
+
+        tracing.write_csv(trace_results,
                           file_name="person_trips_processor",
                           index_label='trip_id',
                           columns=None,
136 changes: 96 additions & 40 deletions bca4abm/processors/physical_activity.py
@@ -5,6 +5,10 @@
 from bca4abm import bca4abm as bca
 from ..util.misc import add_grouped_results
 
+from ..util.misc import add_result_columns, add_summary_results
+
+from bca4abm import tracing
+
 """
 physical activity processor
 """
@@ -27,48 +31,100 @@ def physical_activity_processor(trips_with_demographics,
                                 persons_merged,
                                 physical_activity_trip_spec,
                                 physical_activity_person_spec,
-                                settings):
-
-    print "---------- physical_activity_processor"
+                                coc_column_names,
+                                settings,
+                                hh_chunk_size,
+                                trace_hh_id):
 
+    chunk_id_col = 'chunk_id'
     trips_df = trips_with_demographics.to_frame()
+    persons_df = persons_merged.to_frame()
 
-    locals_dict = bca.assign_variables_locals(settings, 'locals_physical_activity')
-    locals_dict['trips'] = trips_df
-
-    assigned_columns, trace_results = bca.assign_variables(physical_activity_trip_spec,
-                                                           df=trips_df,
-                                                           locals_dict=locals_dict)
-
-    # add assigned columns to local trips df
-    trips_df = pd.concat([trips_df, assigned_columns], axis=1)
+    tracing.info(__name__,
+                 "Running physical_activity_processor with %d trips for %d persons "
+                 "(hh_chunk_size = %s)"
+                 % (len(trips_df), len(persons_df), hh_chunk_size))

-    # aggregate trips to persons: sum assigned_columns, group by hh_id, person_idx
-    summary_columns = assigned_columns.columns
-    grouped = trips_df.groupby(['hh_id', 'person_idx'])
-    aggregations = {column: 'sum' for column in summary_columns}
-    persons_activity_df = grouped.agg(aggregations)
-    persons_activity_df.reset_index(inplace=True)
+    if coc_column_names == [None]:
+        raise RuntimeError("physical_activity_processor: coc_column_names not initialized")
 
-    # merge aggregated assigned_columns to persons_df
-    persons_df = persons_merged.to_frame()
-    persons_df = pd.merge(persons_df, persons_activity_df, on=['hh_id', 'person_idx'])
-
-    # eval physical_activity_person_spec in context of merged trips_with_demographics
-    locals_dict = {'settings': settings, 'persons': persons_df}
-    if 'locals_physical_activity' in settings:
-        locals_dict.update(settings['locals_physical_activity'])
-    assigned_columns, trace_results = bca.assign_variables(physical_activity_person_spec,
-                                                           df=persons_df,
-                                                           locals_dict=locals_dict)
-
-    # merge aggregated assigned_columns to local persons_df
-    persons_df = pd.concat([persons_df, assigned_columns], axis=1)
-
-    add_grouped_results(persons_df, assigned_columns.columns, prefix='PA_')
-
-    if settings.get("dump", False) and settings.get("dump_physical_activity", True):
-        output_dir = orca.eval_variable('output_dir')
-        csv_file_name = os.path.join(output_dir, 'physical_activity.csv')
-        persons_df = persons_df[['hh_id', 'person_idx'] + list(assigned_columns.columns)]
-        persons_df.to_csv(csv_file_name, index=False)
+    locals_dict = bca.assign_variables_locals(settings, 'locals_physical_activity')
+    trip_trace_rows = trace_hh_id and trips_df['hh_id'] == trace_hh_id

+    coc_summary = None
+    chunks = 0
+
+    # because the persons table doesn't have an identity column
+    # we need to use a compound key to group trips by person
+    person_identity_columns = ['hh_id', 'person_idx']
+
+    # iterate over trips df chunked by hh_id
+    for chunk_id, trips_chunk, trace_rows_chunk \
+            in bca.chunked_df(trips_df, trip_trace_rows, chunk_size=None):
+
+        chunks += 1
+
+        # slice persons_df for this chunk
+        persons_chunk = persons_df[persons_df[chunk_id_col] == chunk_id]
+
+        trip_activity, trace_results = \
+            bca.assign_variables(physical_activity_trip_spec,
+                                 trips_chunk,
+                                 locals_dict=locals_dict,
+                                 df_alias='trips',
+                                 trace_rows=trace_rows_chunk)
+
+        if trace_results is not None:
+            tracing.write_csv(trace_results,
+                              file_name="physical_activity_processor_trips",
+                              index_label='trip_id',
+                              columns=None,
+                              column_labels=['label', 'trip'],
+                              transpose=True)
+
+        # concat the person_identity_columns into trip_activity
+        trip_activity = pd.concat([trips_chunk[person_identity_columns], trip_activity], axis=1)
+
+        # sum trip activity for each unique person
+        trip_activity = trip_activity.groupby(person_identity_columns).sum()
+        trip_activity.reset_index(inplace=True)
+
+        # merge person-level trip activity sums into persons_chunk
+        persons_chunk = pd.merge(persons_chunk, trip_activity, on=person_identity_columns)
+
+        person_trace_rows = trace_hh_id and persons_chunk['hh_id'] == trace_hh_id
+
+        person_activity, trace_results = \
+            bca.assign_variables(physical_activity_person_spec,
+                                 persons_chunk,
+                                 locals_dict=locals_dict,
+                                 df_alias='persons',
+                                 trace_rows=person_trace_rows)
+
+        if trace_results is not None:
+            tracing.write_csv(trace_results,
+                              file_name="physical_activity_processor_persons",
+                              index_label='persons_merged_table_index',
+                              column_labels=['label', 'person'],
+                              transpose=True)
+
+        # concat in the coc columns
+        person_activity = pd.concat([persons_chunk[coc_column_names], person_activity], axis=1)
+
+        # summarize the chunk by coc
+        chunk_summary = person_activity.groupby(coc_column_names).sum()
+
+        # accumulate chunk_summaries in coc_summary
+        if coc_summary is None:
+            coc_summary = chunk_summary
+        else:
+            coc_summary = pd.concat([coc_summary, chunk_summary], axis=0)
+
+    if chunks > 1:
+        # squash the accumulated chunk summaries by reapplying group and sum
+        coc_summary.reset_index(inplace=True)
+        coc_summary = coc_summary.groupby(coc_column_names).sum()
+
+    result_prefix = 'PA_'
+    add_result_columns("coc_results", coc_summary, result_prefix)
+    add_summary_results(coc_summary, prefix=result_prefix, spec=physical_activity_person_spec)
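
The accumulate-then-squash step at the end works because grouped sums are associative: summing per-chunk group totals and then regrouping gives the same result as a single global group-by over all persons. A toy check of that property (column names here are illustrative, not the repo's):

    import pandas as pd

    person_activity = pd.DataFrame({'coc_poverty': [1, 1, 0, 1],
                                    'PA_minutes': [10, 20, 30, 40]})
    chunk_a, chunk_b = person_activity[:2], person_activity[2:]

    # per-chunk summaries stacked, then regrouped (as in the code above)
    partial = pd.concat([chunk_a.groupby('coc_poverty').sum(),
                         chunk_b.groupby('coc_poverty').sum()], axis=0)
    squashed = partial.reset_index().groupby('coc_poverty').sum()

    assert squashed.equals(person_activity.groupby('coc_poverty').sum())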
3 changes: 2 additions & 1 deletion example/configs/settings.yaml
@@ -9,7 +9,8 @@ dump: True
 # trace household id; comment out for no trace
 trace_hh_id: 60
 
-chunk_size: 5
+chunk_size: 10
+hh_chunk_size: 5
 
 scenario_year: sample
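
With these settings, size-based chunking proceeds 10 rows at a time while the physical activity processor chunks by household. The code that assigns each row's chunk_id is not part of this diff; one plausible scheme, reading hh_chunk_size as the number of households per chunk (the function and variable names below are assumptions, not the repo's):

    import pandas as pd

    def assign_chunk_ids(df, hh_chunk_size):
        # bucket every hh_chunk_size consecutive households into one chunk_id,
        # so all rows for a household land in the same chunk
        hh_ids = df['hh_id'].unique()
        chunk_map = {hh: pos // hh_chunk_size for pos, hh in enumerate(hh_ids)}
        return df['hh_id'].map(chunk_map)

Keeping every trip and person of a household in one chunk is what lets the per-chunk groupby on ['hh_id', 'person_idx'] produce complete per-person sums.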


