Skip to content

Commit

Permalink
Merge 37862e3 into a57efa1
Browse files Browse the repository at this point in the history
  • Loading branch information
jpn-- committed Jul 20, 2021
2 parents a57efa1 + 37862e3 commit 0315ce0
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 32 deletions.
95 changes: 65 additions & 30 deletions activitysim/core/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
MODE_RETRAIN = 'training'
MODE_ADAPTIVE = 'adaptive'
MODE_PRODUCTION = 'production'
TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION]
MODE_CHUNKLESS = 'disabled'
TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION, MODE_CHUNKLESS]

#
# low level
Expand Down Expand Up @@ -135,7 +136,9 @@ def chunk_metric():
def chunk_training_mode():
training_mode = \
SETTINGS.setdefault('chunk_training_mode', config.setting('chunk_training_mode', MODE_ADAPTIVE))
assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode} not one of: {TRAINING_MODES}"
if not training_mode:
training_mode = MODE_CHUNKLESS
assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode}' not one of: {TRAINING_MODES}"
return training_mode


Expand Down Expand Up @@ -223,8 +226,9 @@ def consolidate_logs():
if not glob_files:
return

assert chunk_training_mode() != MODE_PRODUCTION, \
f"shouldn't be any chunk log files when chunk_training_mode is {MODE_PRODUCTION}"
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS), \
f"shouldn't be any chunk log files when chunk_training_mode" \
f" is {MODE_PRODUCTION} or {MODE_CHUNKLESS}"

#
# OMNIBUS_LOG_FILE
Expand Down Expand Up @@ -331,6 +335,9 @@ def load_cached_history(self):
else:
self.have_cached_history = False

if chunk_training_mode() == MODE_CHUNKLESS:
return

if chunk_training_mode() == MODE_PRODUCTION:
# raise RuntimeError(f"chunk_training_mode is {MODE_PRODUCTION} but no chunk_cache: {chunk_cache_path}")

Expand Down Expand Up @@ -379,7 +386,7 @@ def cached_row_size(self, chunk_tag):

def write_history(self, history, chunk_tag):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

history_df = pd.DataFrame.from_dict(history)

Expand Down Expand Up @@ -418,7 +425,7 @@ def __init__(self, trace_label, chunk_size, baseline_rss, baseline_uss, headroom

def audit(self, msg, bytes=0, rss=0, uss=0, from_rss_monitor=False):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

MAX_OVERDRAFT = 0.2

Expand Down Expand Up @@ -483,7 +490,7 @@ def size_it(df):
assert False
return elements, bytes

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

if df is None:
elements, bytes = (0, 0)
Expand Down Expand Up @@ -511,7 +518,7 @@ def size_it(df):

def check_local_hwm(self, hwm_trace_label, rss, uss, total_bytes):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

from_rss_monitor = total_bytes is None

Expand Down Expand Up @@ -563,13 +570,18 @@ def get_hwm_bytes(self):

def log_rss(trace_label, force=False):

if chunk_training_mode() == MODE_CHUNKLESS:
# no memory tracing at all in chunkless mode
return

assert len(CHUNK_LEDGERS) > 0, f"log_rss called without current chunker."

hwm_trace_label = f"{trace_label}.log_rss"

if chunk_training_mode() == MODE_PRODUCTION:
trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
# FIXME - this trace_memory_info call slows things down a lot so it is turned off for now
# trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
# mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
return

rss, uss = mem.trace_memory_info(hwm_trace_label)
Expand All @@ -582,11 +594,11 @@ def log_rss(trace_label, force=False):

def log_df(trace_label, table_name, df):

assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker."

if chunk_training_mode() == MODE_PRODUCTION:
if chunk_training_mode() in (MODE_PRODUCTION, MODE_CHUNKLESS):
return

assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker."

op = 'del' if df is None else 'add'
hwm_trace_label = f"{trace_label}.{op}.{table_name}"

Expand Down Expand Up @@ -624,19 +636,27 @@ class ChunkSizer(object):
def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0):

self.depth = len(CHUNK_SIZERS) + 1
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if self.depth > 1:
# nested chunkers should be unchunked
assert chunk_size == 0
if chunk_training_mode() != MODE_CHUNKLESS:
if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

# if we are in a nested call, then we must be in the scope of active Ledger
# so any rss accumulated so far should be attributed to the parent active ledger
assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS)
parent = CHUNK_SIZERS[-1]
assert parent.chunk_ledger is not None
if self.depth > 1:
# nested chunkers should be unchunked
assert chunk_size == 0

log_rss(trace_label) # give parent a complementary log_rss reading entering sub context
# if we are in a nested call, then we must be in the scope of active Ledger
# so any rss accumulated so far should be attributed to the parent active ledger
assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS)
parent = CHUNK_SIZERS[-1]
assert parent.chunk_ledger is not None

log_rss(trace_label) # give parent a complementary log_rss reading entering sub context
else:
self.rss, self.uss = 0, 0

self.chunk_tag = chunk_tag
self.trace_label = trace_label
Expand Down Expand Up @@ -679,7 +699,8 @@ def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0):

