Skip to content

Commit

Permalink
Merge pull request #441 from JoostJM/thread-safe_logging
Browse files Browse the repository at this point in the history
Thread-safe logging
  • Loading branch information
JoostJM committed Oct 22, 2018
2 parents d1a55eb + 9f4eb63 commit ab1bcaa
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 66 deletions.
129 changes: 101 additions & 28 deletions radiomics/scripts/__init__.py
Expand Up @@ -2,10 +2,12 @@
import argparse
import csv
from functools import partial
import logging
from multiprocessing import cpu_count, Pool
import logging.config
import logging.handlers
from multiprocessing import cpu_count, Manager, Pool
import os
import sys
import threading

from pykwalify.compat import yaml
import pykwalify.core
Expand All @@ -16,7 +18,6 @@


scriptlogger = logging.getLogger('radiomics.script') # holds logger for script events
logging_config = {}
relative_path_start = os.getcwd()


Expand Down Expand Up @@ -95,28 +96,37 @@ def parse_args(custom_arguments=None):

args = parser.parse_args(args=custom_arguments) # Exits with code 2 if parsing fails

# variable to hold the listener needed for processing parallel log records
queue_listener = None

# Run the extraction
try:
_configureLogging(args)
logging_config, queue_listener = _configureLogging(args)
scriptlogger.info('Starting PyRadiomics (version: %s)', radiomics.__version__)
input_tuple = _processInput(args)
if input_tuple is not None:
if args.validate:
_validateCases(*input_tuple)
else:
results = _extractSegment(*input_tuple)
results = _extractSegment(*input_tuple, logging_config=logging_config)
segment.processOutput(results, args.out, args.skip_nans, args.format, args.format_path, relative_path_start)
scriptlogger.info('Finished extraction successfully...')
else:
return 1 # Feature extraction error
except (KeyboardInterrupt, SystemExit):
scriptlogger.info('Cancelling Extraction')
return -1
except Exception:
scriptlogger.error('Error extracting features!', exc_info=True)
return 3 # Unknown error
finally:
if queue_listener is not None:
queue_listener.stop()
return 0 # success


def _processInput(args):
global logging_config, relative_path_start, scriptlogger
global relative_path_start, scriptlogger
scriptlogger.info('Processing input...')

caseCount = 1
Expand Down Expand Up @@ -166,12 +176,26 @@ def _processInput(args):
return caseGenerator, caseCount, num_workers


def _extractSegment(case_generator, case_count, num_workers):
def _extractSegment(case_generator, case_count, num_workers, logging_config):
if num_workers > 1: # multiple cases, parallel processing enabled
scriptlogger.info('Input valid, starting parallel extraction from %d cases with %d workers...',
case_count, num_workers)
pool = Pool(num_workers)
results = pool.map(partial(segment.extractSegment_parallel, parallel_config=logging_config), case_generator)
try:
task = pool.map_async(partial(segment.extractSegment_parallel, logging_config=logging_config),
case_generator,
chunksize=min(10, case_count))
# Wait for the results to be done. task.get() without timeout performs a blocking call, which prevents
# the program from processing the KeyboardInterrupt if it occurs
while not task.ready():
pass
results = task.get()
pool.close()
except (KeyboardInterrupt, SystemExit):
pool.terminate()
raise
finally:
pool.join()
elif num_workers == 1: # single case or sequential batch processing
scriptlogger.info('Input valid, starting sequential extraction from %d case(s)...',
case_count)
Expand All @@ -181,7 +205,7 @@ def _extractSegment(case_generator, case_count, num_workers):
else:
# No cases defined in the batch
scriptlogger.error('No cases to process...')
return None
results = None
return results


Expand All @@ -199,6 +223,8 @@ def _validateCases(case_generator, case_count, num_workers):
c = pykwalify.core.Core(source_file=param, schema_files=[schemaFile], extensions=[schemaFuncs])
try:
c.validate()
except (KeyboardInterrupt, SystemExit):
raise
except Exception as e:
scriptlogger.error('Parameter validation failed!\n%s' % e.message)
scriptlogger.debug("Validating case (%i/%i): %s", case_idx, case_count, case)
Expand Down Expand Up @@ -289,36 +315,83 @@ def parse_value(value, value_type):
setting_overrides[setting_key] = parse_value(setting_value, setting_type)
scriptlogger.debug('Parsed "%s" as type "%s"; value: %s', setting_key, setting_type, setting_overrides[setting_key])

