From 0f6cbd682c3dc3bf88b5dc129d89c142a673a34a Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Sat, 28 Jan 2017 08:54:33 -0800 Subject: [PATCH] Updates places in SDK that create thread pools. Moves ThreadPool creation to a util function. Records and resets logging level due to this being reset by apitools when used with a ThreadPool. --- sdks/python/apache_beam/internal/util.py | 33 +++++++++++++++++++ sdks/python/apache_beam/io/filebasedsource.py | 17 +++------- sdks/python/apache_beam/io/fileio.py | 11 ++----- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 2d12d49fa649..5b31e88ee271 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -17,6 +17,11 @@ """Utility functions used throughout the package.""" +import logging +from multiprocessing.pool import ThreadPool +import threading +import weakref + class ArgumentPlaceholder(object): """A place holder object replacing PValues in argument lists. @@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values): (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v) for k, v in sorted(kwargs.iteritems())) return (new_args, new_kwargs) + + +def run_using_threadpool(fn_to_execute, inputs, pool_size): + """Runs the given function on given inputs using a thread pool. + + Args: + fn_to_execute: Function to execute + inputs: Inputs on which given function will be executed in parallel. + pool_size: Size of thread pool. + Returns: + Results retrieved after executing the given function on given inputs. + """ + + # ThreadPool crashes in old versions of Python (< 2.7.5) if created + # from a child thread. 
(http://bugs.python.org/issue10015) + if not hasattr(threading.current_thread(), '_children'): + threading.current_thread()._children = weakref.WeakKeyDictionary() + pool = ThreadPool(min(pool_size, len(inputs))) + try: + # We record and reset logging level here since 'apitools' library Beam + # depends on updates the logging level when used with a threadpool - + # https://github.com/google/apitools/issues/141 + # TODO: Remove this once above issue in 'apitools' is fixed. + old_level = logging.getLogger().level + return pool.map(fn_to_execute, inputs) + finally: + pool.terminate() + logging.getLogger().setLevel(old_level) diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 1bfde258f0b2..582d673889c1 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,11 +26,9 @@ """ import random -import threading -import weakref -from multiprocessing.pool import ThreadPool from apache_beam.internal import pickler +from apache_beam.internal import util from apache_beam.io import concat_source from apache_beam.io import fileio from apache_beam.io import iobase @@ -158,16 +156,9 @@ def _estimate_sizes_of_files(file_names, pattern=None): return [fileio.ChannelFactory.size_in_bytes(file_names[0])] else: if pattern is None: - # ThreadPool crashes in old versions of Python (< 2.7.5) if created - # from a child thread. 
(http://bugs.python.org/issue10015) - if not hasattr(threading.current_thread(), '_children'): - threading.current_thread()._children = weakref.WeakKeyDictionary() - pool = ThreadPool( - min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names))) - try: - return pool.map(fileio.ChannelFactory.size_in_bytes, file_names) - finally: - pool.terminate() + return util.run_using_threadpool( + fileio.ChannelFactory.size_in_bytes, file_names, + MAX_NUM_THREADS_FOR_SIZE_ESTIMATION) else: file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern, file_names) diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f67dca9c3168..97cf387589d3 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -22,16 +22,14 @@ import cStringIO import glob import logging -from multiprocessing.pool import ThreadPool import os import re import shutil -import threading import time import zlib -import weakref from apache_beam import coders +from apache_beam.internal import util from apache_beam.io import gcsio from apache_beam.io import iobase from apache_beam.transforms.display import DisplayDataItem @@ -663,11 +661,8 @@ def _rename_batch(batch): logging.debug('Rename successful: %s -> %s', src, dest) return exceptions - # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a - # child thread. (http://bugs.python.org/issue10015) - if not hasattr(threading.current_thread(), '_children'): - threading.current_thread()._children = weakref.WeakKeyDictionary() - exception_batches = ThreadPool(num_threads).map(_rename_batch, batches) + exception_batches = util.run_using_threadpool( + _rename_batch, batches, num_threads) all_exceptions = [] for exceptions in exception_batches: