Skip to content

Commit

Permalink
output_dir log subdir
Browse files Browse the repository at this point in the history
  • Loading branch information
toliwaga committed Nov 6, 2018
1 parent 1c0abab commit 5c0e3d9
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 70 deletions.
2 changes: 1 addition & 1 deletion activitysim/abm/models/atwork_subtour_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

logger = logging.getLogger(__name__)

DUMP = True
DUMP = False


@inject.step()
Expand Down
23 changes: 9 additions & 14 deletions activitysim/core/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
logger = logging.getLogger(__name__)

# dict of table_dicts keyed by trace_label
# table_dicts are dicts tuples of (elements, bytes, mem) keyed by table_name
# table_dicts are dicts of the form {table_name: (elements, bytes, mem), ...}
CHUNK_LOG = OrderedDict()

# array of chunk_size active CHUNK_LOG
Expand Down Expand Up @@ -171,22 +171,22 @@ def log_write_hwm():
d = HWM[0]
for tag in d:
hwm = d[tag]
logger.info("high_water_mark %s: %s (%s) in %s" %
logger.info("#chunk_hwm high_water_mark %s: %s (%s) in %s" %
(tag, hwm['mark'], hwm['info'], hwm['trace_label']), )

# - elements shouldn't exceed chunk_size or effective_chunk_size of base chunker
def check_chunk_size(hwm, chunk_size, label, max_leeway):
elements = hwm['mark']
if chunk_size and max_leeway and elements > chunk_size * max_leeway: # too high
logger.warn("total_elements (%s) > %s (%s) %s : %s " %
logger.warn("#chunk_hwm total_elements (%s) > %s (%s) %s : %s " %
(commas(elements), label, commas(chunk_size),
hwm['info'], hwm['trace_label']))

# if we are in a chunker
if len(HWM) > 1 and HWM[1]:
assert 'elements' in HWM[1] # expect an 'elements' hwm dict for base chunker
hwm = HWM[1].get('elements')
check_chunk_size(hwm, EFFECTIVE_CHUNK_SIZE[0], 'effective_chunk_size', max_leeway=1)
check_chunk_size(hwm, EFFECTIVE_CHUNK_SIZE[0], 'effective_chunk_size', max_leeway=1.05)
check_chunk_size(hwm, CHUNK_SIZE[0], 'chunk_size', max_leeway=1)


Expand All @@ -203,16 +203,11 @@ def rows_per_chunk(chunk_size, row_size, num_choosers, trace_label):

# chunks = int(ceil(num_choosers / float(rpc)))
effective_chunk_size = row_size * rpc
#
# logger.debug("%s #chunk_calc chunk_size %s" % (trace_label, chunk_size))
# logger.debug("%s #chunk_calc num_choosers %s" % (trace_label, num_choosers))
# logger.debug("%s #chunk_calc total row_size %s" % (trace_label, row_size))
# logger.debug("%s #chunk_calc rows_per_chunk %s" % (trace_label, rpc))
# logger.debug("%s #chunk_calc effective_chunk_size %s" % (trace_label, effective_chunk_size))
# logger.debug("%s #chunk_calc chunks %s" % (trace_label, chunks))

logger.info("%s #chunk_calc rows_per_chunk %s, effective_chunk_size %s, num_choosers %s" %
(trace_label, rpc, effective_chunk_size, num_choosers))
num_chunks = (num_choosers // rpc) + (num_choosers % rpc > 0)

logger.info("#chunk_calc num_chunks: %s, rows_per_chunk: %s, "
"effective_chunk_size: %s, num_choosers: %s : %s" %
(num_chunks, rpc, effective_chunk_size, num_choosers, trace_label))

return rpc, effective_chunk_size

Expand Down
21 changes: 20 additions & 1 deletion activitysim/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import argparse
import os
import yaml
import sys

import logging
from activitysim.core import inject
Expand Down Expand Up @@ -230,6 +231,7 @@ def trace_file_path(file_name):

output_dir = inject.get_injectable('output_dir')

# - check for optional trace subfolder
if os.path.exists(os.path.join(output_dir, 'trace')):
output_dir = os.path.join(output_dir, 'trace')
else:
Expand All @@ -241,8 +243,25 @@ def trace_file_path(file_name):

def log_file_path(file_name):

output_dir = inject.get_injectable('output_dir')

# - check for optional log subfolder
if os.path.exists(os.path.join(output_dir, 'log')):
output_dir = os.path.join(output_dir, 'log')

# - check for optional process name prefix
prefix = inject.get_injectable('log_file_prefix', None)
return build_output_file_path(file_name, use_prefix=prefix)
if prefix:
file_name = "%s-%s" % (prefix, file_name)

file_path = os.path.join(output_dir, file_name)

return file_path


def open_log_file(file_name, mode):
    """
    Open the named log file and return the open file object.

    The path is resolved by log_file_path(file_name). Under Python 2 a 'b'
    is appended to the requested mode; under Python 3 the mode is used
    exactly as given.
    """
    if sys.version_info < (3,):
        mode = mode + 'b'

    log_path = log_file_path(file_name)
    return open(log_path, mode)


def pipeline_file_path(file_name):
Expand Down
17 changes: 7 additions & 10 deletions activitysim/core/mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import psutil
import logging
import gc
import sys


from activitysim.core import config

Expand Down Expand Up @@ -60,12 +60,11 @@ def log_hwm():
logger.info("high water mark %s: %s timestamp: %s label: %s" %
(tag, hwm['mark'], hwm['timestamp'], hwm['label']))

mode = 'ab' if sys.version_info < (3,) else 'a'
with open(config.output_file_path(MEM['file_name']), mode) as file:
with config.open_log_file(MEM['file_name'], 'a') as log_file:
for tag in HWM:
hwm = HWM[tag]
print("high water mark %s: %.2f timestamp: %s label: %s" %
(tag, hwm['mark'], hwm['timestamp'], hwm['label']), file=file)
(tag, hwm['mark'], hwm['timestamp'], hwm['label']), file=log_file)


def trace_memory_info(event=''):
Expand All @@ -83,9 +82,8 @@ def trace_memory_info(event=''):
vmi = psutil.virtual_memory()

if last_tick == 0:
mode = 'wb' if sys.version_info < (3,) else 'w'
with open(config.output_file_path(MEM['file_name']), mode) as file:
print("time,rss,used,available,percent,event", file=file)
with config.open_log_file(MEM['file_name'], 'w') as log_file:
print("time,rss,used,available,percent,event", file=log_file)

MEM['tick'] = t

Expand All @@ -105,16 +103,15 @@ def trace_memory_info(event=''):
# logger.debug("memory_info: rss: %s available: %s percent: %s"
# % (GB(mi.rss), GB(vmi.available), GB(vmi.percent)))

mode = 'ab' if sys.version_info < (3,) else 'a'
with open(config.output_file_path(MEM['file_name']), mode) as file:
with config.open_log_file(MEM['file_name'], 'a') as output_file:

print("%s, %.2f, %.2f, %.2f, %s%%, %s" %
(timestamp,
GB(rss),
GB(vmi.used),
GB(vmi.available),
vmi.percent,
event), file=file)
event), file=output_file)