except (KeyboardInterrupt, SystemExit):
raise
except Exception:
scriptlogger.warning('Could not parse value "%s" for setting "%s", skipping...', setting_value, setting_key)

return setting_overrides


def _configureLogging(args):
  """
  Initialize logging for the script run: console output at the requested
  verbosity, an optional log file, and -- for parallel extraction -- a
  queue-based handler plus a listener so child processes can log safely.

  NOTE(review): this span is taken from a rendered diff and interleaves
  pre-change (handler-object based) and post-change (dictConfig based)
  lines; statements marked as pre-change residue below are superseded
  duplicates from the deleted side of the diff, not intentional code.

  :param args: parsed command-line arguments; reads ``logging_level``,
    ``verbosity``, ``log_file`` and ``jobs``
  :return: tuple ``(logging_config, queue_listener)`` where ``queue_listener``
    is a ``logging.handlers.QueueListener`` for parallel runs, else None
  """
  global scriptlogger, logging_config  # pre-change residue
  global scriptlogger

  # Initialize Logging
  logLevel = getattr(logging, args.logging_level)  # pre-change residue
  rLogger = radiomics.logger  # pre-change residue
  logging_config['logLevel'] = logLevel  # pre-change residue
  # Listener to process log messages from child processes in case of multiprocessing
  queue_listener = None

  # pre-change residue: formatter used to be attached to handlers directly
  fmt = logging.Formatter("[%(asctime)-.19s] %(levelname)-.1s: %(name)s: %(message)s")
  rLogger.handlers[0].setFormatter(fmt)
  # File level comes from --logging-level; console level from --verbosity
  logfileLevel = getattr(logging, args.logging_level)
  verboseLevel = (6 - args.verbosity) * 10 # convert to python logging level
  # The logger must pass records for the most permissive of the two handlers
  logger_level = min(logfileLevel, verboseLevel)

  # Declarative configuration, applied via logging.config.dictConfig below;
  # also returned so it can be replayed inside worker processes
  logging_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
      'default': {
        'format': '[%(asctime)s] %(levelname)-.1s: %(name)s: %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S'
      }
    },
    'handlers': {
      'console': {
        'class': 'logging.StreamHandler',
        'level': verboseLevel,
        'formatter': 'default'
      }
    },
    'loggers': {
      'radiomics': {
        'level': logger_level,
        'handlers': ['console']
      }
    }
  }

  # Set up optional logging to file
  if args.log_file is not None:
    # pre-change residue: imperative FileHandler setup, superseded by the
    # dict-based 'logfile' handler added below
    logging_config['logFile'] = args.log_file
    logHandler = logging.FileHandler(filename=args.log_file, mode='a')
    logHandler.setFormatter(fmt)
    logHandler.setLevel(logLevel)
    rLogger.addHandler(logHandler)
    if rLogger.level > logHandler.level:
      rLogger.level = logHandler.level

    # Set verbosity of output (stderr)
    # pre-change residue: verbosity is now handled by the 'console' handler above
    verboseLevel = (6 - args.verbosity) * 10 # convert to python logging level
    radiomics.setVerbosity(verboseLevel)
    logging_config['verbosity'] = verboseLevel
    if args.jobs > 1:
      # Multiprocessing! Use a QueueHandler, FileHandler and QueueListener
      # to implement thread-safe logging.
      q = Manager().Queue(-1)
      threading.current_thread().setName('Main')

      # Include the thread name (i.e. the case) in parallel log output
      logging_config['formatters']['default']['format'] = \
        '[%(asctime)s] %(levelname)-.1s: (%(threadName)s) %(name)s: %(message)s'

      # Workers log into the shared queue; the listener drains it to the file
      logging_config['handlers']['logfile'] = {
        'class': 'logging.handlers.QueueHandler',
        'queue': q,
        'level': logfileLevel,
        'formatter': 'default'
      }

      file_handler = logging.FileHandler(filename=args.log_file, mode='a')
      file_handler.setFormatter(logging.Formatter(fmt=logging_config['formatters']['default'].get('format'),
                                                  datefmt=logging_config['formatters']['default'].get('datefmt')))

      # Listener runs in the main process; the caller is responsible for stopping it
      queue_listener = logging.handlers.QueueListener(q, file_handler)
      queue_listener.start()
    else:
      # Sequential run: write straight to the file, no queue needed
      logging_config['handlers']['logfile'] = {
        'class': 'logging.FileHandler',
        'filename': args.log_file,
        'mode': 'a',
        'level': logfileLevel,
        'formatter': 'default'
      }
    logging_config['loggers']['radiomics']['handlers'].append('logfile')

  logging.config.dictConfig(logging_config)

  scriptlogger.debug('Logging initialized')
  return logging_config, queue_listener
