diff --git a/activitysim/abm/models/__init__.py b/activitysim/abm/models/__init__.py index fbe6422bf0..976c790bca 100644 --- a/activitysim/abm/models/__init__.py +++ b/activitysim/abm/models/__init__.py @@ -18,5 +18,7 @@ import atwork_subtour_destination import atwork_subtour_scheduling +import utility_steps + # parameterized models import annotate_table diff --git a/activitysim/abm/models/annotate_table.py b/activitysim/abm/models/annotate_table.py index a4e14721c3..a15114f73c 100644 --- a/activitysim/abm/models/annotate_table.py +++ b/activitysim/abm/models/annotate_table.py @@ -18,7 +18,6 @@ from activitysim.core.util import assign_in_place logger = logging.getLogger(__name__) -DUMP = True @inject.step() diff --git a/activitysim/abm/models/atwork_subtour_destination.py b/activitysim/abm/models/atwork_subtour_destination.py index e050a16455..d07f85ebb5 100644 --- a/activitysim/abm/models/atwork_subtour_destination.py +++ b/activitysim/abm/models/atwork_subtour_destination.py @@ -273,6 +273,8 @@ def atwork_subtour_destination_simulate(tours, pipeline.replace_table("tours", tours) + pipeline.drop_table('atwork_subtour_destination_sample') + if trace_hh_id: tracing.trace_df(tours, label=trace_label, diff --git a/activitysim/abm/models/atwork_subtour_frequency.py b/activitysim/abm/models/atwork_subtour_frequency.py index ca15113c25..3e04287d15 100644 --- a/activitysim/abm/models/atwork_subtour_frequency.py +++ b/activitysim/abm/models/atwork_subtour_frequency.py @@ -76,6 +76,7 @@ def atwork_subtour_frequency(tours, spec=atwork_subtour_frequency_spec, nest_spec=nest_spec, locals_d=constants, + chunk_size=chunk_size, trace_label=trace_label, trace_choice_name='atwork_subtour_frequency') @@ -97,13 +98,13 @@ def atwork_subtour_frequency(tours, subtours = process_atwork_subtours(work_tours, atwork_subtour_frequency_alternatives) - pipeline.extend_table("tours", subtours) - tracing.register_traceable_table('tours', subtours) + print subtours + + tours = 
pipeline.extend_table("tours", subtours) + tracing.register_traceable_table('tours', tours) pipeline.get_rn_generator().add_channel(subtours, 'tours') if trace_hh_id: - trace_columns = ['atwork_subtour_frequency'] - tracing.trace_df(inject.get_table('tours').to_frame(), - label=trace_label, - columns=trace_columns, + tracing.trace_df(tours, + label='atwork_subtour_frequency.tours', warn_if_empty=True) diff --git a/activitysim/abm/models/atwork_subtour_scheduling.py b/activitysim/abm/models/atwork_subtour_scheduling.py index d390acd236..67ef0a1dc9 100644 --- a/activitysim/abm/models/atwork_subtour_scheduling.py +++ b/activitysim/abm/models/atwork_subtour_scheduling.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -DUMP = True +DUMP = False @inject.injectable() diff --git a/activitysim/abm/models/auto_ownership.py b/activitysim/abm/models/auto_ownership.py index 69fe3691d0..b128a096de 100644 --- a/activitysim/abm/models/auto_ownership.py +++ b/activitysim/abm/models/auto_ownership.py @@ -3,7 +3,7 @@ import logging -from activitysim.core import simulate as asim +from activitysim.core import simulate from activitysim.core import tracing from activitysim.core import pipeline from activitysim.core import config @@ -14,7 +14,7 @@ @inject.injectable() def auto_ownership_spec(configs_dir): - return asim.read_model_spec(configs_dir, 'auto_ownership.csv') + return simulate.read_model_spec(configs_dir, 'auto_ownership.csv') @inject.injectable() @@ -26,23 +26,26 @@ def auto_ownership_settings(configs_dir): def auto_ownership_simulate(households_merged, auto_ownership_spec, auto_ownership_settings, + chunk_size, trace_hh_id): """ Auto ownership is a standard model which predicts how many cars a household with given characteristics owns """ + trace_label = 'auto_ownership_simulate' logger.info("Running auto_ownership_simulate with %d households" % len(households_merged)) nest_spec = config.get_logit_model_settings(auto_ownership_settings) constants = 
config.get_model_constants(auto_ownership_settings) - choices = asim.simple_simulate( + choices = simulate.simple_simulate( choosers=households_merged.to_frame(), spec=auto_ownership_spec, nest_spec=nest_spec, locals_d=constants, - trace_label=trace_hh_id and 'auto_ownership', + chunk_size=chunk_size, + trace_label=trace_label, trace_choice_name='auto_ownership') tracing.print_summary('auto_ownership', choices, value_counts=True) diff --git a/activitysim/abm/models/initialize.py b/activitysim/abm/models/initialize.py index bbd00fb758..76d49ebd48 100644 --- a/activitysim/abm/models/initialize.py +++ b/activitysim/abm/models/initialize.py @@ -19,43 +19,37 @@ @inject.step() def initialize(): """ - Because random seed is set differently for each step, the sampling of households depends - on which step they are initially loaded in. - - We load them explicitly up front, so that + on which step they are initially loaded in so we force them to load here and they get + stored to the pipeline, """ t0 = tracing.print_elapsed_time() inject.get_table('land_use').to_frame() - t0 = tracing.print_elapsed_time("preload land_use") + t0 = tracing.print_elapsed_time("preload land_use", t0, debug=True) inject.get_table('households').to_frame() - t0 = tracing.print_elapsed_time("preload households") + t0 = tracing.print_elapsed_time("preload households", t0, debug=True) inject.get_table('persons').to_frame() - t0 = tracing.print_elapsed_time("preload persons") + t0 = tracing.print_elapsed_time("preload persons", t0, debug=True) inject.get_table('person_windows').to_frame() - t0 = tracing.print_elapsed_time("preload person_windows") - - pass + t0 = tracing.print_elapsed_time("preload person_windows", t0, debug=True) @inject.injectable(cache=True) def preload_injectables(): """ - called after pipeline is + preload bulky injectables up front - stuff that isn't inserted into eh pipeline """ - # could simply list injectables as arguments, but this way we can report timing... 
- logger.info("preload_injectables") t0 = tracing.print_elapsed_time() if inject.get_injectable('skim_dict', None) is not None: - t0 = tracing.print_elapsed_time("preload skim_dict") + t0 = tracing.print_elapsed_time("preload skim_dict", t0, debug=True) if inject.get_injectable('skim_stack', None) is not None: - t0 = tracing.print_elapsed_time("preload skim_stack") + t0 = tracing.print_elapsed_time("preload skim_stack", t0, debug=True) diff --git a/activitysim/abm/models/mandatory_scheduling.py b/activitysim/abm/models/mandatory_scheduling.py index 373a6a7c91..d161e8b485 100644 --- a/activitysim/abm/models/mandatory_scheduling.py +++ b/activitysim/abm/models/mandatory_scheduling.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -DUMP = True +DUMP = False @inject.injectable() diff --git a/activitysim/abm/models/mandatory_tour_frequency.py b/activitysim/abm/models/mandatory_tour_frequency.py index 9360a09b03..509ce23b9d 100644 --- a/activitysim/abm/models/mandatory_tour_frequency.py +++ b/activitysim/abm/models/mandatory_tour_frequency.py @@ -6,7 +6,7 @@ import pandas as pd -from activitysim.core import simulate as asim +from activitysim.core import simulate from activitysim.core import tracing from activitysim.core import pipeline from activitysim.core import config @@ -20,7 +20,7 @@ @inject.injectable() def mandatory_tour_frequency_spec(configs_dir): - return asim.read_model_spec(configs_dir, 'mandatory_tour_frequency.csv') + return simulate.read_model_spec(configs_dir, 'mandatory_tour_frequency.csv') @inject.injectable() @@ -41,12 +41,15 @@ def mandatory_tour_frequency_alternatives(configs_dir): def mandatory_tour_frequency(persons_merged, mandatory_tour_frequency_spec, mandatory_tour_frequency_settings, + chunk_size, trace_hh_id): """ This model predicts the frequency of making mandatory trips (see the alternatives above) - these trips include work and school in some combination. 
""" + trace_label = 'mandatory_tour_frequency' + choosers = persons_merged.to_frame() # filter based on results of CDAP choosers = choosers[choosers.cdap_activity == 'M'] @@ -55,12 +58,13 @@ def mandatory_tour_frequency(persons_merged, nest_spec = config.get_logit_model_settings(mandatory_tour_frequency_settings) constants = config.get_model_constants(mandatory_tour_frequency_settings) - choices = asim.simple_simulate( + choices = simulate.simple_simulate( choosers, spec=mandatory_tour_frequency_spec, nest_spec=nest_spec, locals_d=constants, - trace_label=trace_hh_id and 'mandatory_tour_frequency', + chunk_size=chunk_size, + trace_label=trace_label, trace_choice_name='mandatory_tour_frequency') # convert indexes to alternative names @@ -72,16 +76,16 @@ def mandatory_tour_frequency(persons_merged, inject.add_column("persons", "mandatory_tour_frequency", choices) - create_mandatory_tours() + create_mandatory_tours(trace_hh_id) # add mandatory_tour-dependent columns (e.g. tour counts) to persons pipeline.add_dependent_columns("persons", "persons_mtf") if trace_hh_id: trace_columns = ['mandatory_tour_frequency'] - tracing.trace_df(inject.get_table('persons_merged').to_frame(), - label="mandatory_tour_frequency", - columns=trace_columns, + tracing.trace_df(inject.get_table('persons').to_frame(), + label="mandatory_tour_frequency.persons", + # columns=trace_columns, warn_if_empty=True) @@ -92,7 +96,7 @@ def mandatory_tour_frequency(persons_merged, """ -def create_mandatory_tours(): +def create_mandatory_tours(trace_hh_id): # FIXME - move this to body? 
@@ -105,14 +109,19 @@ def create_mandatory_tours(): tour_frequency_alternatives = inject.get_injectable('mandatory_tour_frequency_alternatives') - tours = process_mandatory_tours(persons, tour_frequency_alternatives) + mandatory_tours = process_mandatory_tours(persons, tour_frequency_alternatives) expressions.assign_columns( - df=tours, + df=mandatory_tours, model_settings='annotate_tours_with_dest', configs_dir=configs_dir, trace_label='create_mandatory_tours') - pipeline.extend_table("tours", tours) + tours = pipeline.extend_table("tours", mandatory_tours) tracing.register_traceable_table('tours', tours) - pipeline.get_rn_generator().add_channel(tours, 'tours') + pipeline.get_rn_generator().add_channel(mandatory_tours, 'tours') + + if trace_hh_id: + tracing.trace_df(mandatory_tours, + label="mandatory_tour_frequency.mandatory_tours", + warn_if_empty=True) diff --git a/activitysim/abm/models/mode_choice.py b/activitysim/abm/models/mode_choice.py index f288402185..293482cae8 100644 --- a/activitysim/abm/models/mode_choice.py +++ b/activitysim/abm/models/mode_choice.py @@ -7,12 +7,11 @@ import pandas as pd import yaml -from activitysim.core import simulate as asim +from activitysim.core import simulate from activitysim.core import tracing from activitysim.core import config from activitysim.core import inject -from activitysim.core.util import memory_info - +from activitysim.core.util import force_garbage_collect from activitysim.core.util import assign_in_place from .util.mode import _mode_choice_spec @@ -31,6 +30,7 @@ def _mode_choice_simulate(records, spec, constants, nest_spec, + chunk_size, trace_label=None, trace_choice_name=None ): """ @@ -56,13 +56,15 @@ def _mode_choice_simulate(records, if od_skim_stack_wrapper is not None: skims.append(od_skim_stack_wrapper) - choices = asim.simple_simulate(records, - spec, - nest_spec, - skims=skims, - locals_d=locals_d, - trace_label=trace_label, - trace_choice_name=trace_choice_name) + choices = 
simulate.simple_simulate( + records, + spec, + nest_spec, + skims=skims, + locals_d=locals_d, + chunk_size=chunk_size, + trace_label=trace_label, + trace_choice_name=trace_choice_name) alts = spec.columns choices = choices.map(dict(zip(range(len(alts)), alts))) @@ -102,7 +104,7 @@ def tour_mode_choice_settings(configs_dir): @inject.injectable() def tour_mode_choice_spec_df(configs_dir): - return asim.read_model_spec(configs_dir, 'tour_mode_choice.csv') + return simulate.read_model_spec(configs_dir, 'tour_mode_choice.csv') @inject.injectable() @@ -114,10 +116,12 @@ def tour_mode_choice_coeffs(configs_dir): @inject.injectable() def tour_mode_choice_spec(tour_mode_choice_spec_df, tour_mode_choice_coeffs, - tour_mode_choice_settings): + tour_mode_choice_settings, + trace_hh_id): return _mode_choice_spec(tour_mode_choice_spec_df, tour_mode_choice_coeffs, tour_mode_choice_settings, + trace_spec=trace_hh_id, trace_label='tour_mode_choice') @@ -127,6 +131,7 @@ def atwork_subtour_mode_choice_simulate(tours, tour_mode_choice_spec, tour_mode_choice_settings, skim_dict, skim_stack, + chunk_size, trace_hh_id): """ At-work subtour mode choice simulate @@ -174,6 +179,7 @@ def atwork_subtour_mode_choice_simulate(tours, spec=spec, constants=constants, nest_spec=nest_spec, + chunk_size=chunk_size, trace_label=trace_label, trace_choice_name='tour_mode_choice') @@ -191,8 +197,7 @@ def atwork_subtour_mode_choice_simulate(tours, columns=trace_columns, warn_if_empty=True) - # FIXME - this forces garbage collection - memory_info() + force_garbage_collect() @inject.step() @@ -200,12 +205,13 @@ def tour_mode_choice_simulate(tours_merged, tour_mode_choice_spec, tour_mode_choice_settings, skim_dict, skim_stack, + chunk_size, trace_hh_id): """ Tour mode choice simulate """ - trace_label = trace_hh_id and 'tour_mode_choice' + trace_label = 'tour_mode_choice' tours = tours_merged.to_frame() @@ -258,6 +264,7 @@ def tour_mode_choice_simulate(tours_merged, spec=spec, constants=constants, 
nest_spec=nest_spec, + chunk_size=chunk_size, trace_label=tracing.extend_trace_label(trace_label, tour_type), trace_choice_name='tour_mode_choice') @@ -267,8 +274,7 @@ def tour_mode_choice_simulate(tours_merged, choices_list.append(choices) # FIXME - force garbage collection - mem = memory_info() - logger.debug('memory_info tour_type %s, %s' % (tour_type, mem)) + force_garbage_collect() choices = pd.concat(choices_list) @@ -286,9 +292,6 @@ def tour_mode_choice_simulate(tours_merged, columns=trace_columns, warn_if_empty=True) - # FIXME - this forces garbage collection - memory_info() - """ Trip mode choice is run for all trips to determine the transportation mode that @@ -303,7 +306,7 @@ def trip_mode_choice_settings(configs_dir): @inject.injectable() def trip_mode_choice_spec_df(configs_dir): - return asim.read_model_spec(configs_dir, 'trip_mode_choice.csv') + return simulate.read_model_spec(configs_dir, 'trip_mode_choice.csv') @inject.injectable() @@ -315,10 +318,13 @@ def trip_mode_choice_coeffs(configs_dir): @inject.injectable() def trip_mode_choice_spec(trip_mode_choice_spec_df, trip_mode_choice_coeffs, - trip_mode_choice_settings): + trip_mode_choice_settings, + trace_hh_id): return _mode_choice_spec(trip_mode_choice_spec_df, trip_mode_choice_coeffs, - trip_mode_choice_settings) + trip_mode_choice_settings, + trace_spec=trace_hh_id, + trace_label='trip_mode_choice') @inject.step() @@ -327,10 +333,12 @@ def trip_mode_choice_simulate(trips_merged, trip_mode_choice_settings, skim_dict, skim_stack, + chunk_size, trace_hh_id): """ Trip mode choice simulate """ + trace_label = 'tour_mode_choice' trips = trips_merged.to_frame() @@ -339,8 +347,6 @@ def trip_mode_choice_simulate(trips_merged, logger.info("Running trip_mode_choice_simulate with %d trips" % len(trips)) - print "\ntrips.columns\n", trips.columns - odt_skim_stack_wrapper = skim_stack.wrap(left_key='OTAZ', right_key='DTAZ', skim_key="start_period") @@ -358,8 +364,6 @@ def 
trip_mode_choice_simulate(trips_merged, # FIXME - check that destination is not null - trace_label = trace_hh_id and ('trip_mode_choice_%s' % tour_type) - choices = _mode_choice_simulate( segment, odt_skim_stack_wrapper=odt_skim_stack_wrapper, @@ -368,7 +372,8 @@ def trip_mode_choice_simulate(trips_merged, spec=get_segment_and_unstack(trip_mode_choice_spec, tour_type), constants=constants, nest_spec=nest_spec, - trace_label=trace_label, + chunk_size=chunk_size, + trace_label=tracing.extend_trace_label(trace_label, tour_type), trace_choice_name='trip_mode_choice') # FIXME - no point in printing verbose value_counts now that we have tracing? @@ -378,8 +383,7 @@ def trip_mode_choice_simulate(trips_merged, choices_list.append(choices) # FIXME - force garbage collection - mem = memory_info() - logger.debug('memory_info tour_type %s, %s' % (tour_type, mem)) + force_garbage_collect() choices = pd.concat(choices_list) @@ -397,5 +401,4 @@ def trip_mode_choice_simulate(trips_merged, index_label='trip_id', warn_if_empty=True) - # FIXME - this forces garbage collection - memory_info() + force_garbage_collect() diff --git a/activitysim/abm/models/non_mandatory_scheduling.py b/activitysim/abm/models/non_mandatory_scheduling.py index 29a781cbe4..3ac6662f09 100644 --- a/activitysim/abm/models/non_mandatory_scheduling.py +++ b/activitysim/abm/models/non_mandatory_scheduling.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) -DUMP = True +DUMP = False @inject.injectable() diff --git a/activitysim/abm/models/non_mandatory_tour_frequency.py b/activitysim/abm/models/non_mandatory_tour_frequency.py index 88f94ad928..5dbb321f0c 100644 --- a/activitysim/abm/models/non_mandatory_tour_frequency.py +++ b/activitysim/abm/models/non_mandatory_tour_frequency.py @@ -80,16 +80,15 @@ def non_mandatory_tour_frequency(persons_merged, spec=non_mandatory_tour_frequency_spec[[name]], locals_d=constants, chunk_size=chunk_size, - trace_label=trace_hh_id and 'non_mandatory_tour_frequency.%s' % 
name, + trace_label='non_mandatory_tour_frequency.%s' % name, trace_choice_name='non_mandatory_tour_frequency') choices_list.append(choices) - t0 = print_elapsed_time("non_mandatory_tour_frequency.%s" % name, t0) + t0 = print_elapsed_time("non_mandatory_tour_frequency.%s" % name, t0, debug=True) # FIXME - force garbage collection - # mem = memory_info() - # logger.info('memory_info ptype %s, %s' % (name, mem)) + # force_garbage_collect() choices = pd.concat(choices_list) @@ -99,20 +98,20 @@ def non_mandatory_tour_frequency(persons_merged, # FIXME - how about the persons not processed inject.add_column("persons", "non_mandatory_tour_frequency", choices) - create_non_mandatory_tours() + create_non_mandatory_tours(trace_hh_id) # add non_mandatory_tour-dependent columns (e.g. tour counts) to persons pipeline.add_dependent_columns("persons", "persons_nmtf") if trace_hh_id: trace_columns = ['non_mandatory_tour_frequency'] - tracing.trace_df(inject.get_table('persons_merged').to_frame(), - label="non_mandatory_tour_frequency", - columns=trace_columns, + tracing.trace_df(inject.get_table('persons').to_frame(), + label="non_mandatory_tour_frequency.persons", + # columns=trace_columns, warn_if_empty=True) -def create_non_mandatory_tours(): +def create_non_mandatory_tours(trace_hh_id): """ We have now generated non-mandatory tours, but they are attributes of the person table Now we create a "tours" table which has one row per tour that has been generated @@ -122,11 +121,16 @@ def create_non_mandatory_tours(): persons = inject.get_table('persons') alts = inject.get_injectable('non_mandatory_tour_frequency_alts') - df = process_non_mandatory_tours( + non_mandatory_tours = process_non_mandatory_tours( persons.non_mandatory_tour_frequency.dropna(), alts ) - pipeline.extend_table("tours", df) - tracing.register_traceable_table('tours', df) - pipeline.get_rn_generator().add_channel(df, 'tours') + tours = pipeline.extend_table("tours", non_mandatory_tours) + 
tracing.register_traceable_table('tours', tours) + pipeline.get_rn_generator().add_channel(non_mandatory_tours, 'tours') + + if trace_hh_id: + tracing.trace_df(non_mandatory_tours, + label="non_mandatory_tour_frequency.non_mandatory_tours", + warn_if_empty=True) diff --git a/activitysim/abm/models/school_location.py b/activitysim/abm/models/school_location.py index 61b41cbd26..cf5522359a 100644 --- a/activitysim/abm/models/school_location.py +++ b/activitysim/abm/models/school_location.py @@ -2,7 +2,9 @@ # See full license in LICENSE.txt. import logging +from collections import OrderedDict +import numpy as np import pandas as pd from activitysim.core import tracing @@ -31,6 +33,9 @@ logger = logging.getLogger(__name__) DUMP = False +# use int not str to identify school type in sample df +SCHOOL_TYPE_ID = OrderedDict([('university', 1), ('highschool', 2), ('gradeschool', 3)]) + @inject.injectable() def school_location_sample_spec(configs_dir): @@ -96,7 +101,7 @@ def school_location_sample( choosers = choosers[chooser_columns] choices_list = [] - for school_type in ['university', 'highschool', 'gradeschool']: + for school_type, school_type_id in SCHOOL_TYPE_ID.iteritems(): locals_d['segment'] = school_type @@ -121,11 +126,14 @@ def school_location_sample( chunk_size=chunk_size, trace_label=tracing.extend_trace_label(trace_label, school_type)) - choices['school_type'] = school_type + choices['school_type'] = school_type_id choices_list.append(choices) choices = pd.concat(choices_list) + # - # NARROW + choices['school_type'] = choices['school_type'].astype(np.uint8) + inject.add_table('school_location_sample', choices) @@ -173,7 +181,7 @@ def school_location_logsums( persons_merged = persons_merged.to_frame() school_location_sample = school_location_sample.to_frame() - logger.info("Running school_location_sample with %s rows" % len(school_location_sample)) + logger.info("Running school_location_logsums with %s rows" % school_location_sample.shape[0]) # FIXME - MEMORY 
HACK - only include columns actually used in spec chooser_columns = school_location_settings['LOGSUM_CHOOSER_COLUMNS'] @@ -182,11 +190,11 @@ def school_location_logsums( tracing.dump_df(DUMP, persons_merged, trace_label, 'persons_merged') logsums_list = [] - for school_type in ['university', 'highschool', 'gradeschool']: + for school_type, school_type_id in SCHOOL_TYPE_ID.iteritems(): logsums_spec = mode_choice_logsums_spec(configs_dir, school_type) - choosers = school_location_sample[school_location_sample['school_type'] == school_type] + choosers = school_location_sample[school_location_sample['school_type'] == school_type_id] choosers = pd.merge( choosers, @@ -261,12 +269,13 @@ def school_location_simulate(persons_merged, tracing.dump_df(DUMP, choosers, 'school_location_simulate', 'choosers') choices_list = [] - for school_type in ['university', 'highschool', 'gradeschool']: + for school_type, school_type_id in SCHOOL_TYPE_ID.iteritems(): locals_d['segment'] = school_type choosers_segment = choosers[choosers["is_" + school_type]] - alts_segment = school_location_sample[school_location_sample['school_type'] == school_type] + alts_segment = \ + school_location_sample[school_location_sample['school_type'] == school_type_id] # alternatives are pre-sampled and annotated with logsums and pick_count # but we have to merge additional alt columns into alt sample list @@ -303,6 +312,8 @@ def school_location_simulate(persons_merged, pipeline.add_dependent_columns("persons", "persons_school") + pipeline.drop_table('school_location_sample') + if trace_hh_id: trace_columns = ['school_taz'] + inject.get_table('persons_school').columns tracing.trace_df(inject.get_table('persons_merged').to_frame(), diff --git a/activitysim/abm/models/util/cdap.py b/activitysim/abm/models/util/cdap.py index 5ba46be26e..8090a20020 100644 --- a/activitysim/abm/models/util/cdap.py +++ b/activitysim/abm/models/util/cdap.py @@ -10,9 +10,8 @@ from activitysim.core.simulate import eval_variables from 
activitysim.core.simulate import compute_utilities -from activitysim.core.simulate import hh_chunked_choosers -from activitysim.core.simulate import num_chunk_rows_for_chunk_size +from activitysim.core import chunk from activitysim.core import logit from activitysim.core import tracing @@ -599,23 +598,6 @@ def household_activity_choices(indiv_utils, interaction_coefficients, hhsize, # convert choice expressed as index into alternative name from util column label choices = pd.Series(utils.columns[idx_choices].values, index=utils.index) - # if DUMP: - # - # if hhsize > 1: - # tracing.trace_df(choosers, '%s.DUMP.hhsize%d_choosers' % (trace_label, hhsize), - # transpose=False, slicer='NONE') - # tracing.trace_df(vars, '%s.DUMP.hhsize%d_vars' % (trace_label, hhsize), - # transpose=False, slicer='NONE') - # - # tracing.trace_df(utils, '%s.DUMP.hhsize%d_utils' % (trace_label, hhsize), - # transpose=False, slicer='NONE') - # - # tracing.trace_df(probs, '%s.DUMP.hhsize%d_probs' % (trace_label, hhsize), - # transpose=False, slicer='NONE') - # - # tracing.trace_df(choices, '%s.DUMP.hhsize%d_activity_choices' % (trace_label, hhsize), - # transpose=False, slicer='NONE') - if trace_hh_id: if hhsize > 1: @@ -829,10 +811,34 @@ def _run_cdap( # tracing.trace_df(cdap_results, '%s.DUMP.cdap_results' % trace_label, # transpose=False, slicer='NONE') + cum_size = chunk.log_df_size(trace_label, 'persons', persons, cum_size=None) + chunk.log_chunk_size(trace_label, cum_size) + # return dataframe with two columns return cdap_results +# calc_rows_per_chunk(chunk_size, persons, by_chunk_id=True) +def calc_rows_per_chunk(chunk_size, choosers, trace_label=None): + + # NOTE we chunk chunk_id + num_choosers = choosers['chunk_id'].max() + 1 + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = choosers.shape[1] + + # scale row_size by average number of chooser rows per chunk_id + rows_per_chunk_id = choosers.shape[0] / 
float(num_choosers) + row_size = int(rows_per_chunk_id * chooser_row_size) + + logger.debug("%s #chunk_calc choosers %s" % (trace_label, choosers.shape)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def run_cdap( persons, cdap_indiv_spec, @@ -881,12 +887,11 @@ def run_cdap( trace_label = tracing.extend_trace_label(trace_label, 'cdap') - # FIXME - what is the actual size/cardinality of the chooser - rows_per_chunk = num_chunk_rows_for_chunk_size(chunk_size, persons, by_chunk_id=True) + rows_per_chunk = calc_rows_per_chunk(chunk_size, persons, trace_label=trace_label) result_list = [] # segment by person type and pick the right spec for each person type - for i, num_chunks, persons_chunk in hh_chunked_choosers(persons, rows_per_chunk): + for i, num_chunks, persons_chunk in chunk.hh_chunked_choosers(persons, rows_per_chunk): logger.info("Running chunk %s of %s with %d persons" % (i, num_chunks, len(persons_chunk))) diff --git a/activitysim/abm/models/util/logsums.py b/activitysim/abm/models/util/logsums.py index 50d2f7bc99..60bc8e3c4d 100644 --- a/activitysim/abm/models/util/logsums.py +++ b/activitysim/abm/models/util/logsums.py @@ -56,7 +56,7 @@ def compute_logsums(choosers, logsum_spec, logsum_settings, nest_spec = config.get_logit_model_settings(logsum_settings) constants = config.get_model_constants(logsum_settings) - logger.info("Running compute_logsums with %d choosers" % len(choosers.index)) + logger.info("Running %s with %d choosers" % (trace_label, choosers.shape[0])) if trace_hh_id: tracing.trace_df(logsum_spec, diff --git a/activitysim/abm/models/util/mode.py b/activitysim/abm/models/util/mode.py index c4702e2453..bfb4a0052b 100644 --- a/activitysim/abm/models/util/mode.py +++ b/activitysim/abm/models/util/mode.py @@ -137,8 +137,8 @@ def expand_alternatives(df): return df -def _mode_choice_spec(mode_choice_spec_df, mode_choice_coeffs, - mode_choice_settings, trace_label=None): +def 
_mode_choice_spec(mode_choice_spec_df, mode_choice_coeffs, mode_choice_settings, + trace_spec=False, trace_label=None): """ Ok we have read in the spec - we need to do several things to reformat it to the same style spec that all the other models have. @@ -178,7 +178,7 @@ def _mode_choice_spec(mode_choice_spec_df, mode_choice_coeffs, df = mode_choice_spec_df index_name = df.index.name - if trace_label: + if trace_spec: tracing.trace_df(df, tracing.extend_trace_label(trace_label, 'raw'), slicer='NONE', transpose=False) @@ -192,7 +192,7 @@ def _mode_choice_spec(mode_choice_spec_df, mode_choice_coeffs, # set index to ['Expression', 'Alternative'] df = df.set_index('Alternative', append=True) - if trace_label: + if trace_spec: tracing.trace_df(df, tracing.extend_trace_label(trace_label, 'pre_process_expressions'), slicer='NONE', transpose=False) @@ -213,14 +213,14 @@ def _mode_choice_spec(mode_choice_spec_df, mode_choice_coeffs, df[col], mode_choice_coeffs[col].to_dict()) - if trace_label: + if trace_spec: tracing.trace_df(df, tracing.extend_trace_label(trace_label, 'evaluate_expression_list'), slicer='NONE', transpose=False) df = expand_alternatives(df) - if trace_label: + if trace_spec: tracing.trace_df(df, tracing.extend_trace_label(trace_label, 'expand_alternatives'), slicer='NONE', transpose=False) diff --git a/activitysim/abm/models/util/tour_frequency.py b/activitysim/abm/models/util/tour_frequency.py index 10709732ee..c611047576 100644 --- a/activitysim/abm/models/util/tour_frequency.py +++ b/activitysim/abm/models/util/tour_frequency.py @@ -28,22 +28,29 @@ def canonical_tours(): # FIXME - this logic is hardwired in process_mandatory_tours() mandatory_tour_flavors = {'work': 2, 'school': 2} - # FIXME - should get this from alts table - atwork_subtour_flavors = {'eat': 1, 'business': 2, 'maint': 1} - tour_flavors = dict(non_mandatory_tour_flavors) tour_flavors.update(mandatory_tour_flavors) - tour_flavors.update(atwork_subtour_flavors) sub_channels = [tour_type 
+ str(tour_num) for tour_type, max_count in tour_flavors.iteritems() for tour_num in range(1, max_count + 1)] + # FIXME - should get this from alts table + # we need to distinguish between subtours of different work tours + # (e.g. eat1_1 is eat subtour for parent work tour 1 and eat1_2 is for work tour 2) + max_work_tours = mandatory_tour_flavors['work'] + atwork_subtour_channels = ['eat1', 'business1', 'business2', 'maint1'] + atwork_subtour_channels = ['%s_%s' % (c, i+1) + for c in atwork_subtour_channels + for i in range(max_work_tours)] + + sub_channels = sub_channels + atwork_subtour_channels + sub_channels.sort() return sub_channels -def set_tour_index(tours, tour_num_col): +def set_tour_index(tours, tour_num_col, parent_tour_num_col=None): """ Parameters @@ -63,18 +70,25 @@ def set_tour_index(tours, tour_num_col): assert tour_num_col in tours.columns - tours['tour_id'] = tours.tour_type + tours[tour_num_col] .map(str) + tours['tour_id'] = tours.tour_type + tours[tour_num_col].map(str) + + if parent_tour_num_col: + # we need to distinguish between subtours of different work tours + # (e.g. 
eat1_1 is eat subtour for parent work tour 1 and eat1_2 is for work tour 2) + tours['tour_id'] = tours['tour_id'] + '_' + tours[parent_tour_num_col].map(str) # map recognized strings to ints tours.tour_id = tours.tour_id.replace(to_replace=possible_tours, value=range(possible_tours_count)) + # convert to numeric - shouldn't be any NaNs - this will raise error if there are tours.tour_id = pd.to_numeric(tours.tour_id, errors='coerce').astype(int) tours.tour_id = (tours.person_id * possible_tours_count) + tours.tour_id - if len(tours.tour_id) > len(tours.tour_id.unique()): - print "\ntours.tour_id not unique\n", tours + # if tours.tour_id.duplicated().any(): + # print "\ntours.tour_id not unique\n", tours[tours.tour_id.duplicated(keep=False)] + assert not tours.tour_id.duplicated().any() tours.set_index('tour_id', inplace=True, verify_integrity=True) @@ -107,6 +121,12 @@ def process_tours(tour_frequency, tour_frequency_alts, tour_category, parent_col source code - it has an index which is a unique tour identifier, a person_id column, and a tour type column which comes from the column names of the alternatives DataFrame supplied above. + + tours.tour_type - tour type (e.g. 
school, worl, shopping, eat) + tours.tour_type_num - if there are two 'school' type tours, they will be numbered 1 and 2 + tours.tour_type_count - number of tours of tour_type parent has (parent's max tour_type_num) + tours.tour_num - index of tour (of any type) for parent + tours.tour_count - number of tours of any type) for parent (parent's max tour_num) """ # get the actual alternatives for each person - have to go back to the @@ -156,10 +176,10 @@ def process_tours(tour_frequency, tour_frequency_alts, tour_category, parent_col """ tour_type tour_type_num tour_type_count tour_num tour_count - 0 2588676 alt1 1 2 1 2 - 0 2588676 alt1 2 2 2 2 - 0 2588676 alt1 1 1 1 2 - 0 2588676 alt2 1 1 2 2 + 0 2588676 alt1 1 2 1 4 + 0 2588676 alt1 2 2 2 4 + 0 2588676 alt2 1 1 3 4 + 0 2588676 alt3 1 1 4 4 """ # set these here to ensure consistency across different tour categories @@ -229,13 +249,15 @@ def process_mandatory_tours(persons, mandatory_tour_frequency_alts): tour_id 12413245 827549 school 2 1 2 2 12413244 827549 school 2 2 1 2 - 12413264 827550 work 2 1 2 2 + 12413264 827550 work 1 1 1 2 + 12413265 827550 school 1 1 2 2 ... 
mandatory non_mandatory tour_category destination True False mandatory 102 True False mandatory 102 True False mandatory 9 + True False mandatory 102 """ return tours @@ -350,11 +372,12 @@ def process_atwork_subtours(work_tours, atwork_subtour_frequency_alts): """ # merge person_id from parent work_tours - work_tours = work_tours[["person_id"]] + work_tours = work_tours[["person_id", "tour_num"]] + work_tours.rename(columns={'tour_num': 'parent_tour_num'}, inplace=True) tours = pd.merge(tours, work_tours, left_on=parent_col, right_index=True) # assign stable (predictable) tour_id - set_tour_index(tours, 'tour_type_num') + set_tour_index(tours, tour_num_col='tour_type_num', parent_tour_num_col='parent_tour_num') """ person_id tour_type tour_type_count tour_type_num tour_num tour_count diff --git a/activitysim/abm/models/util/vectorize_tour_scheduling.py b/activitysim/abm/models/util/vectorize_tour_scheduling.py index fef6836b31..033f384baa 100644 --- a/activitysim/abm/models/util/vectorize_tour_scheduling.py +++ b/activitysim/abm/models/util/vectorize_tour_scheduling.py @@ -10,6 +10,11 @@ from activitysim.core import tracing from activitysim.core import inject from activitysim.core import timetable as tt +from activitysim.core.util import memory_info +from activitysim.core.util import df_size +from activitysim.core.util import force_garbage_collect + +from activitysim.core import chunk logger = logging.getLogger(__name__) @@ -104,11 +109,9 @@ def tdd_interaction_dataset(tours, alts, timetable, choice_column, window_id_col return alt_tdd -def schedule_tours(tours, persons_merged, - alts, spec, constants, - timetable, - previous_tour, window_id_col, - chunk_size, tour_trace_label): +def _schedule_tours( + tours, persons_merged, alts, spec, constants, timetable, + previous_tour, window_id_col, tour_trace_label): """ previous_tour stores values used to add columns that can be used in the spec which have to do with the previous tours per person. 
Every column in the @@ -141,7 +144,6 @@ def schedule_tours(tours, persons_merged, window_id_col : str column name from tours that identifies 'owner' of this tour (person_id for non/mandatory tours or parent_tout_id for subtours) - chunk_size tour_trace_label Returns @@ -151,8 +153,11 @@ def schedule_tours(tours, persons_merged, logger.info("%s schedule_tours running %d tour choices" % (tour_trace_label, len(tours))) - # timetable can't handle multiple tours per person - assert len(tours.index) == len(np.unique(tours.person_id.values)) + if tours[window_id_col].duplicated().any(): + print "\ntours.person_id not unique\n", tours[tours[window_id_col].duplicated(keep=False)] + + # timetable can't handle multiple tours per window_id + assert not tours[window_id_col].duplicated().any() # merge persons into tours tours = pd.merge(tours, persons_merged, left_on='person_id', right_index=True) @@ -180,7 +185,7 @@ def schedule_tours(tours, persons_merged, spec, choice_column=choice_column, locals_d=locals_d, - chunk_size=chunk_size, + chunk_size=0, trace_label=tour_trace_label ) @@ -188,6 +193,92 @@ def schedule_tours(tours, persons_merged, timetable.assign(tours[window_id_col], choices) + cum_size = chunk.log_df_size(tour_trace_label, "tours", tours, cum_size=None) + cum_size = chunk.log_df_size(tour_trace_label, "alt_tdd", alt_tdd, cum_size) + chunk.log_chunk_size(tour_trace_label, cum_size) + + return choices + + +def calc_rows_per_chunk(chunk_size, tours, persons_merged, alternatives, trace_label=None): + + num_choosers = len(tours.index) + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = tours.shape[1] + sample_size = alternatives.shape[0] + + # persons_merged columns plus 2 previous tour columns + extra_chooser_columns = persons_merged.shape[1] + 2 + + # one column per alternative plus skim and join columns + alt_row_size = alternatives.shape[1] + 2 + + row_size = (chooser_row_size + extra_chooser_columns + 
alt_row_size) * sample_size + + logger.debug("%s #chunk_calc choosers %s" % (trace_label, tours.shape)) + logger.debug("%s #chunk_calc extra_chooser_columns %s" % (trace_label, extra_chooser_columns)) + logger.debug("%s #chunk_calc alternatives %s" % (trace_label, alternatives.shape)) + logger.debug("%s #chunk_calc alt_row_size %s" % (trace_label, alt_row_size)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + +def schedule_tours( + tours, persons_merged, alts, spec, constants, timetable, + previous_tour, window_id_col, chunk_size, tour_trace_label): + """ + chunking wrapper for _schedule_tours + + While interaction_sample_simulate provides chunking support, the merged tours, persons + dataframe and the tdd_interaction_dataset are very big, so we want to create them inside + the chunking loop to minimize memory footprint. So we implement the chunking loop here, + and pass a chunk_size of 0 to interaction_sample_simulate to disable its chunking support. 
+ + """ + # return _schedule_tours(tours, persons_merged, alts, spec, constants, timetable, + # previous_tour, window_id_col, chunk_size, tour_trace_label) + + logger.info("%s schedule_tours running %d tour choices" % (tour_trace_label, len(tours))) + + # persons_merged columns plus 2 previous tour columns + extra_chooser_columns = persons_merged.shape[1] + 2 + + rows_per_chunk = \ + calc_rows_per_chunk(chunk_size, tours, persons_merged, alts, trace_label=tour_trace_label) + + logger.info("chunk_size %s rows_per_chunk %s" % (chunk_size, rows_per_chunk)) + + result_list = [] + for i, num_chunks, chooser_chunk \ + in chunk.chunked_choosers(tours, rows_per_chunk): + + logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + + chunk_trace_label = tracing.extend_trace_label(tour_trace_label, 'chunk_%s' % i) \ + if num_chunks > 1 else tour_trace_label + + choices = _schedule_tours(chooser_chunk, persons_merged, + alts, spec, constants, + timetable, + previous_tour, window_id_col, + tour_trace_label=chunk_trace_label) + + result_list.append(choices) + + force_garbage_collect() + + # FIXME: this will require 2X RAM + # if necessary, could append to hdf5 store on disk: + # http://pandas.pydata.org/pandas-docs/stable/io.html#id2 + if len(result_list) > 1: + choices = pd.concat(result_list) + + assert len(choices.index) == len(tours.index) + return choices @@ -230,8 +321,7 @@ def vectorize_tour_scheduling(tours, persons_merged, alts, spec, DataFrame and the values are the index of the alts DataFrame.
""" - if not trace_label: - trace_label = 'vectorize_tour_scheduling' + trace_label = tracing.extend_trace_label(trace_label, 'vectorize_tour_scheduling') assert len(tours.index) > 0 assert 'tour_num' in tours.columns @@ -344,7 +434,7 @@ def vectorize_subtour_scheduling(parent_tours, subtours, persons_merged, alts, s # mask the periods outside parent tour footprint timetable.assign_subtour_mask(parent_tours.tour_id, parent_tours.tdd) - print timetable.windows + # print timetable.windows """ [[7 7 7 0 0 0 0 0 0 0 0 7 7 7 7 7 7 7 7 7 7] [7 0 0 0 0 0 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7] diff --git a/activitysim/abm/models/utility_steps.py b/activitysim/abm/models/utility_steps.py new file mode 100644 index 0000000000..c54a56e13a --- /dev/null +++ b/activitysim/abm/models/utility_steps.py @@ -0,0 +1,119 @@ +# aFreight +# See full license in LICENSE.txt. + +import logging +import os +import pandas as pd + +from activitysim.core import pipeline +from activitysim.core import inject + +from activitysim.core.config import setting + +logger = logging.getLogger(__name__) + + +@inject.step() +def write_data_dictionary(output_dir): + pd.options.display.max_columns = 500 + pd.options.display.max_rows = 100 + + output_tables = pipeline.checkpointed_tables() + + records = [] + + # write data dictionary for all checkpointed_tables + with open(os.path.join(output_dir, 'data_dict.txt'), 'w') as file: + for table_name in output_tables: + df = inject.get_table(table_name, None).to_frame() + + print >> file, "\n### %s %s" % (table_name, df.shape) + print >> file, df.dtypes + + rows, columns = df.shape + bytes = df.memory_usage(index=True).sum() + records.append((table_name, rows, columns, bytes)) + + df = pd.DataFrame.from_records(records, columns=['table_name', 'rows', 'columns', 'bytes']) + df.sort_values(by='table_name', inplace=True) + df.to_csv(os.path.join(output_dir, 'data_dict.csv')) + + +@inject.step() +def write_tables(output_dir): + """ + Write pipeline tables as csv files (in 
output directory) as specified by output_tables list + in settings file. + + 'output_tables' can specify either a list of output tables to include or to skip + if no output_tables list is specified, then no checkpointed tables will be written + + To write all output tables EXCEPT the households and persons tables: + + :: + + output_tables: + action: skip + tables: + - households + - persons + + To write ONLY the households table: + + :: + + output_tables: + action: include + tables: + - households + + Parameters + ---------- + output_dir: str + + """ + + output_tables_settings_name = 'output_tables' + + output_tables_settings = setting(output_tables_settings_name) + + output_tables_list = pipeline.checkpointed_tables() + + if output_tables_settings is None: + logger.info("No output_tables specified in settings file. Nothing to write.") + return + + action = output_tables_settings.get('action') + tables = output_tables_settings.get('tables') + prefix = output_tables_settings.get('prefix', 'final_') + + if action not in ['include', 'skip']: + raise RuntimeError("expected %s action '%s' to be either 'include' or 'skip'" % \ + (output_tables_settings_name, action)) + + if action == 'include': + output_tables_list = tables + elif action == 'skip': + output_tables_list = [t for t in output_tables_list if t not in tables] + + # should provide option to also write checkpoints? + # output_tables_list.append("checkpoints.csv") + + for table_name in output_tables_list: + table = inject.get_table(table_name, None) + + if table is None: + logger.warn("Skipping '%s': Table not found."
% table_name) + continue + + df = table.to_frame() + file_name = "%s%s.csv" % (prefix, table_name) + logger.info("writing output file %s" % file_name) + file_path = os.path.join(output_dir, file_name) + write_index = df.index.name is not None + df.to_csv(file_path, index=write_index) + + if (action == 'include') == ('checkpoints' in tables): + # write checkpoints + file_name = "%s%s.csv" % (prefix, 'checkpoints') + pipeline.get_checkpoints().to_csv(os.path.join(output_dir, file_name)) diff --git a/activitysim/abm/models/workplace_location.py b/activitysim/abm/models/workplace_location.py index 7ddadd899d..eb03d82ab5 100644 --- a/activitysim/abm/models/workplace_location.py +++ b/activitysim/abm/models/workplace_location.py @@ -148,7 +148,7 @@ def workplace_location_logsums(persons_merged, persons_merged = persons_merged.to_frame() workplace_location_sample = workplace_location_sample.to_frame() - logger.info("Running workplace_location_sample with %s rows" % len(workplace_location_sample)) + logger.info("Running workplace_location_logsums with %s rows" % len(workplace_location_sample)) # FIXME - MEMORY HACK - only include columns actually used in spec chooser_columns = workplace_location_settings['LOGSUM_CHOOSER_COLUMNS'] @@ -200,6 +200,8 @@ def workplace_location_simulate(persons_merged, to select a work_taz from sample alternatives """ + trace_label = 'workplace_location_simulate' + # for now I'm going to generate a workplace location for everyone - # presumably it will not get used in downstream models for everyone - # it should depend on CDAP and mandatory tour generation as to whether @@ -250,7 +252,7 @@ def workplace_location_simulate(persons_merged, skims=skims, locals_d=locals_d, chunk_size=chunk_size, - trace_label=trace_hh_id and 'workplace_location', + trace_label=trace_label, trace_choice_name='workplace_location') # FIXME - no need to reindex since we didn't slice choosers @@ -262,6 +264,8 @@ def workplace_location_simulate(persons_merged, 
pipeline.add_dependent_columns("persons", "persons_workplace") + pipeline.drop_table('workplace_location_sample') + if trace_hh_id: trace_columns = ['workplace_taz'] + inject.get_table('persons_workplace').columns tracing.trace_df(inject.get_table('persons_merged').to_frame(), diff --git a/activitysim/abm/tables/households.py b/activitysim/abm/tables/households.py index af20c49750..c17b6d11d2 100644 --- a/activitysim/abm/tables/households.py +++ b/activitysim/abm/tables/households.py @@ -26,7 +26,9 @@ def households(store, households_sample_size, trace_hh_id): df = tracing.slice_ids(df_full, trace_hh_id) # if we need sample a subset of full store - elif households_sample_size > 0 and len(df_full.index) > households_sample_size: + elif households_sample_size > 0 and df_full.shape[0] > households_sample_size: + + logger.info("sampling %s of %s households" % (households_sample_size, df_full.shape[0])) # take the requested random sample df = asim.random_rows(df_full, households_sample_size) diff --git a/activitysim/abm/tables/size_terms.py b/activitysim/abm/tables/size_terms.py index bbfb4f83cf..292379cf8d 100644 --- a/activitysim/abm/tables/size_terms.py +++ b/activitysim/abm/tables/size_terms.py @@ -4,6 +4,7 @@ import os import logging +import numpy as np import pandas as pd from activitysim.core import inject @@ -61,4 +62,12 @@ def destination_size_terms(land_use, size_terms): df = pd.DataFrame({key: size_term(land_use, row) for key, row in size_terms.iterrows()}, index=land_use.index) df.index.name = "TAZ" + + if not (df.dtypes == 'float64').all(): + logger.warn('Surprised to find that not all size_terms were float64!') + + # - #NARROW + # float16 has 3.3 decimal digits of precision, float32 has 7.2 + df = df.astype(np.float16, errors='raise') + return df diff --git a/activitysim/abm/test/configs/settings.yaml b/activitysim/abm/test/configs/settings.yaml index 9c65379168..385363a010 100644 --- a/activitysim/abm/test/configs/settings.yaml +++ 
b/activitysim/abm/test/configs/settings.yaml @@ -8,6 +8,12 @@ rural_threshold: 6 households_sample_size: 100 +# trace household id; comment out for no trace +trace_hh_id: 961042 + +# trace origin, destination in accessibility calculation +#trace_od: [5, 11] + grade_school_max_age: 14 county_map: diff --git a/activitysim/abm/test/output/.gitignore b/activitysim/abm/test/output/.gitignore index fe43411af0..5c191ce353 100644 --- a/activitysim/abm/test/output/.gitignore +++ b/activitysim/abm/test/output/.gitignore @@ -1,3 +1,4 @@ *.csv *.log *.h5 +*.txt diff --git a/activitysim/abm/test/output/data_dict.txt b/activitysim/abm/test/output/data_dict.txt new file mode 100644 index 0000000000..a677166fa1 --- /dev/null +++ b/activitysim/abm/test/output/data_dict.txt @@ -0,0 +1,310 @@ + +### accessibility (25, 20) +AUTOPEAKRETAIL float64 +AUTOPEAKTOTAL float64 +AUTOOFFPEAKRETAIL float64 +AUTOOFFPEAKTOTAL float64 +TRANSITPEAKRETAIL float64 +TRANSITPEAKTOTAL float64 +TRANSITOFFPEAKRETAIL float64 +TRANSITOFFPEAKTOTAL float64 +NONMOTORIZEDRETAIL float64 +NONMOTORIZEDTOTAL float64 +trPkTotal float64 +auOpTotal float64 +auOpRetail float64 +nmRetail float64 +trPkRetail float64 +trOpRetail float64 +trOpTotal float64 +nmTotal float64 +auPkRetail float64 +auPkTotal float64 +dtype: object + +### person_windows (170, 21) +4 int64 +5 int64 +6 int64 +7 int64 +8 int64 +9 int64 +10 int64 +11 int64 +12 int64 +13 int64 +14 int64 +15 int64 +16 int64 +17 int64 +18 int64 +19 int64 +20 int64 +21 int64 +22 int64 +23 int64 +24 int64 +dtype: object + +### households (110, 68) +HHID +TAZ int64 +SERIALNO int64 +PUMA5 int64 +income int64 +PERSONS int64 +HHT int64 +UNITTYPE int64 +NOC int64 +BLDGSZ int64 +TENURE int64 +VEHICL int64 +hinccat1 int64 +hinccat2 int64 +hhagecat int64 +hsizecat int64 +hfamily int64 +hunittype int64 +hNOCcat int64 +hwrkrcat int64 +h0004 int64 +h0511 int64 +h1215 int64 +h1617 int64 +h1824 int64 +h2534 int64 +h3549 int64 +h5064 int64 +h6579 int64 +h80up int64 +workers int64 
+hwork_f int64 +hwork_p int64 +huniv int64 +hnwork int64 +hretire int64 +hpresch int64 +hschpred int64 +hschdriv int64 +htypdwel int64 +hownrent int64 +hadnwst int64 +hadwpst int64 +hadkids int64 +bucketBin int64 +originalPUMA int64 +hmultiunit int64 +income_segment int64 +non_workers int64 +family bool +num_young_adults float64 +household_type object +home_is_rural bool +drivers float64 +home_taz int64 +work_tour_auto_time_savings int64 +non_family bool +hhsize int64 +chunk_id int64 +num_children float64 +num_adolescents float64 +income_in_thousands float64 +num_young_children float64 +home_is_urban bool +num_college_age float64 +auto_ownership int64 +no_cars bool +car_sufficiency int64 +num_under16_not_at_school int64 +dtype: object + +### persons (170, 73) +PERID +household_id int64 +age int64 +RELATE int64 +ESR int64 +GRADE int64 +PNUM int64 +PAUG int64 +DDP int64 +sex int64 +WEEKS int64 +HOURS int64 +MSP int64 +POVERTY int64 +EARNS int64 +pagecat int64 +pemploy int64 +pstudent int64 +ptype int64 +padkid int64 +has_preschool_kid bool +student_cat object +num_eat_j int64 +has_driving_kid bool +female bool +has_non_worker bool +home_taz int64 +is_university bool +max_window int64 +employed_cat object +student_is_employed bool +is_gradeschool bool +has_retiree bool +num_main_j int64 +num_joint_tours int64 +has_full_time bool +ptype_cat object +is_student bool +has_school_kid bool +is_worker bool +age_16_p bool +has_university bool +adult bool +nonstudent_to_school bool +age_16_to_19 bool +is_highschool bool +num_shop_j int64 +num_visi_j int64 +num_disc_j int64 +has_part_time bool +male bool +school_taz int64 +distance_to_school float64 +roundtrip_auto_time_to_school float64 +roundtrip_auto_time_to_work float64 +distance_to_work float64 +workplace_in_cbd bool +work_taz_area_type int64 +workplace_taz int64 +under16_not_at_school bool +cdap_rank int64 +cdap_activity object +has_preschool_kid_at_home bool +has_school_kid_at_home bool +work_and_school_and_student bool 
+num_mand float64 +work_and_school_and_worker bool +num_work_tours float64 +mandatory_tour_frequency object +num_eatout_tours float64 +num_non_mand float64 +num_escort_tours float64 +non_mandatory_tour_frequency float64 +num_non_escort_tours float64 +dtype: object + +### tours (194, 45) +tour_id +person_id float64 +tour_type object +tour_type_count float64 +tour_type_num float64 +tour_num float64 +tour_count float64 +mandatory object +non_mandatory object +tour_category object +destination float64 +destination_in_cbd float64 +start float64 +end float64 +duration float64 +tdd float64 +daily_parking_cost float64 +dest_density_index float64 +dest_topology float64 +destination_walk_time float64 +drive_commuter_available float64 +drive_express_available float64 +drive_heavyrail_available float64 +drive_local_available float64 +drive_lrf_available float64 +hov2_available float64 +hov2toll_available float64 +hov3_available float64 +in_period object +is_joint float64 +number_of_participants float64 +origin_walk_time float64 +out_period object +sov_available float64 +sovtoll_available float64 +terminal_time float64 +walk_commuter_available float64 +walk_express_available float64 +walk_heavyrail_available float64 +walk_local_available float64 +walk_lrf_available float64 +work_tour_is_drive float64 +mode object +atwork_subtour_frequency object +parent_tour_id float64 +parent_tour_num float64 +dtype: object + +### land_use (25, 49) +DISTRICT int64 +SD int64 +COUNTY int64 +TOTHH int64 +HHPOP int64 +TOTPOP int64 +EMPRES int64 +SFDU int64 +MFDU int64 +HHINCQ1 int64 +HHINCQ2 int64 +HHINCQ3 int64 +HHINCQ4 int64 +TOTACRE float64 +RESACRE int64 +CIACRE int64 +SHPOP62P float64 +TOTEMP int64 +AGE0004 int64 +AGE0519 int64 +AGE2044 int64 +AGE4564 int64 +AGE65P int64 +RETEMPN int64 +FPSEMPN int64 +HEREMPN int64 +OTHEMPN int64 +AGREMPN int64 +MWTEMPN int64 +PRKCST float64 +OPRKCST float64 +area_type int64 +HSENROLL float64 +COLLFTE float64 +COLLPTE float64 +TOPOLOGY int64 +TERMINAL float64 
+ZERO int64 +hhlds int64 +sftaz int64 +gqpop int64 +employment_density float64 +total_acres float64 +county_id int64 +density_index float64 +county_name object +household_density float64 +total_households int64 +total_employment int64 +dtype: object + +### trips (368, 9) +trip_id +tour_id int64 +INBOUND bool +trip_num int64 +OTAZ int64 +DTAZ float64 +start_trip float64 +end_trip float64 +start_period object +trip_mode object +dtype: object diff --git a/activitysim/abm/test/test_pipeline.py b/activitysim/abm/test/test_pipeline.py index 70eb97f7db..541fc72eaa 100644 --- a/activitysim/abm/test/test_pipeline.py +++ b/activitysim/abm/test/test_pipeline.py @@ -177,7 +177,7 @@ def test_mini_pipeline_run(): # try to get a non-existant table with pytest.raises(RuntimeError) as excinfo: pipeline.get_table("bogus") - assert "not in checkpointed tables" in str(excinfo.value) + assert "never checkpointed" in str(excinfo.value) # try to get an existing table from a non-existant checkpoint with pytest.raises(RuntimeError) as excinfo: @@ -318,8 +318,8 @@ def get_trace_csv(file_name): EXPECT_PERSON_IDS = ['1888694', '1888695', '1888696'] EXPECT_TOUR_TYPES = ['work', 'school', 'othdiscr'] -EXPECT_MODES = ['DRIVE_LOC', 'DRIVE_LOC', 'DRIVE_LOC'] -EXPECT_TOUR_COUNT = 173 +EXPECT_MODES = ['DRIVE_LOC', 'DRIVE_LOC', 'DRIVEALONEPAY'] +EXPECT_TOUR_COUNT = 177 def test_full_run1(): @@ -375,7 +375,7 @@ def test_full_run_with_chunks(): tour_count = full_run(trace_hh_id=HH_ID, households_sample_size=HOUSEHOLDS_SAMPLE_SIZE, - chunk_size=10000) + chunk_size=500000) assert(tour_count == EXPECT_TOUR_COUNT) @@ -405,3 +405,10 @@ def test_full_run_stability(): assert (mode_df.person_id.values == EXPECT_PERSON_IDS).any() assert (mode_df.tour_type.values == EXPECT_TOUR_TYPES).any() assert (mode_df['mode'].values == EXPECT_MODES).any() + + +# if __name__ == "__main__": +# +# print "\n\ntest_mini_pipeline_run" +# test_mini_pipeline_run() +# teardown_function(None) diff --git a/activitysim/core/chunk.py 
b/activitysim/core/chunk.py new file mode 100644 index 0000000000..5338b6844a --- /dev/null +++ b/activitysim/core/chunk.py @@ -0,0 +1,187 @@ +# ActivitySim +# See full license in LICENSE.txt. + +from math import ceil +import os +import logging + +import numpy as np +import pandas as pd + +from .skim import SkimDictWrapper, SkimStackWrapper +from . import logit +from . import tracing +from . import pipeline +from . import util + +logger = logging.getLogger(__name__) + + +def log_df_size(trace_label, table_name, df, cum_size): + + if isinstance(df, pd.Series): + elements = df.shape[0] + bytes = df.memory_usage(index=True) + elif isinstance(df, pd.DataFrame): + elements = df.shape[0] * df.shape[1] + bytes = df.memory_usage(index=True).sum() + else: + assert False + + logger.debug("%s #chunk log_df_size %s %s %s %s" % + (trace_label, table_name, df.shape, elements, util.GB(bytes))) + + if cum_size: + elements += cum_size[0] + bytes += cum_size[1] + + return elements, bytes + + +def log_chunk_size(trace_label, cum): + + elements = cum[0] + bytes = cum[1] + + logger.debug("%s #chunk CUM %s %s" % (trace_label, elements, util.GB(bytes))) + logger.debug("%s %s" % (trace_label, util.memory_info())) + + +def rows_per_chunk(chunk_size, row_size, num_choosers, trace_label): + + # closest number of chooser rows to achieve chunk_size + rpc = int(round(chunk_size / float(row_size))) + rpc = max(rpc, 1) + rpc = min(rpc, num_choosers) + + chunks = int(ceil(num_choosers / float(rpc))) + effective_chunk_size = row_size * rpc + + logger.debug("%s #chunk_calc chunk_size %s" % (trace_label, chunk_size)) + logger.debug("%s #chunk_calc num_choosers %s" % (trace_label, num_choosers)) + + logger.debug("%s #chunk_calc total row_size %s" % (trace_label, row_size)) + logger.debug("%s #chunk_calc rows_per_chunk %s" % (trace_label, rpc)) + logger.debug("%s #chunk_calc effective_chunk_size %s" % (trace_label, effective_chunk_size)) + logger.debug("%s #chunk_calc chunks %s" % (trace_label, 
chunks)) + + return rpc + + +def chunked_choosers(choosers, rows_per_chunk): + # generator to iterate over choosers in chunk_size chunks + num_choosers = len(choosers.index) + num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) + + i = offset = 0 + while offset < num_choosers: + yield i+1, num_chunks, choosers.iloc[offset: offset+rows_per_chunk] + offset += rows_per_chunk + i += 1 + + +def chunked_choosers_and_alts(choosers, alternatives, rows_per_chunk): + """ + generator to iterate over choosers and alternatives in chunk_size chunks + + like chunked_choosers, but also chunks alternatives + for use with sampled alternatives which will have different alternatives (and numbers of alts) + + There may be up to sample_size (or as few as one) alternatives for each chooser + because alternatives may have been sampled more than once, but pick_count for those + alternatives will always sum to sample_size. + + When we chunk the choosers, we need to take care chunking the alternatives as there are + varying numbers of them for each chooser. 
Since alternatives appear in the same order + as choosers, we can use cumulative pick_counts to identify boundaries of sets of alternatives + + Parameters + ---------- + choosers + alternatives : pandas DataFrame + sample alternatives including pick_count column in same order as choosers + rows_per_chunk : int + + Yields + ------- + i : int + one-based index of current chunk + num_chunks : int + total number of chunks that will be yielded + choosers : pandas DataFrame slice + chunk of choosers + alternatives : pandas DataFrame slice + chunk of alternatives for chooser chunk + """ + + assert 'pick_count' in alternatives.columns or choosers.index.name == alternatives.index.name + + num_choosers = len(choosers.index) + num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) + + if choosers.index.name == alternatives.index.name: + assert choosers.index.name == alternatives.index.name + + # alt chunks boundaries are where index changes + alt_ids = alternatives.index.values + alt_chunk_end = np.where(alt_ids[:-1] != alt_ids[1:])[0] + 1 + alt_chunk_end = np.append([0], alt_chunk_end) # including the first... + alt_chunk_end = alt_chunk_end[rows_per_chunk::rows_per_chunk] + + else: + # used to do it this way for school and workplace (which are sampled based on prob) + # since the utility expressions need to know pick_count for sample correction + # but for now the assumption that choosers and alternatives share indexes is more general + # leaving this (previously correct) code here for now in case that changes... 
+ assert False + + # assert 'pick_count' in alternatives.columns + # assert 'cum_pick_count' not in alternatives.columns + # alternatives['cum_pick_count'] = alternatives['pick_count'].cumsum() + # + # # currently no convenient way to remember sample_size across steps + # pick_count = alternatives.cum_pick_count.iat[-1] + # sample_size = pick_count / len(choosers.index) + # assert pick_count % sample_size == 0 + # + # alt_chunk_size = rows_per_chunk * sample_size + # + # # array of indices of starts of alt chunks + # alt_chunk_end = np.where(alternatives['cum_pick_count'] % alt_chunk_size == 0)[0] + 1 + + # add index to end of array to capture any final partial chunk + alt_chunk_end = np.append(alt_chunk_end, [len(alternatives.index)]) + + i = offset = alt_offset = 0 + while offset < num_choosers: + + alt_end = alt_chunk_end[i] + + chooser_chunk = choosers[offset: offset + rows_per_chunk] + alternative_chunk = alternatives[alt_offset: alt_end] + + assert len(chooser_chunk.index) == len(np.unique(alternative_chunk.index.values)) + + yield i+1, num_chunks, chooser_chunk, alternative_chunk + + i += 1 + offset += rows_per_chunk + alt_offset = alt_end + + +def hh_chunked_choosers(choosers, rows_per_chunk): + # generator to iterate over choosers in chunk_size chunks + # like chunked_choosers but based on chunk_id field rather than dataframe length + # (the presumption is that choosers has multiple rows with the same chunk_id that + # all have to be included in the same chunk) + # FIXME - we pathologically know name of chunk_id col in households table + + num_choosers = choosers['chunk_id'].max() + 1 + num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) + + i = offset = 0 + while offset < num_choosers: + chooser_chunk = choosers[choosers['chunk_id'].between(offset, offset + rows_per_chunk - 1)] + yield i+1, num_chunks, chooser_chunk + offset += rows_per_chunk + i += 1 diff --git a/activitysim/core/inject.py b/activitysim/core/inject.py 
index 0ccc482243..5473c87c08 100644 --- a/activitysim/core/inject.py +++ b/activitysim/core/inject.py @@ -22,7 +22,8 @@ def decorator(func): logger.debug("inject step %s" % name) - assert not _DECORATED_STEPS.get(name, False) + assert not _DECORATED_STEPS.get(name, False), \ + "step '%s' already decorated." % name _DECORATED_STEPS[name] = func orca.add_step(name, func) @@ -38,7 +39,8 @@ def decorator(func): logger.debug("inject table %s" % name) - assert not _DECORATED_TABLES.get(name, False) + assert not _DECORATED_TABLES.get(name, False), \ + "table '%s' already decorated." % name _DECORATED_TABLES[name] = func orca.add_table(name, func) @@ -56,7 +58,8 @@ def decorator(func): column_key = (table_name, name) - assert not _DECORATED_COLUMNS.get(column_key, False) + assert not _DECORATED_COLUMNS.get(column_key, False), \ + "column '%s' already decorated." % name _DECORATED_COLUMNS[column_key] = {'func': func, 'cache': cache} orca.add_column(table_name, name, func, cache=cache) @@ -88,6 +91,10 @@ def merge_tables(target, tables, columns=None): def add_table(table_name, table, cache=False): + + if orca.is_table(table_name): + logger.warn("inject add_table replacing existing table %s" % table_name) + return orca.add_table(table_name, table, cache=cache) diff --git a/activitysim/core/interaction_sample.py b/activitysim/core/interaction_sample.py index 0ec3c95d4b..9b6e0ac557 100644 --- a/activitysim/core/interaction_sample.py +++ b/activitysim/core/interaction_sample.py @@ -3,16 +3,17 @@ import logging +from math import ceil import numpy as np import pandas as pd -from activitysim.core.util import quick_loc_series +from activitysim.core.util import force_garbage_collect from . import logit from . import tracing +from . 
import chunk from .simulate import add_skims -from .simulate import chunked_choosers -from .simulate import num_chunk_rows_for_chunk_size + from .interaction_simulate import eval_interaction_utilities import pipeline @@ -189,7 +190,6 @@ def _interaction_sample( number of duplicate picks for chooser, alt """ - trace_label = tracing.extend_trace_label(trace_label, 'interaction_simulate') have_trace_targets = trace_label and tracing.has_trace_targets(choosers) assert len(choosers.index) > 0 @@ -205,9 +205,6 @@ def _interaction_sample( if len(spec.columns) > 1: raise RuntimeError('spec must have only one column') - alternative_count = len(alternatives) - logger.debug("_interaction_sample alternative_count %s" % alternative_count) - # if using skims, copy index into the dataframe, so it will be # available as the "destination" for the skims dereference below if skims: @@ -216,7 +213,11 @@ def _interaction_sample( # cross join choosers and alternatives (cartesian product) # for every chooser, there will be a row for each alternative # index values (non-unique) are from alternatives df - interaction_df = logit.interaction_dataset(choosers, alternatives, alternative_count) + alternative_count = alternatives.shape[0] + interaction_df = \ + logit.interaction_dataset(choosers, alternatives, sample_size=alternative_count) + + cum_size = chunk.log_df_size(trace_label, 'interaction_df', interaction_df, cum_size=None) assert alternative_count == len(interaction_df.index) / len(choosers.index) @@ -241,6 +242,10 @@ def _interaction_sample( interaction_utilities, trace_eval_results \ = eval_interaction_utilities(spec, interaction_df, locals_d, trace_label, trace_rows) + # interaction_utilities is a df with one utility column and one row per interaction_df row + + cum_size = chunk.log_df_size(trace_label, 'interaction_utils', interaction_utilities, cum_size) + if have_trace_targets: tracing.trace_interaction_eval_results(trace_eval_results, trace_ids, 
tracing.extend_trace_label(trace_label, 'eval')) @@ -258,6 +263,8 @@ def _interaction_sample( interaction_utilities.as_matrix().reshape(len(choosers), alternative_count), index=choosers.index) + cum_size = chunk.log_df_size(trace_label, 'utilities', utilities, cum_size) + if have_trace_targets: tracing.trace_df(utilities, tracing.extend_trace_label(trace_label, 'utilities'), column_labels=['alternative', 'utility']) @@ -269,6 +276,8 @@ def _interaction_sample( # probs is same shape as utilities, one row per chooser and one column for alternative probs = logit.utils_to_probs(utilities, trace_label=trace_label, trace_choosers=choosers) + cum_size = chunk.log_df_size(trace_label, 'probs', probs, cum_size) + if have_trace_targets: tracing.trace_df(probs, tracing.extend_trace_label(trace_label, 'probs'), column_labels=['alternative', 'probability']) @@ -307,9 +316,49 @@ def _interaction_sample( transpose=False, column_labels=['sample_alt', 'alternative']) + # don't need this after tracing + del choices_df['rand'] + + # - #NARROW + choices_df['prob'] = choices_df['prob'].astype(np.float32) + assert choices_df['pick_count'].max() < 4294967295 + choices_df['pick_count'] = choices_df['pick_count'].astype(np.uint32) + + chunk.log_chunk_size(trace_label, cum_size) + return choices_df +def calc_rows_per_chunk(chunk_size, choosers, alternatives, trace_label): + + num_choosers = choosers.shape[0] + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + # all columns from choosers + chooser_row_size = choosers.shape[1] + + # interaction_df has one column per alternative plus a skim column and a join column + alt_row_size = alternatives.shape[1] + 2 + + # interaction_utilities + alt_row_size += 1 + + # interaction_df includes all alternatives and is only afterwards sampled + row_size = (chooser_row_size + alt_row_size) * alternatives.shape[0] + + # utilities and probs have one row per chooser and one column per alternative row + row_size += 2 
* alternatives.shape[0] + + logger.debug("%s #chunk_calc choosers %s" % (trace_label, choosers.shape)) + logger.debug("%s #chunk_calc alternatives %s" % (trace_label, alternatives.shape)) + logger.debug("%s #chunk_calc alt_row_size %s" % (trace_label, alt_row_size)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def interaction_sample( choosers, alternatives, spec, sample_size, alt_col_name=None, @@ -365,25 +414,32 @@ def interaction_sample( choices are simulated in the standard Monte Carlo fashion """ + trace_label = tracing.extend_trace_label(trace_label, 'interaction_sample') + assert sample_size > 0 sample_size = min(sample_size, len(alternatives.index)) - rows_per_chunk = num_chunk_rows_for_chunk_size(chunk_size, choosers, alternatives) - - logger.info("interaction_simulate chunk_size %s num_choosers %s rows_per_chunk %s" % - (chunk_size, len(choosers.index), rows_per_chunk)) + rows_per_chunk = \ + calc_rows_per_chunk(chunk_size, choosers, alternatives, trace_label) + logger.info("interaction_sample chunk_size %s num_choosers %s rows_per_chunk %s" % + (chunk_size, choosers.shape[0], rows_per_chunk)) result_list = [] - for i, num_chunks, chooser_chunk in chunked_choosers(choosers, rows_per_chunk): + for i, num_chunks, chooser_chunk in chunk.chunked_choosers(choosers, rows_per_chunk): logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + chunk_trace_label = tracing.extend_trace_label(trace_label, 'chunk_%s' % i) \ + if num_chunks > 1 else trace_label + choices = _interaction_sample(chooser_chunk, alternatives, spec, sample_size, alt_col_name, skims, locals_d, - tracing.extend_trace_label(trace_label, 'chunk_%s' % i)) + chunk_trace_label) result_list.append(choices) + force_garbage_collect() + # FIXME: this will require 2X RAM # if necessary, could append to hdf5 store on disk: # http://pandas.pydata.org/pandas-docs/stable/io.html#id2 diff --git a/activitysim/core/interaction_sample_simulate.py 
b/activitysim/core/interaction_sample_simulate.py index d7eb31a1ed..137a40c2e7 100644 --- a/activitysim/core/interaction_sample_simulate.py +++ b/activitysim/core/interaction_sample_simulate.py @@ -8,10 +8,10 @@ from . import logit from . import tracing +from . import chunk from .simulate import add_skims -from .simulate import chunked_choosers_and_alts -from .simulate import num_chunk_rows_for_chunk_size +from activitysim.core.util import force_garbage_collect from .interaction_simulate import eval_interaction_utilities logger = logging.getLogger(__name__) @@ -73,8 +73,6 @@ def _interaction_sample_simulate( assert len(choosers.index) == len(np.unique(alternatives.index.values)) - trace_label = tracing.extend_trace_label(trace_label, 'interaction_simulate') - have_trace_targets = trace_label and tracing.has_trace_targets(choosers) if have_trace_targets: @@ -216,9 +214,43 @@ def _interaction_sample_simulate( tracing.trace_df(rands, tracing.extend_trace_label(trace_label, 'rands'), columns=[None, 'rand']) + cum_size = chunk.log_df_size(trace_label, 'interaction_df', interaction_df, cum_size=None) + cum_size = chunk.log_df_size(trace_label, 'interaction_utils', interaction_utilities, cum_size) + + chunk.log_chunk_size(trace_label, cum_size) + return choices +def calc_rows_per_chunk(chunk_size, choosers, alt_sample, spec, trace_label=None): + + # It is hard to estimate the size of the utilities_df since it conflates duplicate picks. + # Currently we ignore it, but maybe we should chunk based on worst case? 
+ + num_choosers = len(choosers.index) + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = len(choosers.columns) + + # one column per alternative plus skims and interaction_utilities + alt_row_size = alt_sample.shape[1] + 2 + # average sample size + sample_size = alt_sample.shape[0] / float(num_choosers) + + row_size = (chooser_row_size + alt_row_size) * sample_size + + logger.debug("%s #chunk_calc spec %s" % (trace_label, spec.shape)) + logger.debug("%s #chunk_calc chooser_row_size %s" % (trace_label, chooser_row_size)) + logger.debug("%s #chunk_calc sample_size %s" % (trace_label, sample_size)) + logger.debug("%s #chunk_calc alt_row_size %s" % (trace_label, alt_row_size)) + logger.debug("%s #chunk_calc alt_sample %s" % (trace_label, alt_sample.shape)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def interaction_sample_simulate( choosers, alternatives, spec, choice_column=None, skims=None, locals_d=None, chunk_size=0, @@ -270,24 +302,32 @@ def interaction_sample_simulate( choices are simulated in the standard Monte Carlo fashion """ - rows_per_chunk = num_chunk_rows_for_chunk_size(chunk_size, choosers, alternatives) + trace_label = tracing.extend_trace_label(trace_label, 'interaction_sample_simulate') + + rows_per_chunk = \ + calc_rows_per_chunk(chunk_size, choosers, alternatives, spec=spec, trace_label=trace_label) - logger.info("interaction_simulate chunk_size %s num_choosers %s" + logger.info("interaction_sample_simulate chunk_size %s num_choosers %s" % (chunk_size, len(choosers.index))) result_list = [] for i, num_chunks, chooser_chunk, alternative_chunk \ - in chunked_choosers_and_alts(choosers, alternatives, rows_per_chunk): + in chunk.chunked_choosers_and_alts(choosers, alternatives, rows_per_chunk): logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + chunk_trace_label = tracing.extend_trace_label(trace_label, 'chunk_%s' % 
i) \ + if num_chunks > 1 else trace_label + choices = _interaction_sample_simulate( chooser_chunk, alternative_chunk, spec, choice_column, skims, locals_d, - tracing.extend_trace_label(trace_label, 'chunk_%s' % i), trace_choice_name) + chunk_trace_label, trace_choice_name) result_list.append(choices) + force_garbage_collect() + # FIXME: this will require 2X RAM # if necessary, could append to hdf5 store on disk: # http://pandas.pydata.org/pandas-docs/stable/io.html#id2 diff --git a/activitysim/core/interaction_simulate.py b/activitysim/core/interaction_simulate.py index 5e2b37fbd2..c0dc73e2f4 100644 --- a/activitysim/core/interaction_simulate.py +++ b/activitysim/core/interaction_simulate.py @@ -9,8 +9,10 @@ from . import logit from . import tracing from .simulate import add_skims -from .simulate import chunked_choosers -from .simulate import num_chunk_rows_for_chunk_size +from . import chunk + +from activitysim.core.util import force_garbage_collect + logger = logging.getLogger(__name__) @@ -215,6 +217,8 @@ def _interaction_simulate( if skims: add_skims(interaction_df, skims) + cum_size = chunk.log_df_size(trace_label, 'interaction_df', interaction_df, cum_size=None) + # evaluate expressions from the spec multiply by coefficients and sum # spec is df with one row per spec expression and one col with utility coefficient # column names of model_design match spec index values @@ -284,9 +288,39 @@ def _interaction_simulate( tracing.trace_df(rands, tracing.extend_trace_label(trace_label, 'rands'), columns=[None, 'rand']) + chunk.log_chunk_size(trace_label, cum_size) + return choices +def calc_rows_per_chunk(chunk_size, choosers, alternatives, sample_size, skims, trace_label=None): + + num_choosers = len(choosers.index) + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = len(choosers.columns) + + # alternative columns plus join column + alt_row_size = alternatives.shape[1] + 1 + + if skims is not None: + 
alt_row_size += 1 + + sample_size = sample_size or alternatives.shape[0] + row_size = (chooser_row_size + alt_row_size) * sample_size + + logger.debug("%s #chunk_calc choosers %s" % (trace_label, choosers.shape)) + logger.debug("%s #chunk_calc alternatives %s" % (trace_label, alternatives.shape)) + logger.debug("%s #chunk_calc chooser_row_size %s" % (trace_label, chooser_row_size)) + logger.debug("%s #chunk_calc sample_size %s" % (trace_label, sample_size)) + logger.debug("%s #chunk_calc alt_row_size %s" % (trace_label, alt_row_size)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def interaction_simulate( choosers, alternatives, spec, skims=None, locals_d=None, sample_size=None, chunk_size=0, @@ -341,25 +375,35 @@ def interaction_simulate( choices are simulated in the standard Monte Carlo fashion """ + trace_label = tracing.extend_trace_label(trace_label, 'interaction_simulate') + assert len(choosers) > 0 - rows_per_chunk = num_chunk_rows_for_chunk_size(chunk_size, choosers, alternatives) + rows_per_chunk = \ + calc_rows_per_chunk(chunk_size, choosers, alternatives=alternatives, + sample_size=sample_size, skims=skims, + trace_label=trace_label) logger.info("interaction_simulate chunk_size %s num_choosers %s" % (chunk_size, len(choosers.index))) result_list = [] - for i, num_chunks, chooser_chunk in chunked_choosers(choosers, rows_per_chunk): + for i, num_chunks, chooser_chunk in chunk.chunked_choosers(choosers, rows_per_chunk): logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + chunk_trace_label = tracing.extend_trace_label(trace_label, 'chunk_%s' % i) \ + if num_chunks > 1 else trace_label + choices = _interaction_simulate(chooser_chunk, alternatives, spec, skims, locals_d, sample_size, - tracing.extend_trace_label(trace_label, 'chunk_%s' % i), + chunk_trace_label, trace_choice_name) result_list.append(choices) + force_garbage_collect() + # FIXME: this will require 2X RAM # if necessary, 
could append to hdf5 store on disk: # http://pandas.pydata.org/pandas-docs/stable/io.html#id2 diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py index f16639caa8..2d7486e5f5 100644 --- a/activitysim/core/logit.py +++ b/activitysim/core/logit.py @@ -200,6 +200,9 @@ def interaction_dataset(choosers, alternatives, sample_size=None): Combine choosers and alternatives into one table for the purposes of creating interaction variables and/or sampling alternatives. + Any duplicate column names in alternatives table will be renamed with an '_r' suffix. + (e.g. TAZ field in alternatives will appear as TAZ_r so that it can be targeted in a skim) + Parameters ---------- choosers : pandas.DataFrame @@ -240,9 +243,32 @@ def interaction_dataset(choosers, alternatives, sample_size=None): alts_sample = alternatives.take(sample).copy() alts_sample['chooser_idx'] = np.repeat(choosers.index.values, sample_size) - alts_sample = pd.merge( - alts_sample, choosers, left_on='chooser_idx', right_index=True, - suffixes=('', '_r')) + logger.debug("interaction_dataset pre-merge choosers %s alternatives %s alts_sample %s" % + (choosers.shape, alternatives.shape, alts_sample.shape)) + + AVOID_PD_MERGE = True + if AVOID_PD_MERGE: + + for c in choosers.columns: + c_alts = ('%s_r' % c) if c in alts_sample.columns else c + alts_sample[c_alts] = np.repeat(choosers[c].values, sample_size) + + else: + + # FIXME - merge throws error trying to merge df with two many rows - may be a pandas bug? 
+ # this sets limits to max chunk size - might work to merge in chunks and join + # no pressing as there is currently no obvious performance gain to larger chunk size + # DEBUG - merge choosers (564016, 4) alternatives (1443, 16) alts_sample (813875088, 17) + # + # File "..\pandas\core\internals.py", line 5573, in is_na + # for i in range(0, total_len, chunk_len): + # OverflowError: Python int too large to convert to C long + + alts_sample = pd.merge( + alts_sample, choosers, left_on='chooser_idx', right_index=True, + suffixes=('', '_r')) + + logger.debug("interaction_dataset merged alts_sample %s" % (alts_sample.shape, )) return alts_sample @@ -369,3 +395,16 @@ def each_nest(nest_spec, type=None, post_order=False): for node, nest in _each_nest(nest_spec, parent_nest=Nest(), post_order=post_order): if type is None or (type == nest.type): yield nest + + +def count_nests(nest_spec, type=None): + """ + count the nests of the specified type (or all nests if type is None) + return 0 if nest_spec is none + """ + count = 0 + if nest_spec is not None: + for node, nest in _each_nest(nest_spec, parent_nest=Nest(), post_order=False): + if type is None or nest.type == type: + count += 1 + return count diff --git a/activitysim/core/pipeline.py b/activitysim/core/pipeline.py index 5ff5513277..c2c793a5d6 100644 --- a/activitysim/core/pipeline.py +++ b/activitysim/core/pipeline.py @@ -10,6 +10,8 @@ import logging import inject +from util import memory_info +from util import df_size import random import tracing @@ -176,13 +178,9 @@ def read_df(table_name, checkpoint_name=None): else: key = table_name - t0 = print_elapsed_time() - store = get_pipeline_store() df = store[key] - t0 = print_elapsed_time("read_df %s shape %s" % (key, df.shape,), t0, debug=True) - return df @@ -213,13 +211,9 @@ def write_df(df, table_name, checkpoint_name=None): else: key = table_name - t0 = print_elapsed_time() - store = get_pipeline_store() store[key] = df - t0 = print_elapsed_time("write_df %s shape 
%s" % (key, df.shape,), t0, debug=True) - def rewrap(table_name, df=None): """ @@ -295,18 +289,15 @@ def add_checkpoint(checkpoint_name): # if we have not already checkpointed it or it has changed # FIXME - this won't detect if the orca table was modified if len(orca.list_columns_for_table(table_name)): - logger.debug("add_checkpoint table_name %s rewrap" % (table_name,)) # rewrap the changed orca table as a unitary DataFrame-backed DataFrameWrapper table df = rewrap(table_name) elif table_name not in _PIPELINE.last_checkpoint or table_name in _PIPELINE.replaced_tables: df = orca.get_table(table_name).to_frame() - logger.debug("add_checkpoint table_name %s get_table" % (table_name,)) else: continue - logger.debug("add_checkpoint %s writing %s to store" % (checkpoint_name, table_name, )) - - # write it to store + logger.debug("add_checkpoint '%s' table '%s' %s" % + (checkpoint_name, table_name, df_size(df))) write_df(df, table_name, checkpoint_name) # remember which checkpoint it was last written @@ -348,6 +339,10 @@ def checkpointed_tables(): """ Return a list of the names of all checkpointed tables """ + + return [name for name, checkpoint_name in _PIPELINE.last_checkpoint.iteritems() + if checkpoint_name and name not in NON_TABLE_COLUMNS] + return [name for name in _PIPELINE.last_checkpoint.keys() if name not in NON_TABLE_COLUMNS] @@ -389,8 +384,6 @@ def load_checkpoint(checkpoint_name): del checkpoint[key] # patch _CHECKPOINTS array of dicts - # del _PIPELINE.checkpoints[:] - # _PIPELINE.checkpoints.extend(checkpoints) _PIPELINE.checkpoints = checkpoints # patch _CHECKPOINTS dict with latest checkpoint info @@ -400,17 +393,21 @@ def load_checkpoint(checkpoint_name): logger.info("load_checkpoint %s timestamp %s" % (checkpoint_name, _PIPELINE.last_checkpoint['timestamp'])) - # table names in order that tracing.register_traceable_table wants us to register them - tables = tracing.sort_for_registration(checkpointed_tables()) + tables = checkpointed_tables() + 
loaded_tables = {} for table_name in tables: # read dataframe from pipeline store df = read_df(table_name, checkpoint_name=_PIPELINE.last_checkpoint[table_name]) logger.info("load_checkpoint table %s %s" % (table_name, df.shape)) # register it as an orca table rewrap(table_name, df) - # register for tracing - tracing.register_traceable_table(table_name, df) + loaded_tables[table_name] = df + + # register for tracing in order that tracing.register_traceable_table wants us to register them + for table_name in tracing.traceable_tables(): + if table_name in loaded_tables: + tracing.register_traceable_table(table_name, loaded_tables[table_name]) # set random state to pickled state at end of last checkpoint logger.debug("resetting random state") @@ -454,7 +451,6 @@ def run_model(model_name): if model_name in [checkpoint[CHECKPOINT_NAME] for checkpoint in _PIPELINE.checkpoints]: raise RuntimeError("Cannot run model '%s' more than once" % model_name) - t0 = print_elapsed_time() _PIPELINE.prng.begin_step(model_name) # check for args @@ -467,6 +463,13 @@ def run_model(model_name): step_name = model_name args = {} + # check for no_checkpoint prefix + if step_name[0] == '_': + step_name = step_name[1:] + checkpoint = False + else: + checkpoint = True + inject.set_step_args(args) orca.run([step_name]) @@ -474,9 +477,12 @@ def run_model(model_name): inject.set_step_args(None) _PIPELINE.prng.end_step(model_name) - t0 = print_elapsed_time("run_model '%s'" % model_name, t0) - add_checkpoint(model_name) - t0 = print_elapsed_time("add_checkpoint '%s'" % model_name, t0) + if checkpoint: + t0 = print_elapsed_time() + add_checkpoint(model_name) + t0 = print_elapsed_time("add_checkpoint '%s'" % model_name, t0, debug=True) + else: + logger.warn("##### skipping %s checkpoint for %s\n" % (step_name, model_name)) def open_pipeline(resume_after=None): @@ -494,14 +500,11 @@ def open_pipeline(resume_after=None): logger.info("open_pipeline...") - t0 = print_elapsed_time() - if resume_after: # 
open existing pipeline logger.debug("open_pipeline - open existing pipeline") open_pipeline_store(overwrite=False) load_checkpoint(resume_after) - t0 = print_elapsed_time("load_checkpoint '%s'" % resume_after, t0) else: # open new, empty pipeline logger.debug("open_pipeline - new, empty pipeline") @@ -510,7 +513,6 @@ def open_pipeline(resume_after=None): # could have exogenous tables or prng instantiation under some circumstance?? _PIPELINE.last_checkpoint[CHECKPOINT_NAME] = INITIAL_CHECKPOINT_NAME # add_checkpoint(INITIAL_CHECKPOINT_NAME) - # t0 = print_elapsed_time("add_checkpoint '%s'" % INITIAL_CHECKPOINT_NAME, t0) logger.debug("open_pipeline complete") @@ -529,17 +531,6 @@ def close_pipeline(): logger.info("close_pipeline") -def preload_injectables(): - - t0 = print_elapsed_time() - - # load skim_stack - if orca.is_injectable('preload_injectables'): - orca.get_injectable('preload_injectables') - - t0 = print_elapsed_time("preload_injectables", t0) - - def run(models, resume_after=None): """ run the specified list of models, optionally loading checkpoint and resuming after specified @@ -562,13 +553,24 @@ def run(models, resume_after=None): if resume_after and resume_after in models: models = models[models.index(resume_after) + 1:] + t0 = print_elapsed_time() + open_pipeline(resume_after) + t0 = print_elapsed_time('open_pipeline', t0) - preload_injectables() + # preload any bulky injectables (e.g. 
skims) not in pipeline + if orca.is_injectable('preload_injectables'): + orca.get_injectable('preload_injectables') + t0 = print_elapsed_time('preload_injectables', t0) t0 = print_elapsed_time() for model in models: + t1 = print_elapsed_time() run_model(model) + t1 = print_elapsed_time("run_model %s)" % model, t1) + + logger.debug('#mem after %s, %s' % (model, memory_info())) + t0 = print_elapsed_time("run (%s models)" % len(models), t0) # don't close the pipeline, as the user may want to read intermediate results from the store @@ -602,19 +604,35 @@ def get_table(table_name, checkpoint_name=None): return orca.get_table(table_name).to_frame() - # was table ever checkpointed? - if table_name not in checkpointed_tables(): - raise RuntimeError("table '%s' not in checkpointed tables." % table_name) - # if they want current version of table, no need to read from pipeline store - if checkpoint_name is None or _PIPELINE.last_checkpoint[table_name] == checkpoint_name: + if checkpoint_name is None: + + if table_name not in _PIPELINE.last_checkpoint: + raise RuntimeError("table '%s' never checkpointed." % table_name) + + if not _PIPELINE.last_checkpoint[table_name]: + raise RuntimeError("table '%s' was dropped." % table_name) + # return orca.get_table(table_name).local return orca.get_table(table_name).to_frame() - if checkpoint_name not in [checkpoint[CHECKPOINT_NAME] for checkpoint in _PIPELINE.checkpoints]: + # find the requested checkpoint + checkpoint = \ + next((x for x in _PIPELINE.checkpoints if x['checkpoint_name'] == checkpoint_name), None) + if checkpoint is None: raise RuntimeError("checkpoint '%s' not in checkpoints." % checkpoint_name) - return read_df(table_name, checkpoint_name) + # find the checkpoint that table was written to store + last_checkpoint_name = checkpoint.get(table_name, None) + + if not last_checkpoint_name: + raise RuntimeError("table '%s' not in checkpoint '%s'." 
% (table_name, checkpoint_name)) + + # if this version of table is same as current + if _PIPELINE.last_checkpoint.get(table_name, None) == last_checkpoint_name: + return orca.get_table(table_name).to_frame() + + return read_df(table_name, last_checkpoint_name) def get_checkpoints(): @@ -635,8 +653,6 @@ def get_checkpoints(): pipeline_file_path = orca.get_injectable('pipeline_path') df = pd.read_hdf(pipeline_file_path, CHECKPOINT_TABLE_NAME) - return df - # non-table columns first (column order in df is random because created from a dict) table_names = [name for name in df.columns.values if name not in NON_TABLE_COLUMNS] @@ -697,3 +713,34 @@ def extend_table(table_name, df): df = pd.concat([extend_df, df])[columns] replace_table(table_name, df) + + return df + + +def drop_table(table_name): + + if orca.is_table(table_name): + + logger.debug("drop_table dropping orca table '%s'" % table_name) + + # don't trigger function call of TableFuncWrapper + t = orca.get_raw_table(table_name) + t.clear_cached() + + for column_name in orca.list_columns_for_table(table_name): + # logger.debug("pop %s.%s: %s" % (table_name, column_name, t.column_type(column_name))) + orca.orca._COLUMNS.pop((table_name, column_name), None) + + # remove from orca's table list + orca.orca._TABLES.pop(table_name, None) + + if table_name in _PIPELINE.replaced_tables: + + logger.debug("drop_table forgetting replaced_tables '%s'" % table_name) + del _PIPELINE.replaced_tables[table_name] + + if table_name in _PIPELINE.last_checkpoint: + + logger.debug("drop_table removing table %s from last_checkpoint" % table_name) + + _PIPELINE.last_checkpoint[table_name] = '' diff --git a/activitysim/core/random.py b/activitysim/core/random.py index a012f6063a..2fb73000ea 100644 --- a/activitysim/core/random.py +++ b/activitysim/core/random.py @@ -707,10 +707,8 @@ def random_for_df(self, df, n=1): rands = np.asanyarray([rng.rand(n) for _ in range(len(df))]) return rands - t0 = print_elapsed_time() channel = 
self.get_channel_for_df(df) rands = channel.random_for_df(df, self.step_name, n) - t0 = print_elapsed_time("random_for_df for %s rows" % len(df.index), t0, debug=True) return rands def choice_for_df(self, df, a, size, replace): diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 264833c44c..bd78ad468b 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -1,6 +1,8 @@ # ActivitySim # See full license in LICENSE.txt. +from __future__ import print_function + from math import ceil import os import logging @@ -13,168 +15,11 @@ from . import tracing from . import pipeline +from . import util -logger = logging.getLogger(__name__) - - -def num_chunk_rows_for_chunk_size(chunk_size, choosers, alternatives=None, by_chunk_id=False): - - # FIXME - chunk size should take number of chooser and alternative columns into account - # FIXME - that is, chunk size should represent memory footprint (rows X columns) not just rows - - if by_chunk_id: - num_choosers = choosers['chunk_id'].max() + 1 - else: - num_choosers = len(choosers.index) - - # if not chunking, then return num_choosers - if chunk_size == 0: - return num_choosers - - row_size = len(choosers.columns) - - if alternatives is not None: - alt_row_size = len(alternatives.columns) - # logger.debug('num_chunk_rows_for_chunk_size row_size %s alt_row_size %s' - # % (row_size, alt_row_size,)) - row_size = row_size * alt_row_size - - if by_chunk_id: - # scale row_size by average number of chooser rows per chunk_id - rows_per_chunk_id = len(choosers.index) / float(num_choosers) - row_size = int(rows_per_chunk_id * row_size) - # logger.debug('num_chunk_rows_for_chunk_size by_chunk_id rows_per_chunk_id %s' - # % (rows_per_chunk_id,)) +from . 
import chunk - # closest number of chooser rows to achieve chunk_size - rows_per_chunk = int(round(chunk_size / float(row_size))) - rows_per_chunk = max(rows_per_chunk, 1) - - logger.info("num_chunk_rows_for_chunk_size %s row_size %s rows_per_chunk %s " - "num_choosers %s chunks %s" - % (chunk_size, row_size, rows_per_chunk, - num_choosers, int(ceil(num_choosers / float(rows_per_chunk))))) - - return rows_per_chunk - - -def chunked_choosers(choosers, rows_per_chunk): - # generator to iterate over choosers in chunk_size chunks - num_choosers = len(choosers.index) - num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) - - i = offset = 0 - while offset < num_choosers: - yield i+1, num_chunks, choosers.iloc[offset: offset+rows_per_chunk] - offset += rows_per_chunk - i += 1 - - -def chunked_choosers_and_alts(choosers, alternatives, rows_per_chunk): - """ - generator to iterate over choosers and alternatives in chunk_size chunks - - like chunked_choosers, but also chunks alternatives - for use with sampled alternatives which will have different alternatives (and numbers of alts) - - There may be up to sample_size (or as few as one) alternatives for each chooser - because alternatives may have been sampled more than once, but pick_count for those - alternatives will always sum to sample_size. - - When we chunk the choosers, we need to take care chunking the alternatives as there are - varying numbers of them for each chooser. 
Since alternatives appear in the same order - as choosers, we can use cumulative pick_counts to identify boundaries of sets of alternatives - - Parameters - ---------- - choosers - alternatives : pandas DataFrame - sample alternatives including pick_count column in same order as choosers - rows_per_chunk : int - - Yields - ------- - i : int - one-based index of current chunk - num_chunks : int - total number of chunks that will be yielded - choosers : pandas DataFrame slice - chunk of choosers - alternatives : pandas DataFrame slice - chunk of alternatives for chooser chunk - """ - - assert 'pick_count' in alternatives.columns or choosers.index.name == alternatives.index.name - - num_choosers = len(choosers.index) - num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) - - if choosers.index.name == alternatives.index.name: - assert choosers.index.name == alternatives.index.name - - # alt chunks boundaries are where index changes - alt_ids = alternatives.index.values - alt_chunk_end = np.where(alt_ids[:-1] != alt_ids[1:])[0] + 1 - alt_chunk_end = np.append([0], alt_chunk_end) # including the first... - alt_chunk_end = alt_chunk_end[rows_per_chunk::rows_per_chunk] - - else: - # used to do it this way for school and workplace (which are sampled based on prob) - # since the utility expressions need to know pick_count for sample correction - # but for now the assumption that choosers and alternatives share indexes is more general - # leaving this (previously correct) code here for now in case that changes... 
- assert False - - # assert 'pick_count' in alternatives.columns - # assert 'cum_pick_count' not in alternatives.columns - # alternatives['cum_pick_count'] = alternatives['pick_count'].cumsum() - # - # # currently no convenient way to remember sample_size across steps - # pick_count = alternatives.cum_pick_count.iat[-1] - # sample_size = pick_count / len(choosers.index) - # assert pick_count % sample_size == 0 - # - # alt_chunk_size = rows_per_chunk * sample_size - # - # # array of indices of starts of alt chunks - # alt_chunk_end = np.where(alternatives['cum_pick_count'] % alt_chunk_size == 0)[0] + 1 - - # add index to end of array to capture any final partial chunk - alt_chunk_end = np.append(alt_chunk_end, [len(alternatives.index)]) - - i = offset = alt_offset = 0 - while offset < num_choosers: - - alt_end = alt_chunk_end[i] - - chooser_chunk = choosers[offset: offset + rows_per_chunk] - alternative_chunk = alternatives[alt_offset: alt_end] - - assert len(chooser_chunk.index) == len(np.unique(alternative_chunk.index.values)) - - yield i+1, num_chunks, chooser_chunk, alternative_chunk - - i += 1 - offset += rows_per_chunk - alt_offset = alt_end - - -def hh_chunked_choosers(choosers, rows_per_chunk): - # generator to iterate over choosers in chunk_size chunks - # like chunked_choosers but based on chunk_id field rather than dataframe length - # (the presumption is that choosers has multiple rows with the same chunk_id that - # all have to be included in the same chunk) - # FIXME - we pathologically know name of chunk_id col in households table - - num_choosers = choosers['chunk_id'].max() + 1 - num_chunks = (num_choosers // rows_per_chunk) + (num_choosers % rows_per_chunk > 0) - - i = offset = 0 - while offset < num_choosers: - chooser_chunk = choosers[choosers['chunk_id'].between(offset, offset + rows_per_chunk - 1)] - yield i+1, num_chunks, chooser_chunk - offset += rows_per_chunk - i += 1 +logger = logging.getLogger(__name__) def random_rows(df, n): @@ -275,9 
+120,11 @@ def to_series(x): return x value_list = [] - # need to be able to identify which variables causes an error, which keeps - # this from being expressed more parsimoniously + print('eval_variables', end='') for expr in exprs: + print('.', end='') + # logger.debug("eval_variables: %s" % expr) + # logger.debug("eval_variables %s" % util.memory_info()) try: if expr.startswith('@'): expr_values = to_series(eval(expr[1:], globals(), locals_d)) @@ -285,8 +132,10 @@ def to_series(x): expr_values = df.eval(expr) value_list.append((expr, expr_values)) except Exception as err: + print() logger.exception("Variable evaluation failed for: %s" % str(expr)) raise err + print() values = pd.DataFrame.from_items(value_list) @@ -534,6 +383,8 @@ def eval_mnl(choosers, spec, locals_d, """ trace_label = tracing.extend_trace_label(trace_label, 'mnl') + have_trace_targets = trace_label and tracing.has_trace_targets(choosers) + check_for_variability = tracing.check_for_variability() t0 = tracing.print_elapsed_time() @@ -558,7 +409,13 @@ def eval_mnl(choosers, spec, locals_d, choices, rands = logit.make_choices(probs, trace_label=trace_label, trace_choosers=choosers) t0 = tracing.print_elapsed_time("logit.make_choices", t0, debug=True) - if trace_label: + cum_size = chunk.log_df_size(trace_label, 'choosers', choosers, cum_size=None) + cum_size = chunk.log_df_size(trace_label, 'expression_values', expression_values, cum_size) + cum_size = chunk.log_df_size(trace_label, "utilities", utilities, cum_size) + cum_size = chunk.log_df_size(trace_label, "probs", probs, cum_size) + chunk.log_chunk_size(trace_label, cum_size) + + if have_trace_targets: tracing.trace_df(choosers, '%s.choosers' % trace_label) tracing.trace_df(utilities, '%s.utilities' % trace_label, @@ -609,6 +466,8 @@ def eval_nl(choosers, spec, nest_spec, locals_d, """ trace_label = tracing.extend_trace_label(trace_label, 'nl') + have_trace_targets = trace_label and tracing.has_trace_targets(choosers) + check_for_variability = 
tracing.check_for_variability() t0 = tracing.print_elapsed_time() @@ -657,7 +516,15 @@ def eval_nl(choosers, spec, nest_spec, locals_d, choices, rands = logit.make_choices(base_probabilities, trace_label, trace_choosers=choosers) t0 = tracing.print_elapsed_time("logit.make_choices", t0, debug=True) - if trace_label: + cum_size = chunk.log_df_size(trace_label, 'choosers', choosers, cum_size=None) + cum_size = chunk.log_df_size(trace_label, "expression_values", expression_values, cum_size) + cum_size = chunk.log_df_size(trace_label, "raw_utilities", raw_utilities, cum_size) + cum_size = chunk.log_df_size(trace_label, "nested_exp_utils", nested_exp_utilities, cum_size) + cum_size = chunk.log_df_size(trace_label, "nested_probs", nested_probabilities, cum_size) + cum_size = chunk.log_df_size(trace_label, "base_probs", base_probabilities, cum_size) + chunk.log_chunk_size(trace_label, cum_size) + + if have_trace_targets: tracing.trace_df(choosers, '%s.choosers' % trace_label) tracing.trace_df(raw_utilities, '%s.raw_utilities' % trace_label, column_labels=['alternative', 'utility']) @@ -717,11 +584,10 @@ def _simple_simulate(choosers, spec, nest_spec, skims=None, locals_d=None, Index will be that of `choosers`, values will match the columns of `spec`. 
""" + if skims: add_skims(choosers, skims) - trace_label = tracing.extend_trace_label(trace_label, 'simple_simulate') - if nest_spec is None: choices = eval_mnl(choosers, spec, locals_d, trace_label=trace_label, trace_choice_name=trace_choice_name) @@ -732,6 +598,42 @@ def _simple_simulate(choosers, spec, nest_spec, skims=None, locals_d=None, return choices +def simple_simulate_rpc(chunk_size, choosers, spec, nest_spec, trace_label): + """ + rows_per_chunk calculator for simple_simulate + """ + + num_choosers = len(choosers.index) + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = len(choosers.columns) + + if nest_spec is None: + # expression_values for each spec row + # utilities and probs for each alt + extra_columns = spec.shape[0] + (2 * spec.shape[1]) + else: + # expression_values for each spec row + # raw_utilities and base_probabilities) for each alt + # nested_exp_utilities, nested_probabilities for each nest + # less 1 as nested_probabilities lacks root + nest_count = logit.count_nests(nest_spec) + extra_columns = spec.shape[0] + (2 * spec.shape[1]) + (2 * nest_count) - 1 + + logger.debug("%s #chunk_calc nest_count %s" % (trace_label, nest_count)) + + row_size = chooser_row_size + extra_columns + + logger.debug("%s #chunk_calc choosers %s" % (trace_label, choosers.shape)) + logger.debug("%s #chunk_calc spec %s" % (trace_label, spec.shape)) + logger.debug("%s #chunk_calc extra_columns %s" % (trace_label, extra_columns)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def simple_simulate(choosers, spec, nest_spec, skims=None, locals_d=None, chunk_size=0, trace_label=None, trace_choice_name=None): """ @@ -740,23 +642,28 @@ def simple_simulate(choosers, spec, nest_spec, skims=None, locals_d=None, chunk_ properties and no need to sample from alternatives. 
""" + trace_label = tracing.extend_trace_label(trace_label, 'simple_simulate') + assert len(choosers) > 0 - num_chunk_rows = num_chunk_rows_for_chunk_size(chunk_size, choosers) + rows_per_chunk = simple_simulate_rpc(chunk_size, choosers, spec, nest_spec, trace_label) - logger.info("simple_simulate num_chunk_rows %s num_choosers %s" % - (num_chunk_rows, len(choosers.index))) + logger.info("simple_simulate rows_per_chunk %s num_choosers %s" % + (rows_per_chunk, len(choosers.index))) result_list = [] # segment by person type and pick the right spec for each person type - for i, num_chunks, chooser_chunk in chunked_choosers(choosers, num_chunk_rows): + for i, num_chunks, chooser_chunk in chunk.chunked_choosers(choosers, rows_per_chunk): logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + chunk_trace_label = tracing.extend_trace_label(trace_label, 'chunk_%s' % i) \ + if num_chunks > 1 else trace_label + choices = _simple_simulate( chooser_chunk, spec, nest_spec, skims, locals_d, - tracing.extend_trace_label(trace_label, 'chunk_%s' % i), + chunk_trace_label, trace_choice_name) result_list.append(choices) @@ -780,24 +687,34 @@ def eval_mnl_logsums(choosers, spec, locals_d, trace_label=None): """ trace_label = tracing.extend_trace_label(trace_label, 'mnl') + have_trace_targets = trace_label and tracing.has_trace_targets(choosers) + check_for_variability = tracing.check_for_variability() logger.debug("running eval_mnl_logsums") + t0 = tracing.print_elapsed_time() expression_values = eval_variables(spec.index, choosers, locals_d) + t0 = tracing.print_elapsed_time("eval_variables", t0, debug=True) if check_for_variability: _check_for_variability(expression_values, trace_label) # utility values utilities = compute_utilities(expression_values, spec) + t0 = tracing.print_elapsed_time("compute_utilities", t0, debug=True) # logsum is log of exponentiated utilities summed across columns of each chooser row utils_arr = 
utilities.as_matrix().astype('float') logsums = np.log(np.exp(utils_arr).sum(axis=1)) logsums = pd.Series(logsums, index=choosers.index) - if trace_label: + cum_size = chunk.log_df_size(trace_label, 'choosers', choosers, cum_size=None) + cum_size = chunk.log_df_size(trace_label, 'expression_values', expression_values, cum_size) + cum_size = chunk.log_df_size(trace_label, "utilities", utilities, cum_size) + chunk.log_chunk_size(trace_label, cum_size) + + if have_trace_targets: # add logsum to utilities for tracing utilities['logsum'] = logsums @@ -822,7 +739,9 @@ def eval_nl_logsums(choosers, spec, nest_spec, locals_d, trace_label=None): Index will be that of `choosers`, values will be nest logsum based on spec column values """ - trace_label = tracing.extend_trace_label(trace_label, 'nl_logsums') + trace_label = tracing.extend_trace_label(trace_label, 'nl') + have_trace_targets = trace_label and tracing.has_trace_targets(choosers) + check_for_variability = tracing.check_for_variability() logger.debug("running eval_nl_logsums") @@ -848,7 +767,13 @@ def eval_nl_logsums(choosers, spec, nest_spec, locals_d, trace_label=None): logsums = pd.Series(logsums, index=choosers.index) t0 = tracing.print_elapsed_time("logsums", t0, debug=True) - if trace_label: + cum_size = chunk.log_df_size(trace_label, 'choosers', choosers, cum_size=None) + cum_size = chunk.log_df_size(trace_label, 'expression_values', expression_values, cum_size) + cum_size = chunk.log_df_size(trace_label, "raw_utilities", raw_utilities, cum_size) + cum_size = chunk.log_df_size(trace_label, "nested_exp_utils", nested_exp_utilities, cum_size) + chunk.log_chunk_size(trace_label, cum_size) + + if have_trace_targets: # add logsum to nested_exp_utilities for tracing nested_exp_utilities['logsum'] = logsums @@ -873,11 +798,10 @@ def _simple_simulate_logsums(choosers, spec, nest_spec, logsums : pandas.Series Index will be that of `choosers`, values will be nest logsum based on spec column values """ + if skims: 
add_skims(choosers, skims) - trace_label = tracing.extend_trace_label(trace_label, 'simple_simulate_logsums') - if nest_spec is None: logsums = eval_mnl_logsums(choosers, spec, locals_d, trace_label=trace_label) else: @@ -886,6 +810,36 @@ def _simple_simulate_logsums(choosers, spec, nest_spec, return logsums +def simple_simulate_logsums_rpc(chunk_size, choosers, spec, nest_spec, trace_label): + + num_choosers = len(choosers.index) + + # if not chunking, then return num_choosers + if chunk_size == 0: + return num_choosers + + chooser_row_size = len(choosers.columns) + + if nest_spec is None: + # expression_values for each spec row + # utilities for each alt + extra_columns = spec.shape[0] + spec.shape[1] + logger.warn("simple_simulate_logsums_rpc rows_per_chunk not validated for mnl" + " so chunk sizing might be a bit off") + else: + # expression_values for each spec row + # raw_utilities for each alt + # nested_exp_utilities for each nest + extra_columns = spec.shape[0] + spec.shape[1] + logit.count_nests(nest_spec) + + row_size = chooser_row_size + extra_columns + + logger.debug("%s #chunk_calc chooser_row_size %s" % (trace_label, chooser_row_size)) + logger.debug("%s #chunk_calc extra_columns %s" % (trace_label, extra_columns)) + + return chunk.rows_per_chunk(chunk_size, row_size, num_choosers, trace_label) + + def simple_simulate_logsums(choosers, spec, nest_spec, skims=None, locals_d=None, chunk_size=0, trace_label=None): @@ -898,23 +852,27 @@ def simple_simulate_logsums(choosers, spec, nest_spec, Index will be that of `choosers`, values will be nest logsum based on spec column values """ - assert len(choosers) > 0 + trace_label = tracing.extend_trace_label(trace_label, 'simple_simulate_logsums') - num_chunk_rows = num_chunk_rows_for_chunk_size(chunk_size, choosers) + assert len(choosers) > 0 - logger.info("simple_simulate_logsums chunk_size %s num_choosers %s, num_chunk_rows %s" % - (chunk_size, len(choosers.index), num_chunk_rows)) + rows_per_chunk = 
simple_simulate_logsums_rpc(chunk_size, choosers, spec, nest_spec, trace_label) + logger.info("%s chunk_size %s num_choosers %s, rows_per_chunk %s" % + (trace_label, chunk_size, len(choosers.index), rows_per_chunk)) result_list = [] # segment by person type and pick the right spec for each person type - for i, num_chunks, chooser_chunk in chunked_choosers(choosers, num_chunk_rows): + for i, num_chunks, chooser_chunk in chunk.chunked_choosers(choosers, rows_per_chunk): logger.info("Running chunk %s of %s size %d" % (i, num_chunks, len(chooser_chunk))) + chunk_trace_label = tracing.extend_trace_label(trace_label, 'chunk_%s' % i) \ + if num_chunks > 1 else trace_label + logsums = _simple_simulate_logsums( chooser_chunk, spec, nest_spec, skims, locals_d, - tracing.extend_trace_label(trace_label, 'chunk_%s' % i)) + chunk_trace_label) result_list.append(logsums) diff --git a/activitysim/core/test/extensions/steps.py b/activitysim/core/test/extensions/steps.py index 3720e3a036..7a76abd291 100644 --- a/activitysim/core/test/extensions/steps.py +++ b/activitysim/core/test/extensions/steps.py @@ -2,20 +2,66 @@ import pandas as pd from activitysim.core import inject from activitysim.core import pipeline +from activitysim.core import tracing @inject.step() def step1(): - table1 = pd.DataFrame({'column1': [1, 2, 3]}) + table1 = pd.DataFrame({'c': [1, 2, 3]}) inject.add_table('table1', table1) @inject.step() def step2(): + table1 = pd.DataFrame({'c': [2, 4, 6]}) + inject.add_table('table2', table1) + + +@inject.step() +def step3(): + + table1 = pd.DataFrame({'c': [3, 6, 9]}) + inject.add_table('table3', table1) + + +@inject.step() +def step_add_col(): + + table_name = inject.get_step_arg('table_name') + assert table_name is not None + + col_name = inject.get_step_arg('column_name') + assert col_name is not None + + table = pipeline.get_table(table_name) + + assert col_name not in table.columns + + table[col_name] = table.index + (1000 * len(table.columns)) + + 
pipeline.replace_table(table_name, table) + + +@inject.step() +def step_forget_tab(): + table_name = inject.get_step_arg('table_name') assert table_name is not None - table2 = pd.DataFrame({'column1': [10, 20, 30]}) - inject.add_table(table_name, table2) + table = pipeline.get_table(table_name) + + pipeline.drop_table(table_name) + + +@inject.step() +def create_households(trace_hh_id): + + df = pd.DataFrame({'HHID': [1, 2, 3], 'TAZ': {100, 100, 101}}) + inject.add_table('households', df) + + pipeline.get_rn_generator().add_channel(df, 'households') + + if trace_hh_id: + tracing.register_traceable_table('households', df) diff --git a/activitysim/core/test/test_pipeline.py b/activitysim/core/test/test_pipeline.py index 6f41014db2..61670811ba 100644 --- a/activitysim/core/test/test_pipeline.py +++ b/activitysim/core/test/test_pipeline.py @@ -12,7 +12,7 @@ import pytest import yaml -from . import extensions +import extensions from activitysim.core import tracing from activitysim.core import pipeline @@ -23,6 +23,25 @@ HH_ID = 961042 +def setup(): + + orca.orca._INJECTABLES.pop('skim_dict', None) + orca.orca._INJECTABLES.pop('skim_stack', None) + + configs_dir = os.path.join(os.path.dirname(__file__), 'configs') + orca.add_injectable("configs_dir", configs_dir) + + output_dir = os.path.join(os.path.dirname(__file__), 'output') + orca.add_injectable("output_dir", output_dir) + + data_dir = os.path.join(os.path.dirname(__file__), 'data') + orca.add_injectable("data_dir", data_dir) + + orca.clear_cache() + + tracing.config_logger() + + def teardown_function(func): orca.clear_cache() inject.reinject_decorated_tables() @@ -40,39 +59,34 @@ def close_handlers(): def test_pipeline_run(): - orca.orca._INJECTABLES.pop('skim_dict', None) - orca.orca._INJECTABLES.pop('skim_stack', None) - - configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) - - output_dir = os.path.join(os.path.dirname(__file__), 'output') - 
orca.add_injectable("output_dir", output_dir) - - data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) - - orca.clear_cache() - - tracing.config_logger() + setup() _MODELS = [ 'step1', + 'step2', + 'step3', + 'step_add_col.table_name=table2;column_name=c2' ] pipeline.run(models=_MODELS, resume_after=None) - table1 = pipeline.get_table("table1").column1 + checkpoints = pipeline.get_checkpoints() + print "checkpoints\n", checkpoints + + c2 = pipeline.get_table("table2").c2 - # test that model arg is passed to step - pipeline.run_model('step2.table_name=table2') + # get table from + pipeline.get_table("table1", checkpoint_name="step3") - table2 = pipeline.get_table("table2").column1 + # try to get a table from a step before it was checkpointed + with pytest.raises(RuntimeError) as excinfo: + pipeline.get_table("table2", checkpoint_name="step1") + assert "not in checkpoint 'step1'" in str(excinfo.value) # try to get a non-existant table with pytest.raises(RuntimeError) as excinfo: pipeline.get_table("bogus") - assert "not in checkpointed tables" in str(excinfo.value) + assert "never checkpointed" in str(excinfo.value) # try to get an existing table from a non-existant checkpoint with pytest.raises(RuntimeError) as excinfo: @@ -80,6 +94,50 @@ def test_pipeline_run(): assert "not in checkpoints" in str(excinfo.value) pipeline.close_pipeline() - orca.clear_cache() close_handlers() + + +def test_pipeline_checkpoint_drop(): + + setup() + + _MODELS = [ + 'step1', + '_step2', + '_step_add_col.table_name=table2;column_name=c2', + '_step_forget_tab.table_name=table2', + 'step3', + 'step_forget_tab.table_name=table3', + ] + pipeline.run(models=_MODELS, resume_after=None) + + checkpoints = pipeline.get_checkpoints() + print "checkpoints\n", checkpoints + + pipeline.get_table("table1") + + with pytest.raises(RuntimeError) as excinfo: + pipeline.get_table("table2") + assert "never checkpointed" in str(excinfo.value) + + # can't get 
a dropped table from current checkpoint + with pytest.raises(RuntimeError) as excinfo: + pipeline.get_table("table3") + assert "was dropped" in str(excinfo.value) + + # ensure that we can still get table3 from a checkpoint at which it existed + pipeline.get_table("table3", checkpoint_name="step3") + + pipeline.close_pipeline() + close_handlers() + +# if __name__ == "__main__": +# +# print "\n\ntest_pipeline_run" +# test_pipeline_run() +# teardown_function(None) +# +# print "\n\ntest_pipeline_checkpoint_drop" +# test_pipeline_checkpoint_drop() +# teardown_function(None) diff --git a/activitysim/core/test/test_simulate.py b/activitysim/core/test/test_simulate.py index acc2b303d8..25537f0db5 100644 --- a/activitysim/core/test/test_simulate.py +++ b/activitysim/core/test/test_simulate.py @@ -10,7 +10,7 @@ import orca -from .. import simulate as asim +from .. import simulate @pytest.fixture(scope='module') @@ -25,9 +25,10 @@ def spec_name(data_dir): @pytest.fixture(scope='module') def spec(data_dir, spec_name): - return asim.read_model_spec(data_dir, spec_name, - description_name='description', - expression_name='expression') + return simulate.read_model_spec( + data_dir, spec_name, + description_name='description', + expression_name='expression') @pytest.fixture(scope='module') @@ -37,7 +38,7 @@ def data(data_dir): def test_read_model_spec(data_dir, spec_name): - spec = asim.read_model_spec( + spec = simulate.read_model_spec( data_dir, spec_name, description_name='description', expression_name='expression') @@ -51,7 +52,7 @@ def test_read_model_spec(data_dir, spec_name): def test_eval_variables(spec, data): - result = asim.eval_variables(spec.index, data, target_type=None) + result = simulate.eval_variables(spec.index, data, target_type=None) expected_result = pd.DataFrame([ [True, False, 4, 1], @@ -61,7 +62,7 @@ def test_eval_variables(spec, data): pdt.assert_frame_equal(result, expected_result, check_names=False) - result = asim.eval_variables(spec.index, data, 
target_type=float) + result = simulate.eval_variables(spec.index, data, target_type=float) expected_result = pd.DataFrame([ [1.0, 0.0, 4.0, 1.0], @@ -76,7 +77,7 @@ def test_simple_simulate(data, spec): orca.add_injectable("check_for_variability", False) - choices = asim.simple_simulate(data, spec, nest_spec=None) + choices = simulate.simple_simulate(data, spec, nest_spec=None) expected = pd.Series([1, 1, 1], index=data.index) pdt.assert_series_equal(choices, expected) @@ -85,6 +86,6 @@ def test_simple_simulate_chunked(data, spec): orca.add_injectable("check_for_variability", False) - choices = asim.simple_simulate(data, spec, nest_spec=None, chunk_size=2) + choices = simulate.simple_simulate(data, spec, nest_spec=None, chunk_size=2) expected = pd.Series([1, 1, 1], index=data.index) pdt.assert_series_equal(choices, expected) diff --git a/activitysim/core/timetable.py b/activitysim/core/timetable.py index 209a5b7745..c2efd557d0 100644 --- a/activitysim/core/timetable.py +++ b/activitysim/core/timetable.py @@ -8,6 +8,8 @@ from activitysim.core import config from activitysim.core import pipeline +from activitysim.core import tracing +from activitysim.core import util logger = logging.getLogger(__name__) @@ -246,17 +248,26 @@ def tour_available(self, window_row_ids, tdds): assert len(window_row_ids) == len(tdds) + # t0 = tracing.print_elapsed_time() + # numpy array with one tdd_footprints_df row for tdds - tour_footprints = self.tdd_footprints_df.loc[tdds].as_matrix() + tour_footprints = util.quick_loc_df(tdds, self.tdd_footprints_df).as_matrix() + + # t0 = tracing.print_elapsed_time("tour_footprints", t0, debug=True) + # assert (tour_footprints == self.tdd_footprints_df.loc[tdds].as_matrix()).all # numpy array with one windows row for each person windows = self.slice_windows_by_row_id(window_row_ids) + # t0 = tracing.print_elapsed_time("slice_windows_by_row_id", t0, debug=True) + x = tour_footprints + (windows << I_BIT_SHIFT) available = ~np.isin(x, 
COLLISION_LIST).any(axis=1) available = pd.Series(available, index=window_row_ids.index) + # t0 = tracing.print_elapsed_time("available", t0, debug=True) + return available def assign(self, window_row_ids, tdds): diff --git a/activitysim/core/tracing.py b/activitysim/core/tracing.py index b57d0b7e4f..b9e2951d4d 100644 --- a/activitysim/core/tracing.py +++ b/activitysim/core/tracing.py @@ -41,7 +41,7 @@ def print_elapsed_time(msg=None, t0=None, debug=False): t1 = time.time() if msg: t = t1 - (t0 or t1) - msg = "Time to execute %s : %s seconds (%s minutes)" % (msg, round(t, 3), round(t/60.0)) + msg = "Time to execute %s : %s seconds (%s minutes)" % (msg, round(t, 3), round(t/60.0, 1)) if debug: logger.debug(msg) else: @@ -291,8 +291,8 @@ def register_trips(df, trace_hh_id): """ Register with inject for tracing - create an injectable 'trace_tour_ids' with a list of tour_ids in household we are tracing. - This allows us to slice by tour_id without requiring presence of person_id column + create an injectable 'trace_trip_ids' with a list of tour_ids in household we are tracing. + This allows us to slice by trip_id without requiring presence of person_id column Parameters ---------- @@ -307,7 +307,7 @@ def register_trips(df, trace_hh_id): Nothing """ - # get list of persons in traced household (should already have been registered) + # get list of tours in traced household (should already have been registered) tour_ids = inject.get_injectable("trace_tour_ids", []) if len(tour_ids) == 0: @@ -356,21 +356,12 @@ def register_traceable_table(table_name, df): register_tours(df, trace_hh_id) -def sort_for_registration(table_names): +def traceable_tables(): # names of all traceable tables ordered by dependency on household_id # e.g. 
'persons' has to be registered AFTER 'households' - preferred_order = ['households', 'persons', 'non_mandatory_tours', 'mandatory_tours', 'trips'] - - table_names = list(table_names) - - for table_name in reversed(preferred_order): - if table_name in table_names: - # move it to the end of the list - table_names.remove(table_name) - table_names.append(table_name) - return reversed(table_names) + return ['households', 'persons', 'tours', 'trips'] def write_df_csv(df, file_path, index_label=None, columns=None, column_labels=None, transpose=True): diff --git a/activitysim/core/util.py b/activitysim/core/util.py index d09b900695..85aef298f9 100644 --- a/activitysim/core/util.py +++ b/activitysim/core/util.py @@ -1,6 +1,7 @@ import os import psutil import gc +import logging from operator import itemgetter @@ -9,14 +10,29 @@ from zbox import toolz as tz +logger = logging.getLogger(__name__) + + +def GB(bytes): + gb = (bytes / (1024 * 1024 * 1024.0)) + return "%s GB" % (round(gb, 2), ) + + +def df_size(df): + bytes = df.memory_usage(index=True).sum() + return "%s %s" % (df.shape, GB(bytes)) + def memory_info(): + + mi = psutil.Process().memory_full_info() + return "memory_info: vms: %s rss: %s uss: %s" % (GB(mi.vms), GB(mi.rss), GB(mi.uss)) + + +def force_garbage_collect(): + gc.collect() - process = psutil.Process(os.getpid()) - bytes = process.memory_info().rss - mb = (bytes / (1024 * 1024.0)) - gb = (bytes / (1024 * 1024 * 1024.0)) - return "memory_info: %s MB (%s GB)" % (int(mb), round(gb, 2)) + logger.debug("force_garbage_collect %s" % memory_info()) def left_merge_on_index_and_col(left_df, right_df, join_col, target_col): @@ -145,9 +161,9 @@ def other_than(groups, bools): return gt1.where(bools, other=gt0) -def quick_loc_df(loc_list, target_df, attribute): +def quick_loc_df(loc_list, target_df, attribute=None): """ - faster replacement for target_df.loc[loc_list][attribute] + faster replacement for target_df.loc[loc_list] or target_df.loc[loc_list][attribute] pandas 
DataFrame.loc[] indexing doesn't scale for large arrays (e.g. > 1,000,000 elements) @@ -155,11 +171,11 @@ def quick_loc_df(loc_list, target_df, attribute): ---------- loc_list : list-like (numpy.ndarray, pandas.Int64Index, or pandas.Series) target_df : pandas.DataFrame containing column named attribute - attribute : name of column from loc_list to return + attribute : name of column from loc_list to return (or none for all columns) Returns ------- - pandas.Series + pandas.DataFrame or, if attribbute specified, pandas.Series """ left_on = "left" @@ -173,16 +189,26 @@ def quick_loc_df(loc_list, target_df, attribute): else: raise RuntimeError("quick_loc_df loc_list of unexpected type %s" % type(loc_list)) + if attribute: + target_df = target_df[[attribute]] + df = pd.merge(left_df, - target_df[[attribute]], + target_df, left_on=left_on, right_index=True, - how="left") + how="left").set_index(left_on) + + df.index.name = target_df.index.name # regression test - # assert list(df[attribute]) == list(target_df.loc[loc_list][attribute]) + # assert df.equals(target_df.loc[loc_list]) - return df[attribute] + if attribute: + # return series + return df[attribute] + else: + # return df + return df def quick_loc_series(loc_list, target_series): diff --git a/docs/howitworks.rst b/docs/howitworks.rst index e2818ea210..7b60085ba8 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -264,7 +264,7 @@ trace labels are passed in. skims=skims, locals_d=locals_d, chunk_size=chunk_size, - trace_label=trace_hh_id and 'school_location_sample.%s' % school_type) + trace_label=school_location_sample.%s' % school_type) This function solves the utilities, calculates probabilities, draws random numbers, selects choices with replacement, and returns the choices. 
This is done in a for loop of chunks of chooser records in order to avoid diff --git a/example/configs/settings.yaml b/example/configs/settings.yaml index 5e8acbea92..c718f7bcfe 100644 --- a/example/configs/settings.yaml +++ b/example/configs/settings.yaml @@ -6,13 +6,13 @@ skims_file: skims.omx households_sample_size: 100 #trace household id; comment out for no trace -trace_hh_id: 961042 +#trace_hh_id: 961042 # trace origin, destination in accessibility calculation -trace_od: [5, 11] +#trace_od: [5, 11] #internal settings -chunk_size: 1000000 +chunk_size: 400000000 # comment out or set false to disable variability check in simple_simulate and interaction_simulate @@ -44,9 +44,18 @@ models: - atwork_subtour_mode_choice_simulate - create_simple_trips - trip_mode_choice_simulate + - write_data_dictionary + - write_tables #resume_after: tour_mode_choice_simulate +output_tables: + action: include + prefix: final_ + tables: + - checkpoints + + # area_types less than this are considered urban urban_threshold: 4 cbd_threshold: 2 diff --git a/example/output/.gitignore b/example/output/.gitignore index f4b7708c65..4a2323ec0a 100644 --- a/example/output/.gitignore +++ b/example/output/.gitignore @@ -2,3 +2,4 @@ *.log *.prof *.h5 +*.txt diff --git a/example/simulation.py b/example/simulation.py index 7b15c36eae..8e30034ecb 100644 --- a/example/simulation.py +++ b/example/simulation.py @@ -35,18 +35,6 @@ pipeline.run(models=MODELS, resume_after=resume_after) -print "\n#### run completed" - -# write final versions of all checkpointed dataframes to CSV files to review results -for table_name in pipeline.checkpointed_tables(): - file_name = "final_%s_table.csv" % table_name - file_path = os.path.join(orca.get_injectable("output_dir"), file_name) - pipeline.get_table(table_name).to_csv(file_path) - -# write checkpoints -file_path = os.path.join(orca.get_injectable("output_dir"), "checkpoints.csv") -pipeline.get_checkpoints().to_csv(file_path) - # tables will no longer be available 
after pipeline is closed pipeline.close_pipeline() diff --git a/example_multi/output/.gitignore b/example_multi/output/.gitignore index f4b7708c65..4a2323ec0a 100644 --- a/example_multi/output/.gitignore +++ b/example_multi/output/.gitignore @@ -2,3 +2,4 @@ *.log *.prof *.h5 +*.txt diff --git a/setup.py b/setup.py index b28dbdc338..da10898259 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='activitysim', - version='0.5.1', + version='0.5.2', description='Activity-Based Travel Modeling', author='contributing authors', author_email='ben.stabler@rsginc.com',