def get_memory_info():
Expand Down
9 changes: 4 additions & 5 deletions activitysim/core/mp_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,11 @@ def mp_run_simulation(queue, injectables, step_info, resume_after, **kwargs):

setup_injectables_and_logging(injectables)

mem.init_trace(setting('mem_tick'),
file_name="mem_%s.csv" % multiprocessing.current_process().name)
mem.init_trace(setting('mem_tick'))

if step_info['num_processes'] > 1:
pipeline_prefix = multiprocessing.current_process().name
logger.info("injecting pipeline_file_prefix '%s'", pipeline_prefix)
logger.debug("injecting pipeline_file_prefix '%s'", pipeline_prefix)
inject.add_injectable("pipeline_file_prefix", pipeline_prefix)

if setting('profile', False):
Expand Down Expand Up @@ -875,8 +874,8 @@ def get_run_list():
multiprocess_steps[istep]['resume_after'] = resume_after

# - write run list to output dir
mode = 'wb' if sys.version_info < (3,) else 'w'
with open(config.output_file_path('run_list.txt'), mode) as f:
# use open_log_file (which resolves the path via log_file_path) so the file goes in the (optional) log subdir and gets the process name prefix
with config.open_log_file('run_list.txt', 'w') as f:
print_run_list(run_list, f)

return run_list
Expand Down
30 changes: 15 additions & 15 deletions activitysim/core/steps/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,39 @@ def track_skim_usage(output_dir):
skim_stack = inject.get_injectable('skim_stack', None)

mode = 'wb' if sys.version_info < (3,) else 'w'
with open(config.output_file_path('skim_usage.txt'), mode) as file:
with open(config.output_file_path('skim_usage.txt'), mode) as output_file:

print("\n### skim_dict usage", file=file)
print("\n### skim_dict usage", file=output_file)
for key in skim_dict.usage:
print(key, file=file)
print(key, file=output_file)

if skim_stack is None:

unused_keys = {k for k in skim_dict.skim_info['omx_keys']} - \
{k for k in skim_dict.usage}

print("\n### unused skim keys", file=file)
print("\n### unused skim keys", file=output_file)
for key in unused_keys:
print(key, file=file)
print(key, file=output_file)

else:

print("\n### skim_stack usage", file=file)
print("\n### skim_stack usage", file=output_file)
for key in skim_stack.usage:
print(key, file=file)
print(key, file=output_file)

unused = {k for k in skim_dict.skim_info['omx_keys'] if not isinstance(k, tuple)} - \
{k for k in skim_dict.usage if not isinstance(k, tuple)}
print("\n### unused skim str keys", file=file)
print("\n### unused skim str keys", file=output_file)
for key in unused:
print(key, file=file)
print(key, file=output_file)