def close(self):

if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and (chunk_training_mode() != MODE_PRODUCTION):
if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and \
(chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)):
_HISTORIAN.write_history(self.history, self.chunk_tag)

_chunk_sizer = CHUNK_SIZERS.pop()
Expand Down Expand Up @@ -755,7 +776,12 @@ def adaptive_rows_per_chunk(self, i):

prev_rss = self.rss
prev_uss = self.uss
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

self.headroom = self.available_headroom(self.uss if chunk_metric() == USS else self.rss)

Expand Down Expand Up @@ -826,14 +852,19 @@ def adaptive_rows_per_chunk(self, i):

# input()

if chunk_training_mode() != MODE_PRODUCTION:
if chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS):
self.cum_rows += self.rows_per_chunk

return self.rows_per_chunk, estimated_number_of_chunks

@contextmanager
def ledger(self):

# don't do anything in chunkless mode
if chunk_training_mode() == MODE_CHUNKLESS:
yield
return

mem_monitor = None

# nested chunkers should be unchunked
Expand Down Expand Up @@ -899,7 +930,8 @@ def chunk_log(trace_label, chunk_tag=None, base=False):

yield

chunk_sizer.adaptive_rows_per_chunk(1)
if chunk_training_mode() != MODE_CHUNKLESS:
chunk_sizer.adaptive_rows_per_chunk(1)

chunk_sizer.close()

Expand Down Expand Up @@ -940,7 +972,8 @@ def adaptive_chunked_choosers(choosers, chunk_size, trace_label, chunk_tag=None)

offset += rows_per_chunk

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()

Expand Down Expand Up @@ -1034,7 +1067,8 @@ def adaptive_chunked_choosers_and_alts(choosers, alternatives, chunk_size, trace
offset += rows_per_chunk
alt_offset = alt_end

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()

Expand Down Expand Up @@ -1074,6 +1108,7 @@ def adaptive_chunked_choosers_by_chunk_id(choosers, chunk_size, trace_label, chu

offset += rows_per_chunk

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()
6 changes: 6 additions & 0 deletions activitysim/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,12 @@ def filter_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning, module='tables',
message='`np.object` is a deprecated alias')

# beginning pandas version 1.3, various places emit a PerformanceWarning that is
# caught in the "strict" filter above, but which are currently unavoidable for complex models.
# These warning are left as warnings as an invitation for future enhancement.
from pandas.errors import PerformanceWarning
warnings.filterwarnings('default', category=PerformanceWarning)


def handle_standard_args(parser=None):

Expand Down
5 changes: 4 additions & 1 deletion activitysim/core/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ def read_model_coefficient_template(model_settings):
# this makes for a more legible template than repeating the identical coefficient name in each column

# replace missing cell values with coefficient_name from index
template = template.where(~template.isnull(), template.index)
template = template.where(
~template.isnull(),
np.broadcast_to(template.index.values[:, None], template.shape),
)

if template.index.duplicated().any():
dupes = template[template.index.duplicated(keep=False)].sort_index()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
inherit_settings: True

# read cached skims (using numpy memmap) from output directory (memmap is faster than omx )
read_skim_cache: False
# write memmapped cached skims to output directory after reading from omx, for use in subsequent runs
write_skim_cache: False
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
inherit_settings: True

# treat warnings as errors
strict: True

# number of households to simulate
households_sample_size: 10
chunk_size: 0
chunk_training_mode: disabled

# - shadow pricing global switches
use_shadow_pricing: False

# turn writing of sample_tables on and off for all models
# (if True, tables will be written if DEST_CHOICE_SAMPLE_TABLE_NAME is specified in individual model settings)
want_dest_choice_sample_tables: False

cleanup_pipeline_after_run: True

output_tables:
h5_store: False
action: include
prefix: final_
sort: True
tables:
- trips
10 changes: 9 additions & 1 deletion activitysim/examples/example_mtc/test/test_mtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def teardown_function(func):
inject.reinject_decorated_tables()


def run_test_mtc(multiprocess=False):
def run_test_mtc(multiprocess=False, chunkless=False):

def example_path(dirname):
resource = os.path.join('examples', 'example_mtc', dirname)
Expand All @@ -38,6 +38,9 @@ def regress():
if multiprocess:
run_args = ['-c', test_path('configs_mp'), '-c', example_path('configs_mp'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
elif chunkless:
run_args = ['-c', test_path('configs_chunkless'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
else:
run_args = ['-c', test_path('configs'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
Expand All @@ -51,6 +54,10 @@ def test_mtc():
run_test_mtc(multiprocess=False)


def test_mtc_chunkless():
run_test_mtc(multiprocess=False, chunkless=True)


def test_mtc_mp():
run_test_mtc(multiprocess=True)

Expand All @@ -59,3 +66,4 @@ def test_mtc_mp():

run_test_mtc(multiprocess=False)
run_test_mtc(multiprocess=True)
run_test_mtc(multiprocess=False, chunkless=True)

0 comments on commit 0315ce0

Please sign in to comment.