75 changes: 37 additions & 38 deletions radiomics/scripts/segment.py
Expand Up @@ -3,15 +3,15 @@
from datetime import datetime
from functools import partial
import json
import logging
import logging.config
import os
import threading

import numpy
import SimpleITK as sitk
import six

from radiomics import featureextractor, setVerbosity
import radiomics.featureextractor

caseLogger = logging.getLogger('radiomics.script')
_parallel_extraction_configured = False
Expand All @@ -24,6 +24,7 @@ def extractSegment(case_idx, case, config, config_override):
feature_vector = OrderedDict(case)

try:
caseLogger.info('Processing case %s', case_idx)
t = datetime.now()

imageFilepath = case['Image'] # Required
Expand All @@ -33,27 +34,37 @@ def extractSegment(case_idx, case, config, config_override):
label = int(label)

# Instantiate Radiomics Feature extractor
extractor = featureextractor.RadiomicsFeaturesExtractor(config, **config_override)
extractor = radiomics.featureextractor.RadiomicsFeaturesExtractor(config, **config_override)

# Extract features
feature_vector.update(extractor.execute(imageFilepath, maskFilepath, label))

# Display message
delta_t = datetime.now() - t
caseLogger.info('Patient %s processed in %s', case_idx, delta_t)
caseLogger.info('Case %s processed in %s', case_idx, delta_t)

except (KeyboardInterrupt, SystemExit): # Cancel extraction by forwarding this 'error'
raise
except SystemError:
# Occurs when Keyboard Interrupt is caught while the thread is processing a SimpleITK call
raise KeyboardInterrupt()
except Exception:
caseLogger.error('Feature extraction failed!', exc_info=True)

return feature_vector


def extractSegment_parallel(args, parallel_config=None):
  """
  Worker entry point for parallel feature extraction of one case.

  NOTE(review): pre-change variant from the deleted side of the rendered
  diff; it calls the misspelled helper ``_configurParallelExtraction``,
  which matches the pre-change definition elsewhere in this diff.

  :param args: tuple of positional arguments forwarded to ``extractSegment``;
    element 0 is the case index.
  :param parallel_config: optional dict; when supplied, per-worker logging is
    configured and the worker thread renamed after the case.
  :return: result of ``extractSegment``.
  """
  if parallel_config is not None:
    # set thread name to patient name
    threading.current_thread().name = 'case %s' % args[0]  # args[0] = case_idx
    _configurParallelExtraction(parallel_config)
  return extractSegment(*args)
def extractSegment_parallel(args, logging_config=None):
  """
  Worker entry point for parallel feature extraction of one case.

  :param args: tuple of positional arguments forwarded to ``extractSegment``;
    element 0 is the case index.
  :param logging_config: optional dict for ``logging.config.dictConfig``; when
    given, logging is (re)configured in this worker and its thread is renamed
    after the case so log records are traceable.
  :return: result of ``extractSegment``, or None if the worker was interrupted.
  """
  try:
    if logging_config is not None:
      case_idx = args[0]
      # Name this worker's thread after the case, so its log output is identifiable
      threading.current_thread().name = 'case %s' % case_idx
      _configureParallelExtraction(logging_config)
    return extractSegment(*args)
  except (KeyboardInterrupt, SystemExit):
    # The interrupt also reaches the main process, which performs the actual
    # cancellation; here it is swallowed so the child can exit cleanly.
    return None