unused = {k[0] for k in skim_dict.skim_info['omx_keys'] if isinstance(k, tuple)} - \
{k[0] for k in skim_dict.usage if isinstance(k, tuple)} - \
{k for k in skim_stack.usage}
print("\n### unused skim dim3 keys", file=file)
print("\n### unused skim dim3 keys", file=output_file)
for key in unused:
print(key, file=file)
print(key, file=output_file)


def write_data_dictionary(output_dir):
Expand All @@ -93,13 +93,13 @@ def write_data_dictionary(output_dir):
# write data dictionary for all checkpointed_tables

mode = 'wb' if sys.version_info < (3,) else 'w'
with open(config.output_file_path('data_dict.txt'), mode) as file:
with open(config.output_file_path('data_dict.txt'), mode) as output_file:
for table_name in output_tables:
df = inject.get_table(table_name, None).to_frame()

print("\n### %s %s" % (table_name, df.shape), file=file)
print('index:', df.index.name, df.index.dtype, file=file)
print(df.dtypes, file=file)
print("\n### %s %s" % (table_name, df.shape), file=output_file)
print('index:', df.index.name, df.index.dtype, file=output_file)
print(df.dtypes, file=output_file)


# def xwrite_data_dictionary(output_dir):
Expand Down
41 changes: 22 additions & 19 deletions activitysim/core/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,33 @@ def delete_output_files(file_type, ignore=None, subdir=None):

output_dir = inject.get_injectable('output_dir')

if subdir:
output_dir = os.path.join(output_dir, subdir)
if not os.path.exists(output_dir):
logger.warn("delete_output_files: No subdirectory %s" % (file_type, output_dir))
return
directories = ['', 'log', 'trace']

for subdir in directories:

dir = os.path.join(output_dir, subdir) if subdir else output_dir

if not os.path.exists(dir):
continue

if ignore:
ignore = [os.path.realpath(p) for p in ignore]
if ignore:
ignore = [os.path.realpath(p) for p in ignore]

logger.debug("Deleting %s files in output_dir %s" % (file_type, output_dir))
# logger.debug("Deleting %s files in output dir %s" % (file_type, dir))

for the_file in os.listdir(output_dir):
if the_file.endswith(file_type):
file_path = os.path.join(output_dir, the_file)
for the_file in os.listdir(dir):
if the_file.endswith(file_type):
file_path = os.path.join(dir, the_file)

if ignore and os.path.realpath(file_path) in ignore:
logger.info("delete_output_files ignoring %s" % file_path)
continue
if ignore and os.path.realpath(file_path) in ignore:
logger.debug("delete_output_files ignoring %s" % file_path)
continue

try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
print(e)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
print(e)


def delete_csv_files():
Expand Down
17 changes: 13 additions & 4 deletions example_mp/configs/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ inherit_settings: True

# - dev config
multiprocess: True
num_processes: 1
num_processes: 3
stagger: 30
mem_tick: 5
mem_tick: 30
profile: False
strict: False

Expand All @@ -25,9 +25,18 @@ strict: False
#households_sample_size: 1366361
#chunk_size: 7500000000

# - 20% sample
#households_sample_size: 546544
#chunk_size: 3000000000

# - 10% sample
households_sample_size: 273272
chunk_size: 1500000000
#households_sample_size: 273272
#chunk_size: 1500000000

# - 3.3% sample
households_sample_size: 91091
chunk_size: 500000000


# - tracing
trace_hh_id:
Expand Down
21 changes: 20 additions & 1 deletion example_mp/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def cleanup_output_files():

tracing.delete_output_files('h5')
tracing.delete_output_files('csv')
tracing.delete_output_files('csv', subdir='trace')
tracing.delete_output_files('txt')
tracing.delete_output_files('yaml')
tracing.delete_output_files('prof')
Expand All @@ -48,6 +47,24 @@ def run(run_list, injectables=None):
chunk.log_write_hwm()


def log_settings():
    """
    Log the values of selected settings and injectables at info level.

    Each setting is looked up with config.setting and each injectable with
    inject.get_injectable; one log line is written per name.
    """

    setting_names = (
        'households_sample_size',
        'chunk_size',
        'multiprocess',
        'num_processes',
        'resume_after',
    )
    injectable_names = ('data_dir', 'configs_dir', 'output_dir')

    for name in setting_names:
        value = config.setting(name)
        logger.info("setting %s: %s" % (name, value))

    for name in injectable_names:
        value = inject.get_injectable(name)
        logger.info("injectable %s: %s" % (name, value))


if __name__ == '__main__':

# inject.add_injectable('data_dir', '/Users/jeff.doyle/work/activitysim-data/mtc_tm1/data')
Expand All @@ -59,6 +76,8 @@ def run(run_list, injectables=None):
mp_tasks.filter_warnings()
tracing.config_logger()

log_settings()

t0 = tracing.print_elapsed_time()

# cleanup if not resuming
Expand Down

0 comments on commit 5c0e3d9

Please sign in to comment.