def extractSegmentWithTempFiles(case_idx, case, config, config_override, temp_dir):
Expand Down Expand Up @@ -84,12 +95,17 @@ def extractSegmentWithTempFiles(case_idx, case, config, config_override, temp_di
return feature_vector


def extractSegmentWithTempFiles_parallel(args, parallel_config=None):
  """
  Worker entry point for parallel extraction with temp-file based output.

  NOTE(review): pre-change variant from the deleted side of the rendered
  diff; it calls the misspelled helper ``_configurParallelExtraction`` and,
  unlike ``extractSegment_parallel`` above, configures logging before
  renaming the thread.

  :param args: tuple of positional arguments forwarded to
    ``extractSegmentWithTempFiles``; element 0 is the case index.
  :param parallel_config: optional dict; when supplied, per-worker logging is
    configured and the worker thread renamed after the case.
  :return: result of ``extractSegmentWithTempFiles``.
  """
  if parallel_config is not None:
    _configurParallelExtraction(parallel_config)
    # set thread name to patient name
    threading.current_thread().name = 'case %s' % args[0]  # args[0] = case_idx
  return extractSegmentWithTempFiles(*args)
def extractSegmentWithTempFiles_parallel(args, logging_config=None):
  """
  Worker entry point for parallel extraction with temp-file based output.

  :param args: tuple of positional arguments forwarded to
    ``extractSegmentWithTempFiles``; element 0 is the case index.
  :param logging_config: optional dict for ``logging.config.dictConfig``; when
    given, logging is (re)configured in this worker and its thread is renamed
    after the case so log records are traceable.
  :return: result of ``extractSegmentWithTempFiles``, or None if the worker
    was interrupted.
  """
  try:
    if logging_config is not None:
      case_idx = args[0]
      # Name this worker's thread after the case, so its log output is identifiable
      threading.current_thread().name = 'case %s' % case_idx
      _configureParallelExtraction(logging_config)
    return extractSegmentWithTempFiles(*args)
  except (KeyboardInterrupt, SystemExit):
    # The interrupt also reaches the main process, which performs the actual
    # cancellation; here it is swallowed so the child can exit cleanly.
    return None


def processOutput(results,
Expand Down Expand Up @@ -145,7 +161,7 @@ def processOutput(results,
outStream.write('Case-%d_%s: %s\n' % (case_idx, k, v))


def _configurParallelExtraction(parallel_config):
def _configureParallelExtraction(logging_config, add_info_filter=True):
"""
Initialize logging for parallel extraction. This needs to be done here, as it needs to be done for each thread that is
created.
Expand All @@ -157,23 +173,9 @@ def _configurParallelExtraction(parallel_config):
# Configure logging
###################

rLogger = logging.getLogger('radiomics')

# Add logging to file is specified
logFile = parallel_config.get('logFile', None)
if logFile is not None:
logHandler = logging.FileHandler(filename=logFile, mode='a')
logHandler.setLevel(parallel_config.get('logLevel', logging.INFO))
rLogger.addHandler(logHandler)
if rLogger.level > logHandler.level:
rLogger.level = logHandler.level
logging.config.dictConfig(logging_config)

# Include thread name in Log-message output for all handlers.
parallelFormatter = logging.Formatter('[%(asctime)-.19s] %(levelname)-.1s: (%(threadName)s) %(name)s: %(message)s')
for h in rLogger.handlers:
h.setFormatter(parallelFormatter)

if parallel_config.get('addFilter', True):
if add_info_filter:
# Define filter that allows messages from specified filter and level INFO and up, and level WARNING and up from
# other loggers.
class info_filter(logging.Filter):
Expand All @@ -191,16 +193,13 @@ def filter(self, record):
# Adding the filter to the first handler of the radiomics logger limits the info messages on the output to just
# those from radiomics.script, but warnings and errors from the entire library are also printed to the output.
# This does not affect the amount of logging stored in the log file.
outputhandler = rLogger.handlers[0] # Handler printing to the output
outputhandler = radiomics.logger.handlers[0] # Handler printing to the output
outputhandler.addFilter(info_filter('radiomics.script'))

# Ensures that log messages are being passed to the filter with the specified level
setVerbosity(parallel_config.get('verbosity', logging.INFO))

# Ensure the entire extraction for each cases is handled on 1 thread
####################################################################

sitk.ProcessObject_SetGlobalDefaultNumberOfThreads(1)

_parallel_extraction_configured = True
rLogger.debug('parallel extraction configured')
radiomics.logger.debug('parallel extraction configured')

0 comments on commit ab1bcaa

Please sign in to comment.