From f4b80ea47fc3a3d4c1ba901e646c47981483eabd Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Wed, 10 May 2017 01:44:56 -0700 Subject: [PATCH] Renames some python classes and functions that were unnecessarily public. Adds a note to documentation of classes that are public but should be only used internally by the SDK (non-user facing classes). Marks some of the modules as experimental. --- sdks/python/apache_beam/coders/coder_impl.py | 57 +++++++++++---- sdks/python/apache_beam/coders/observable.py | 4 +- sdks/python/apache_beam/coders/slow_stream.py | 16 +++-- sdks/python/apache_beam/coders/typecoders.py | 4 +- sdks/python/apache_beam/internal/gcp/auth.py | 18 +++-- .../apache_beam/internal/gcp/json_value.py | 12 +++- sdks/python/apache_beam/internal/pickler.py | 26 ++++--- sdks/python/apache_beam/internal/util.py | 16 +++-- sdks/python/apache_beam/io/concat_source.py | 8 ++- sdks/python/apache_beam/metrics/cells.py | 32 ++++++--- sdks/python/apache_beam/metrics/execution.py | 7 +- .../runners/api/beam_fn_api_pb2.py | 2 + .../runners/api/beam_fn_api_pb2_grpc.py | 2 + .../runners/api/beam_runner_api_pb2.py | 2 + sdks/python/apache_beam/runners/common.pxd | 4 +- sdks/python/apache_beam/runners/common.py | 28 +++++--- .../runners/dataflow/internal/apiclient.py | 4 +- .../runners/dataflow/internal/dependency.py | 16 +++-- .../runners/dataflow/internal/names.py | 8 ++- .../runners/dataflow/native_io/iobase.py | 2 + .../runners/direct/bundle_factory.py | 16 +++-- .../apache_beam/runners/direct/clock.py | 5 +- .../consumer_tracking_pipeline_visitor.py | 4 +- .../apache_beam/runners/direct/executor.py | 63 ++++++++-------- .../runners/direct/transform_evaluator.py | 8 ++- .../runners/direct/transform_result.py | 4 +- .../runners/direct/watermark_manager.py | 10 +-- .../apache_beam/runners/pipeline_context.py | 4 +- .../runners/portability/fn_api_runner.py | 2 + .../portability/maptask_executor_runner.py | 2 + sdks/python/apache_beam/runners/runner.py | 8 ++- .../apache_beam/runners/worker/data_plane.py | 2 + .../apache_beam/runners/worker/log_handler.py | 2 + .../apache_beam/runners/worker/logger.py | 2 + .../apache_beam/runners/worker/opcounters.py | 2 + .../runners/worker/operation_specs.py | 2 + .../apache_beam/runners/worker/sdk_worker.py | 4 ++ .../runners/worker/sdk_worker_main.py | 2 + .../apache_beam/runners/worker/sideinputs.py | 2 + .../runners/worker/statesampler_fake.py | 2 + sdks/python/apache_beam/transforms/core.py | 12 +++- .../apache_beam/transforms/ptransform.py | 14 ++-- sdks/python/apache_beam/transforms/trigger.py | 72 +++++++++---------- 43 files changed, 346 insertions(+), 166 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index d56606d3962d..a0496a2bbfb5 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -50,6 +50,7 @@ class CoderImpl(object): + """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, stream, nested): """Reads object from potentially-nested encoding in stream.""" @@ -97,7 +98,9 @@ def get_estimated_size_and_observables(self, value, nested=False): class SimpleCoderImpl(CoderImpl): - """Subclass of CoderImpl implementing stream methods using encode/decode.""" + """For internal use only; no backwards-compatibility guarantees. 
+ + Subclass of CoderImpl implementing stream methods using encode/decode.""" def encode_to_stream(self, value, stream, nested): """Reads object from potentially-nested encoding in stream.""" @@ -109,7 +112,9 @@ def decode_from_stream(self, stream, nested): class StreamCoderImpl(CoderImpl): - """Subclass of CoderImpl implementing encode/decode using stream methods.""" + """For internal use only; no backwards-compatibility guarantees. + + Subclass of CoderImpl implementing encode/decode using stream methods.""" def encode(self, value): out = create_OutputStream() @@ -127,7 +132,9 @@ def estimate_size(self, value, nested=False): class CallbackCoderImpl(CoderImpl): - """A CoderImpl that calls back to the _impl methods on the Coder itself. + """For internal use only; no backwards-compatibility guarantees. + + A CoderImpl that calls back to the _impl methods on the Coder itself. This is the default implementation used if Coder._get_impl() is not overwritten. @@ -166,6 +173,7 @@ def get_estimated_size_and_observables(self, value, nested=False): class DeterministicFastPrimitivesCoderImpl(CoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" def __init__(self, coder, step_label): self._underlying_coder = coder @@ -208,6 +216,7 @@ def get_estimated_size_and_observables(self, value, nested=False): class ProtoCoderImpl(SimpleCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" def __init__(self, proto_message_type): self.proto_message_type = proto_message_type @@ -235,6 +244,7 @@ def decode(self, encoded): class FastPrimitivesCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" def __init__(self, fallback_coder_impl): self.fallback_coder_impl = fallback_coder_impl @@ -319,7 +329,9 @@ def decode_from_stream(self, stream, nested): class BytesCoderImpl(CoderImpl): - """A coder for bytes/str objects.""" + """For internal use only; no backwards-compatibility guarantees. + + A coder for bytes/str objects.""" def encode_to_stream(self, value, out, nested): out.write(value, nested) @@ -336,6 +348,7 @@ def decode(self, encoded): class FloatCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): out.write_bigendian_double(value) @@ -349,6 +362,8 @@ def estimate_size(self, unused_value, nested=False): class IntervalWindowCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + # TODO: Fn Harness only supports millis. Is this important enough to fix? def _to_normal_time(self, value): """Convert "lexicographically ordered unsigned" to signed.""" @@ -379,6 +394,8 @@ def estimate_size(self, value, nested=False): class TimestampCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + def encode_to_stream(self, value, out, nested): out.write_bigendian_int64(value.micros) @@ -395,7 +412,9 @@ def estimate_size(self, unused_value, nested=False): class VarIntCoderImpl(StreamCoderImpl): - """A coder for long/int objects.""" + """For internal use only; no backwards-compatibility guarantees. + + A coder for long/int objects.""" def encode_to_stream(self, value, out, nested): out.write_var_int64(value) @@ -422,7 +441,9 @@ def estimate_size(self, value, nested=False): class SingletonCoderImpl(CoderImpl): - """A coder that always encodes exactly one value.""" + """For internal use only; no backwards-compatibility guarantees. 
+ + A coder that always encodes exactly one value.""" def __init__(self, value): self._value = value @@ -445,7 +466,9 @@ def estimate_size(self, value, nested=False): class AbstractComponentCoderImpl(StreamCoderImpl): - """CoderImpl for coders that are comprised of several component coders.""" + """For internal use only; no backwards-compatibility guarantees. + + CoderImpl for coders that are comprised of several component coders.""" def __init__(self, coder_impls): for c in coder_impls: @@ -507,7 +530,9 @@ def _construct_from_components(self, components): class SequenceCoderImpl(StreamCoderImpl): - """A coder for sequences. + """For internal use only; no backwards-compatibility guarantees. + + A coder for sequences. If the length of the sequence in known we encode the length as a 32 bit ``int`` followed by the encoded bytes. @@ -611,21 +636,27 @@ def get_estimated_size_and_observables(self, value, nested=False): class TupleSequenceCoderImpl(SequenceCoderImpl): - """A coder for homogeneous tuple objects.""" + """For internal use only; no backwards-compatibility guarantees. + + A coder for homogeneous tuple objects.""" def _construct_from_sequence(self, components): return tuple(components) class IterableCoderImpl(SequenceCoderImpl): - """A coder for homogeneous iterable objects.""" + """For internal use only; no backwards-compatibility guarantees. + + A coder for homogeneous iterable objects.""" def _construct_from_sequence(self, components): return components class WindowedValueCoderImpl(StreamCoderImpl): - """A coder for windowed values.""" + """For internal use only; no backwards-compatibility guarantees. + + A coder for windowed values.""" # Ensure that lexicographic ordering of the bytes corresponds to # chronological order of timestamps. @@ -713,7 +744,9 @@ def get_estimated_size_and_observables(self, value, nested=False): class LengthPrefixCoderImpl(StreamCoderImpl): - """Coder which prefixes the length of the encoded object in the stream.""" + """For internal use only; no backwards-compatibility guarantees. + + Coder which prefixes the length of the encoded object in the stream.""" def __init__(self, value_coder): self._value_coder = value_coder diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py index f344b5d0d38e..5a808d8ad38c 100644 --- a/sdks/python/apache_beam/coders/observable.py +++ b/sdks/python/apache_beam/coders/observable.py @@ -20,7 +20,9 @@ class ObservableMixin(object): - """An observable iterable. + """For internal use only; no backwards-compatibility guarantees. + + An observable iterable. Subclasses need to call self.notify_observers with any object yielded. """ diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index a87495c94bfe..85837bcfa002 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -21,7 +21,9 @@ class OutputStream(object): - """A pure Python implementation of stream.OutputStream.""" + """For internal use only; no backwards-compatibility guarantees. + + A pure Python implementation of stream.OutputStream.""" def __init__(self): self.data = [] @@ -69,7 +71,9 @@ def size(self): class ByteCountingOutputStream(OutputStream): - """A pure Python implementation of stream.ByteCountingOutputStream.""" + """For internal use only; no backwards-compatibility guarantees. 
+
+  A pure Python implementation of stream.ByteCountingOutputStream."""
 
   def __init__(self):
     # Note that we don't actually use any of the data initialized by our super.
@@ -96,7 +100,9 @@ def __str__(self):
 
 
 class InputStream(object):
-  """A pure Python implementation of stream.InputStream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A pure Python implementation of stream.InputStream."""
 
   def __init__(self, data):
     self.data = data
@@ -149,7 +155,9 @@ def read_bigendian_double(self):
 
 
 def get_varint_size(v):
-  """Returns the size of the given integer value when encode as a VarInt."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns the size of the given integer value when encoded as a VarInt."""
   if v < 0:
     v += 1 << 64
   if v <= 0:
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 1bd4312705f2..60832c9b130c 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -160,7 +160,9 @@ def verify_deterministic(self, key_coder, op_name, silent=True):
 
 
 class FirstOf(object):
-  """A class used to get the first matching coder from a list of coders."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A class used to get the first matching coder from a list of coders."""
 
   def __init__(self, coders):
     self._coders = coders
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 3c8dd64d9759..9f32092f4166 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -38,7 +38,9 @@
 
 
 def set_running_in_gce(worker_executing_project):
-  """Informs the authentication library that we are running in GCE.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Informs the authentication library that we are running in GCE.
 
   When we are running in GCE, we have the option of using the VM metadata
   credentials for authentication to Google services.
@@ -57,8 +59,10 @@ class AuthenticationException(retry.PermanentException):
   pass
 
 
-class GCEMetadataCredentials(OAuth2Credentials):
-  """Credential object initialized using access token from GCE VM metadata."""
+class _GCEMetadataCredentials(OAuth2Credentials):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Credential object initialized using access token from GCE VM metadata."""
 
   def __init__(self, user_agent=None):
     """Create an instance of GCEMetadataCredentials.
@@ -69,7 +73,7 @@ def __init__(self, user_agent=None):
     Args:
       user_agent: string, The HTTP User-Agent to provide for this application.
     """
-    super(GCEMetadataCredentials, self).__init__(
+    super(_GCEMetadataCredentials, self).__init__(
         None,  # access_token
         None,  # client_id
         None,  # client_secret
@@ -94,7 +98,9 @@ def _refresh(self, http_request):
 
 
 def get_service_credentials():
-  """Get credentials to access Google services."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Get credentials to access Google services."""
   user_agent = 'beam-python-sdk/1.0'
   if is_running_in_gce:
     # We are currently running as a GCE taskrunner worker.
     # TODO(ccy): It's not entirely clear if these credentials are thread-safe.
     # If so, we can cache these credentials to save the overhead of creating
     # them again.
-    return GCEMetadataCredentials(user_agent=user_agent)
+    return _GCEMetadataCredentials(user_agent=user_agent)
   else:
     client_scopes = [
         'https://www.googleapis.com/auth/bigquery',
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 523db9c7d804..59f8b60dd9d7 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -33,7 +33,9 @@
 
 
 def get_typed_value_descriptor(obj):
-  """Converts a basic type into a @type/value dictionary.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts a basic type into a @type/value dictionary.
 
   Args:
     obj: A basestring, bool, int, or float to be converted.
@@ -59,7 +61,9 @@ def get_typed_value_descriptor(obj):
 
 
 def to_json_value(obj, with_type=False):
-  """Converts Python objects into extra_types.JsonValue objects.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts Python objects into extra_types.JsonValue objects.
 
   Args:
     obj: Python object to be converted. Can be 'None'.
@@ -115,7 +119,9 @@ def to_json_value(obj, with_type=False):
 
 
 def from_json_value(v):
-  """Converts extra_types.JsonValue objects into Python objects.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts extra_types.JsonValue objects into Python objects.
 
   Args:
     v: JsonValue object to be converted.
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index ed429fe084be..4305379747b3 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -36,25 +36,25 @@
 import dill
 
 
-def is_nested_class(cls):
+def _is_nested_class(cls):
   """Returns true if argument is a class object that appears to be nested."""
   return (isinstance(cls, type)
           and cls.__module__ != '__builtin__'
           and cls.__name__ not in sys.modules[cls.__module__].__dict__)
 
 
-def find_containing_class(nested_class):
+def _find_containing_class(nested_class):
   """Finds containing class of a nested class passed as argument."""
 
-  def find_containing_class_inner(outer):
+  def _find_containing_class_inner(outer):
     for k, v in outer.__dict__.items():
       if v is nested_class:
         return outer, k
       elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
-        res = find_containing_class_inner(v)
+        res = _find_containing_class_inner(v)
         if res: return res
 
-  return find_containing_class_inner(sys.modules[nested_class.__module__])
+  return _find_containing_class_inner(sys.modules[nested_class.__module__])
 
 
 def _nested_type_wrapper(fun):
@@ -76,8 +76,8 @@ def wrapper(pickler, obj):
     # do anything special because the pickler itself will save the constituent
     # parts of the type (i.e., name, base classes, dictionary) and then
     # recreate it during unpickling.
-    if is_nested_class(obj) and obj.__module__ != '__main__':
-      containing_class_and_name = find_containing_class(obj)
+    if _is_nested_class(obj) and obj.__module__ != '__main__':
+      containing_class_and_name = _find_containing_class(obj)
       if containing_class_and_name is not None:
         return pickler.save_reduce(
             getattr, containing_class_and_name, obj=obj)
@@ -108,11 +108,11 @@ def wrapper(pickler, obj):
 # Dill pickles generator objects without complaint, but unpickling produces
 # TypeError: object.__new__(generator) is not safe, use generator.__new__()
 # on some versions of Python.
-def reject_generators(unused_pickler, unused_obj): +def _reject_generators(unused_pickler, unused_obj): raise TypeError("can't (safely) pickle generator objects") -dill.dill.Pickler.dispatch[types.GeneratorType] = reject_generators +dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators # This if guards against dill not being full initialized when generating docs. @@ -185,6 +185,8 @@ def new_log_info(msg, *args, **kwargs): # pickler.loads() being used for data, which results in an unnecessary base64 # encoding. This should be cleaned up. def dumps(o, enable_trace=True): + """For internal use only; no backwards-compatibility guarantees.""" + try: s = dill.dumps(o) except Exception: # pylint: disable=broad-except @@ -206,6 +208,8 @@ def dumps(o, enable_trace=True): def loads(encoded, enable_trace=True): + """For internal use only; no backwards-compatibility guarantees.""" + c = base64.b64decode(encoded) s = zlib.decompress(c) @@ -224,7 +228,9 @@ def loads(encoded, enable_trace=True): def dump_session(file_path): - """Pickle the current python session to be used in the worker. + """For internal use only; no backwards-compatibility guarantees. + + Pickle the current python session to be used in the worker. Note: Due to the inconsistency in the first dump of dill dump_session we create and load the dump twice to have consistent results in the worker and diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 5b31e88ee271..df4878c31382 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -24,7 +24,9 @@ class ArgumentPlaceholder(object): - """A place holder object replacing PValues in argument lists. + """For internal use only; no backwards-compatibility guarantees. + + A place holder object replacing PValues in argument lists. A Fn object can take any number of "side inputs", which are PValues that will be evaluated during pipeline execution and will be provided to the function @@ -48,7 +50,9 @@ def __eq__(self, other): def remove_objects_from_args(args, kwargs, pvalue_classes): - """Replaces all objects of a given type in args/kwargs with a placeholder. + """For internal use only; no backwards-compatibility guarantees. + + Replaces all objects of a given type in args/kwargs with a placeholder. Args: args: A list of positional arguments. @@ -77,7 +81,9 @@ def swapper(value): def insert_values_in_args(args, kwargs, values): - """Replaces all placeholders in args/kwargs with actual values. + """For internal use only; no backwards-compatibility guarantees. + + Replaces all placeholders in args/kwargs with actual values. Args: args: A list of positional arguments. @@ -100,7 +106,9 @@ def insert_values_in_args(args, kwargs, values): def run_using_threadpool(fn_to_execute, inputs, pool_size): - """Runs the given function on given inputs using a thread pool. + """For internal use only; no backwards-compatibility guarantees. + + Runs the given function on given inputs using a thread pool. Args: fn_to_execute: Function to execute diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py index 1656180f0dfd..dfd16953bfa9 100644 --- a/sdks/python/apache_beam/io/concat_source.py +++ b/sdks/python/apache_beam/io/concat_source.py @@ -25,7 +25,9 @@ class ConcatSource(iobase.BoundedSource): - """A ``BoundedSource`` that can group a set of ``BoundedSources``. + """For internal use only; no backwards-compatibility guarantees. 
+ + A ``BoundedSource`` that can group a set of ``BoundedSources``. Primarily for internal use, use the ``apache_beam.Flatten`` transform to create the union of several reads. @@ -89,7 +91,9 @@ def default_output_coder(self): class ConcatRangeTracker(iobase.RangeTracker): - """Range tracker for ConcatSource""" + """For internal use only; no backwards-compatibility guarantees. + + Range tracker for ConcatSource""" def __init__(self, start, end, source_bundles): """Initializes ``ConcatRangeTracker`` diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 41d24beda6dc..fbe3ad3efcd5 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -31,7 +31,9 @@ class CellCommitState(object): - """Atomically tracks a cell's dirty/clean commit status. + """For internal use only; no backwards-compatibility guarantees. + + Atomically tracks a cell's dirty/clean commit status. Reporting a metric update works in a two-step process: First, updates to the metric are received, and the metric is marked as 'dirty'. Later, updates are @@ -102,7 +104,9 @@ def before_commit(self): class MetricCell(object): - """Accumulates in-memory changes to a metric. + """For internal use only; no backwards-compatibility guarantees. + + Accumulates in-memory changes to a metric. A MetricCell represents a specific metric in a single context and bundle. All subclasses must be thread safe, as these are used in the pipeline runners, @@ -118,7 +122,9 @@ def get_cumulative(self): class CounterCell(Counter, MetricCell): - """Tracks the current value and delta of a counter metric. + """For internal use only; no backwards-compatibility guarantees. + + Tracks the current value and delta of a counter metric. Each cell tracks the state of a metric independently per context per bundle. Therefore, each metric has a different cell in each bundle, cells are @@ -146,7 +152,9 @@ def get_cumulative(self): class DistributionCell(Distribution, MetricCell): - """Tracks the current value and delta for a distribution metric. + """For internal use only; no backwards-compatibility guarantees. + + Tracks the current value and delta for a distribution metric. Each cell tracks the state of a metric independently per context per bundle. Therefore, each metric has a different cell in each bundle, that is later @@ -228,7 +236,9 @@ def mean(self): class DistributionData(object): - """The data structure that holds data about a distribution metric. + """For internal use only; no backwards-compatibility guarantees. + + The data structure that holds data about a distribution metric. Distribution metrics are restricted to distributions of integers only. @@ -280,7 +290,9 @@ def singleton(cls, value): class MetricAggregator(object): - """Base interface for aggregating metric data during pipeline execution.""" + """For internal use only; no backwards-compatibility guarantees. + + Base interface for aggregating metric data during pipeline execution.""" def zero(self): raise NotImplementedError @@ -292,7 +304,9 @@ def result(self, x): class CounterAggregator(MetricAggregator): - """Aggregator for Counter metric data during pipeline execution. + """For internal use only; no backwards-compatibility guarantees. + + Aggregator for Counter metric data during pipeline execution. Values aggregated should be ``int`` objects. """ @@ -307,7 +321,9 @@ def result(self, x): class DistributionAggregator(MetricAggregator): - """Aggregator for Distribution metric data during pipeline execution. 
+ """For internal use only; no backwards-compatibility guarantees. + + Aggregator for Distribution metric data during pipeline execution. Values aggregated should be ``DistributionData`` objects. """ diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index dbd05330d1fd..a06ec0c2f6af 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -16,7 +16,7 @@ # """ -Internal classes for Metrics API. +This module is for internal use only; no backwards-compatibility guarantees. The classes in this file keep shared state, and organize metrics information. @@ -36,7 +36,9 @@ class MetricKey(object): - """Key used to identify instance of metric cell. + """ + + Key used to identify instance of metric cell. Metrics are internally keyed by the step name they associated with and the name of the metric. @@ -193,6 +195,7 @@ def get_cumulative(self): class ScopedMetricsContainer(object): + def __init__(self, container=None): self._stack = MetricsEnvironment.container_stack() self._container = container diff --git a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py index 769b13cfeb7d..cb0b72b7d1fc 100644 --- a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py +++ b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py @@ -33,6 +33,8 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 +# This module is experimental. No backwards-compatibility guarantees. + DESCRIPTOR = _descriptor.FileDescriptor( name='beam_fn_api.proto', diff --git a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py index 7fbe4c10d1ba..08d7dada14a6 100644 --- a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py +++ b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py @@ -22,6 +22,8 @@ import beam_fn_api_pb2 as beam__fn__api__pb2 +# This module is experimental. No backwards-compatibility guarantees. + class BeamFnControlStub(object): """ diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py index d2006d71c9c2..e8793b63da51 100644 --- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py +++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py @@ -33,6 +33,8 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 +# This module is experimental. No backwards-compatibility guarantees. 
+ DESCRIPTOR = _descriptor.FileDescriptor( name='beam_runner_api.proto', diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 53f7aa8cc962..dcfac2e1b2bd 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -43,7 +43,7 @@ cdef class DoFnSignature(object): cdef class DoFnInvoker(object): cdef public DoFnSignature signature - cdef OutputProcessor output_processor + cdef _OutputProcessor output_processor cpdef invoke_process(self, WindowedValue windowed_value) cpdef invoke_start_bundle(self) @@ -77,7 +77,7 @@ cdef class DoFnRunner(Receiver): cpdef process(self, WindowedValue windowed_value) -cdef class OutputProcessor(object): +cdef class _OutputProcessor(object): cdef object window_fn cdef Receiver main_receivers cdef object tagged_receivers diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index ec1f5dc47c77..0aef0a1791a0 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -32,6 +32,7 @@ class LoggingContext(object): + """For internal use only; no backwards-compatibility guarantees.""" def enter(self): pass @@ -41,7 +42,9 @@ def exit(self): class Receiver(object): - """An object that consumes a WindowedValue. + """For internal use only; no backwards-compatibility guarantees. + + An object that consumes a WindowedValue. This class can be efficiently used to pass values between the sdk and worker harnesses. @@ -52,7 +55,9 @@ def receive(self, windowed_value): class DoFnMethodWrapper(object): - """Represents a method of a DoFn object.""" + """For internal use only; no backwards-compatibility guarantees. + + Represents a method of a DoFn object.""" def __init__(self, do_fn, method_name): """ @@ -299,7 +304,9 @@ def _invoke_per_window(self, windowed_value): class DoFnRunner(Receiver): - """A helper class for executing ParDo operations. + """For internal use only; no backwards-compatibility guarantees. + + A helper class for executing ParDo operations. """ def __init__(self, @@ -361,7 +368,7 @@ def __init__(self, # Optimize for the common case. main_receivers = as_receiver(tagged_receivers[None]) - output_processor = OutputProcessor( + output_processor = _OutputProcessor( windowing.windowfn, main_receivers, tagged_receivers) self.do_fn_invoker = DoFnInvoker.create_invoker( @@ -411,11 +418,11 @@ def _reraise_augmented(self, exn): raise -class OutputProcessor(object): +class _OutputProcessor(object): """Processes output produced by DoFn method invocations.""" def __init__(self, window_fn, main_receivers, tagged_receivers): - """Initializes ``OutputProcessor``. + """Initializes ``_OutputProcessor``. Args: window_fn: a windowing function (WindowFn). @@ -497,7 +504,7 @@ def finish_bundle_outputs(self, results): self.tagged_receivers[tag].output(windowed_value) -class NoContext(WindowFn.AssignContext): +class _NoContext(WindowFn.AssignContext): """An uninspectable WindowFn.AssignContext.""" NO_VALUE = object() @@ -518,7 +525,9 @@ def existing_windows(self): class DoFnState(object): - """Keeps track of state that DoFns want, currently, user counters. + """For internal use only; no backwards-compatibility guarantees. + + Keeps track of state that DoFns want, currently, user counters. """ def __init__(self, counter_factory): @@ -533,6 +542,7 @@ def counter_for(self, aggregator): # TODO(robertwb): Replace core.DoFnContext with this. 
class DoFnContext(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, label, element=None, state=None):
     self.label = label
@@ -597,6 +607,8 @@ def receive(self, windowed_value):
 
 
 def as_receiver(maybe_receiver):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   if isinstance(maybe_receiver, Receiver):
     return maybe_receiver
   return _ReceiverAdapter(maybe_receiver)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 220ceb8ed325..ea4959322f1e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-"""Dataflow client utility functions."""
+"""For internal use only; no backwards-compatibility guarantees.
+
+Dataflow client utility functions."""
 
 import codecs
 import getpass
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 63e593ce58d1..892d9f94d1ee 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -244,7 +244,9 @@ def _populate_requirements_cache(requirements_file, cache_dir):
 def stage_job_resources(
     options, file_copy=_dependency_file_copy, build_setup_args=None,
     temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
-  """Creates (if needed) and stages job resources to options.staging_location.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Creates (if needed) and stages job resources to options.staging_location.
 
   Args:
     options: Command line options. More specifically the function will expect
@@ -467,7 +469,9 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
 
 
 def get_required_container_version():
-  """Returns the Google Cloud Dataflow container version for remote execution.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns the Google Cloud Dataflow container version for remote execution.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -487,7 +491,9 @@ def get_required_container_version():
 
 
 def get_sdk_name_and_version():
-  """Returns name and version of SDK reported to Google Cloud Dataflow."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns name and version of SDK reported to Google Cloud Dataflow."""
   # TODO(ccy): Make this check cleaner.
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
@@ -496,7 +502,9 @@ def get_sdk_name_and_version():
 
 
 def get_sdk_package_name():
-  """Returns the PyPI package name to be staged to Google Cloud Dataflow."""
+  """For internal use only; no backwards-compatibility guarantees.
+ + Returns the PyPI package name to be staged to Google Cloud Dataflow.""" container_version = get_required_container_version() if container_version == BEAM_CONTAINER_VERSION: return BEAM_PACKAGE_NAME diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 182f27e5af97..be6722427b6c 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -29,7 +29,9 @@ class TransformNames(object): - """Transform strings as they are expected in the CloudWorkflow protos.""" + """For internal use only; no backwards-compatibility guarantees. + + Transform strings as they are expected in the CloudWorkflow protos.""" COLLECTION_TO_SINGLETON = 'CollectionToSingleton' COMBINE = 'CombineValues' CREATE_PCOLLECTION = 'CreateCollection' @@ -41,7 +43,9 @@ class TransformNames(object): class PropertyNames(object): - """Property strings as they are expected in the CloudWorkflow protos.""" + """For internal use only; no backwards-compatibility guarantees. + + Property strings as they are expected in the CloudWorkflow protos.""" BIGQUERY_CREATE_DISPOSITION = 'create_disposition' BIGQUERY_DATASET = 'dataset' BIGQUERY_QUERY = 'bigquery_query' diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 188b4af2579c..c1f4238178ac 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -16,6 +16,8 @@ # """Dataflow native sources and sinks. + +For internal use only; no backwards-compatibility guarantees. """ import logging diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py index 42c8095c6858..ed00b03310bb 100644 --- a/sdks/python/apache_beam/runners/direct/bundle_factory.py +++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py @@ -24,7 +24,9 @@ class BundleFactory(object): - """BundleFactory creates output bundles to be used by transform evaluators. + """For internal use only; no backwards-compatibility guarantees. + + BundleFactory creates output bundles to be used by transform evaluators. Args: stacked: whether or not to stack the WindowedValues within the bundle @@ -36,7 +38,7 @@ def __init__(self, stacked): self._stacked = stacked def create_bundle(self, output_pcollection): - return Bundle(output_pcollection, self._stacked) + return _Bundle(output_pcollection, self._stacked) def create_empty_committed_bundle(self, output_pcollection): bundle = self.create_bundle(output_pcollection) @@ -45,7 +47,7 @@ def create_empty_committed_bundle(self, output_pcollection): # a bundle represents a unit of work that will be processed by a transform. -class Bundle(object): +class _Bundle(object): """Part of a PCollection with output elements. Part of a PCollection. Elements are output to a bundle, which will cause them @@ -67,7 +69,7 @@ class Bundle(object): b = Bundle(stacked=False) """ - class StackedWindowedValues(object): + class _StackedWindowedValues(object): """A stack of WindowedValues with the same timestamp and windows. It must be initialized from a single WindowedValue. 
@@ -131,7 +133,7 @@ def get_elements_iterable(self, make_copy=False): def iterable_stacked_or_elements(elements): for e in elements: - if isinstance(e, Bundle.StackedWindowedValues): + if isinstance(e, _Bundle._StackedWindowedValues): for w in e.windowed_values(): yield w else: @@ -171,11 +173,11 @@ def add(self, element): return if (self._elements and (isinstance(self._elements[-1], (WindowedValue, - Bundle.StackedWindowedValues))) and + _Bundle._StackedWindowedValues))) and self._elements[-1].timestamp == element.timestamp and self._elements[-1].windows == element.windows): if isinstance(self._elements[-1], WindowedValue): - self._elements[-1] = Bundle.StackedWindowedValues(self._elements[-1]) + self._elements[-1] = _Bundle._StackedWindowedValues(self._elements[-1]) self._elements[-1].add_value(element.value) else: self._elements.append(element) diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py index dd1800a1bb85..84d52f79948b 100644 --- a/sdks/python/apache_beam/runners/direct/clock.py +++ b/sdks/python/apache_beam/runners/direct/clock.py @@ -23,6 +23,7 @@ class Clock(object): + """For internal use only; no backwards-compatibility guarantees.""" def time(self): """Returns the number of milliseconds since epoch.""" @@ -30,7 +31,9 @@ def time(self): class MockClock(Clock): - """Mock clock implementation for testing.""" + """For internal use only; no backwards-compatibility guarantees. + + Mock clock implementation for testing.""" def __init__(self, now_in_ms): self._now_in_ms = now_in_ms diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py index cdfadb704cf5..d625d3ce5cee 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py @@ -24,7 +24,9 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor): - """Visitor for extracting value-consumer relations from the graph. + """For internal use only; no backwards-compatibility guarantees. + + Visitor for extracting value-consumer relations from the graph. Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This is used to schedule consuming PTransforms to consume input after the upstream diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 0aa1aec42bd7..9efbede47e9b 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -30,7 +30,7 @@ from apache_beam.metrics.execution import ScopedMetricsContainer -class ExecutorService(object): +class _ExecutorService(object): """Thread pool for executing tasks in parallel.""" class CallableTask(object): @@ -42,14 +42,14 @@ def call(self): def name(self): return None - class ExecutorServiceWorker(threading.Thread): + class _ExecutorServiceWorker(threading.Thread): """Worker thread for executing a single task at a time.""" # Amount to block waiting for getting an item from the queue in seconds. 
TIMEOUT = 5 def __init__(self, queue, index): - super(ExecutorService.ExecutorServiceWorker, self).__init__() + super(_ExecutorService._ExecutorServiceWorker, self).__init__() self.queue = queue self._index = index self._default_name = 'ExecutorServiceWorker-' + str(index) @@ -70,7 +70,7 @@ def _get_task_or_none(self): # Do not block indefinitely, otherwise we may not act for a requested # shutdown. return self.queue.get( - timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT) + timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT) except Queue.Empty: return None @@ -92,12 +92,12 @@ def shutdown(self): def __init__(self, num_workers): self.queue = Queue.Queue() - self.workers = [ExecutorService.ExecutorServiceWorker( + self.workers = [_ExecutorService._ExecutorServiceWorker( self.queue, i) for i in range(num_workers)] self.shutdown_requested = False def submit(self, task): - assert isinstance(task, ExecutorService.CallableTask) + assert isinstance(task, _ExecutorService.CallableTask) if not self.shutdown_requested: self.queue.put(task) @@ -122,7 +122,7 @@ def shutdown(self): # last task). -class TransformEvaluationState(object): +class _TransformEvaluationState(object): def __init__(self, executor_service, scheduled): self.executor_service = executor_service @@ -136,7 +136,7 @@ def complete(self, completed_work): self.scheduled.remove(completed_work) -class ParallelEvaluationState(TransformEvaluationState): +class _ParallelEvaluationState(_TransformEvaluationState): """A TransformEvaluationState with unlimited parallelism. Any TransformExecutor scheduled will be immediately submitted to the @@ -148,7 +148,7 @@ class ParallelEvaluationState(TransformEvaluationState): pass -class SerialEvaluationState(TransformEvaluationState): +class _SerialEvaluationState(_TransformEvaluationState): """A TransformEvaluationState with a single work queue. Any TransformExecutor scheduled will be placed on the work queue. Only one @@ -159,14 +159,14 @@ class SerialEvaluationState(TransformEvaluationState): """ def __init__(self, executor_service, scheduled): - super(SerialEvaluationState, self).__init__(executor_service, scheduled) + super(_SerialEvaluationState, self).__init__(executor_service, scheduled) self.serial_queue = collections.deque() self.currently_evaluating = None self._lock = threading.Lock() def complete(self, completed_work): self._update_currently_evaluating(None, completed_work) - super(SerialEvaluationState, self).complete(completed_work) + super(_SerialEvaluationState, self).complete(completed_work) def schedule(self, new_work): self._update_currently_evaluating(new_work, None) @@ -181,10 +181,10 @@ def _update_currently_evaluating(self, new_work, completed_work): if self.serial_queue and not self.currently_evaluating: next_work = self.serial_queue.pop() self.currently_evaluating = next_work - super(SerialEvaluationState, self).schedule(next_work) + super(_SerialEvaluationState, self).schedule(next_work) -class TransformExecutorServices(object): +class _TransformExecutorServices(object): """Schedules and completes TransformExecutors. 
Controls the concurrency as appropriate for the applied transform the executor @@ -194,7 +194,7 @@ class TransformExecutorServices(object): def __init__(self, executor_service): self._executor_service = executor_service self._scheduled = set() - self._parallel = ParallelEvaluationState( + self._parallel = _ParallelEvaluationState( self._executor_service, self._scheduled) self._serial_cache = WeakValueDictionary() @@ -204,7 +204,7 @@ def parallel(self): def serial(self, step): cached = self._serial_cache.get(step) if not cached: - cached = SerialEvaluationState(self._executor_service, self._scheduled) + cached = _SerialEvaluationState(self._executor_service, self._scheduled) self._serial_cache[step] = cached return cached @@ -230,17 +230,19 @@ def handle_result(self, input_committed_bundle, transform_result): output_committed_bundles = self._evaluation_context.handle_result( input_committed_bundle, self._timers, transform_result) for output_committed_bundle in output_committed_bundles: - self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate( + self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate( output_committed_bundle, None)) return output_committed_bundles def handle_exception(self, exception): self._all_updates.offer( - _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception)) + _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception)) -class TransformExecutor(ExecutorService.CallableTask): - """TransformExecutor will evaluate a bundle using an applied ptransform. +class TransformExecutor(_ExecutorService.CallableTask): + """For internal use only; no backwards-compatibility guarantees. + + TransformExecutor will evaluate a bundle using an applied ptransform. A CallableTask responsible for constructing a TransformEvaluator and evaluating it on some bundle of input, and registering the result using the @@ -316,6 +318,7 @@ def call(self): class Executor(object): + """For internal use only; no backwards-compatibility guarantees.""" def __init__(self, *args, **kwargs): self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs) @@ -334,17 +337,17 @@ class _ExecutorServiceParallelExecutor(object): def __init__(self, value_to_consumers, transform_evaluator_registry, evaluation_context): - self.executor_service = ExecutorService( + self.executor_service = _ExecutorService( _ExecutorServiceParallelExecutor.NUM_WORKERS) - self.transform_executor_services = TransformExecutorServices( + self.transform_executor_services = _TransformExecutorServices( self.executor_service) self.value_to_consumers = value_to_consumers self.transform_evaluator_registry = transform_evaluator_registry self.evaluation_context = evaluation_context self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue( - _ExecutorServiceParallelExecutor.ExecutorUpdate) + _ExecutorServiceParallelExecutor._ExecutorUpdate) self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate) + _ExecutorServiceParallelExecutor._VisibleExecutorUpdate) self.default_completion_callback = _CompletionCallback( evaluation_context, self.all_updates) @@ -412,7 +415,7 @@ def offer(self, item): assert isinstance(item, self._item_type) self._queue.put_nowait(item) - class ExecutorUpdate(object): + class _ExecutorUpdate(object): """An internal status update on the state of the executor.""" def __init__(self, produced_bundle=None, exception=None): @@ -425,7 +428,7 @@ def __init__(self, produced_bundle=None, 
exception=None): # Not the right exception. self.exc_info = (exception, None, None) - class VisibleExecutorUpdate(object): + class _VisibleExecutorUpdate(object): """An update of interest to the user. Used for awaiting the completion to decide whether to return normally or @@ -437,7 +440,7 @@ def __init__(self, exc_info=(None, None, None)): self.exception = exc_info[1] or exc_info[0] self.exc_info = exc_info - class _MonitorTask(ExecutorService.CallableTask): + class _MonitorTask(_ExecutorService.CallableTask): """MonitorTask continuously runs to ensure that pipeline makes progress.""" def __init__(self, executor): @@ -458,7 +461,7 @@ def call(self): logging.warning('A task failed with exception.\n %s', update.exception) self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( + _ExecutorServiceParallelExecutor._VisibleExecutorUpdate( update.exc_info)) update = self._executor.all_updates.poll() self._executor.evaluation_context.schedule_pending_unblocked_tasks( @@ -467,7 +470,7 @@ def call(self): except Exception as e: # pylint: disable=broad-except logging.error('Monitor task died due to exception.\n %s', e) self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( + _ExecutorServiceParallelExecutor._VisibleExecutorUpdate( sys.exc_info())) finally: if not self._should_shutdown(): @@ -497,11 +500,11 @@ def _should_shutdown(self): else: if self._executor.evaluation_context.is_done(): self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate()) + _ExecutorServiceParallelExecutor._VisibleExecutorUpdate()) else: # Nothing is scheduled for execution, but watermarks incomplete. self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( + _ExecutorServiceParallelExecutor._VisibleExecutorUpdate( (Exception('Monitor task detected a pipeline stall.'), None, None))) diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index f6cdd414eb48..6984ded7cc3f 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -41,7 +41,9 @@ class TransformEvaluatorRegistry(object): - """Creates instances of TransformEvaluator for the application of a transform. + """For internal use only; no backwards-compatibility guarantees. + + Creates instances of TransformEvaluator for the application of a transform. 
""" def __init__(self, evaluation_context): @@ -253,7 +255,7 @@ class NullReceiver(object): def output(self, element): pass - class InMemoryReceiver(object): + class _InMemoryReceiver(object): """Buffers undeclared outputs to the given dictionary.""" def __init__(self, target, tag): @@ -267,7 +269,7 @@ def __missing__(self, key): if self._evaluation_context.has_cache: if not self._undeclared_in_memory_tag_values: self._undeclared_in_memory_tag_values = collections.defaultdict(list) - receiver = _TaggedReceivers.InMemoryReceiver( + receiver = _TaggedReceivers._InMemoryReceiver( self._undeclared_in_memory_tag_values, key) else: if not self._null_receiver: diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py index 8ae0aeab24b2..febdd202aa0a 100644 --- a/sdks/python/apache_beam/runners/direct/transform_result.py +++ b/sdks/python/apache_beam/runners/direct/transform_result.py @@ -21,7 +21,9 @@ class TransformResult(object): - """The result of evaluating an AppliedPTransform with a TransformEvaluator.""" + """For internal use only; no backwards-compatibility guarantees. + + The result of evaluating an AppliedPTransform with a TransformEvaluator.""" def __init__(self, applied_ptransform, uncommitted_output_bundles, state, timer_update, counters, watermark_hold, diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 7793478a85a8..3a135397e12f 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -28,7 +28,9 @@ class WatermarkManager(object): - """Tracks and updates watermarks for all AppliedPTransforms.""" + """For internal use only; no backwards-compatibility guarantees. + + Tracks and updates watermarks for all AppliedPTransforms.""" WATERMARK_POS_INF = MAX_TIMESTAMP WATERMARK_NEG_INF = MIN_TIMESTAMP @@ -41,12 +43,12 @@ def __init__(self, clock, root_transforms, value_to_consumers): self._transform_to_watermarks = {} for root_transform in root_transforms: - self._transform_to_watermarks[root_transform] = TransformWatermarks( + self._transform_to_watermarks[root_transform] = _TransformWatermarks( self._clock) for consumers in value_to_consumers.values(): for consumer in consumers: - self._transform_to_watermarks[consumer] = TransformWatermarks( + self._transform_to_watermarks[consumer] = _TransformWatermarks( self._clock) for consumers in value_to_consumers.values(): @@ -139,7 +141,7 @@ def extract_fired_timers(self): return all_timers -class TransformWatermarks(object): +class _TransformWatermarks(object): """Tracks input and output watermarks for aan AppliedPTransform.""" def __init__(self, clock): diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 4f827749d0a4..d3d3c2468321 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -60,7 +60,9 @@ def get_by_id(self, id): class PipelineContext(object): - """Used for accessing and constructing the referenced objects of a Pipeline. + """For internal use only; no backwards-compatibility guarantees. + + Used for accessing and constructing the referenced objects of a Pipeline. 
""" _COMPONENT_TYPES = { diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 5802c17f4276..26355595e991 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -39,6 +39,8 @@ from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sdk_worker +# This module is experimental. No backwards-compatibility guarantees. + def streaming_rpc_handler(cls, method_name): """Un-inverts the flow of control between the runner and the sdk harness.""" diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py index 3e0878002581..077871ed50ca 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py @@ -44,6 +44,8 @@ from apache_beam.utils import profiler from apache_beam.utils.counters import CounterFactory +# This module is experimental. No backwards-compatibility guarantees. + class MapTaskExecutorRunner(PipelineRunner): """Beam runner translating a pipeline into map tasks that are then executed. diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index b35cb7b77f20..d875fdc11a45 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -56,7 +56,9 @@ def _get_runner_map(runner_names, module_path): def create_runner(runner_name): - """Creates a runner instance from a runner class name. + """For internal use only; no backwards-compatibility guarantees. + + Creates a runner instance from a runner class name. Args: runner_name: Name of the pipeline runner. Possible values are: @@ -170,7 +172,9 @@ def run_transform(self, transform_node): class PValueCache(object): - """Local cache for arbitrary information computed for PValue objects.""" + """For internal use only; no backwards-compatibility guarantees. + + Local cache for arbitrary information computed for PValue objects.""" def __init__(self, use_disk_backed_cache=False): # Cache of values computed while a runner executes a pipeline. This is a diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 6425447994f6..5edd0b490750 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -31,6 +31,8 @@ from apache_beam.runners.api import beam_fn_api_pb2 import grpc +# This module is experimental. No backwards-compatibility guarantees. + class ClosableOutputStream(type(coder_impl.create_OutputStream())): """A Outputstream for use with CoderImpls that has a close() method.""" diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index b9e36ad2c312..59ffbf4f45a2 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -24,6 +24,8 @@ from apache_beam.runners.api import beam_fn_api_pb2 import grpc +# This module is experimental. No backwards-compatibility guarantees. 
+ class FnApiLogRecordHandler(logging.Handler): """A handler that writes log records to the fn API.""" diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py index 217dc5835075..043353807a31 100644 --- a/sdks/python/apache_beam/runners/worker/logger.py +++ b/sdks/python/apache_beam/runners/worker/logger.py @@ -24,6 +24,8 @@ from apache_beam.runners.common import LoggingContext +# This module is experimental. No backwards-compatibility guarantees. + # Per-thread worker information. This is used only for logging to set # context information that changes while work items get executed: diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index 56ce0db6c8be..2bb15fa7ee4b 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -25,6 +25,8 @@ from apache_beam.utils.counters import Counter +# This module is experimental. No backwards-compatibility guarantees. + class SumAccumulator(object): """Accumulator for collecting byte counts.""" diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index 977e165afe7c..c03d9a25ec5f 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -25,6 +25,8 @@ from apache_beam import coders +# This module is experimental. No backwards-compatibility guarantees. + def build_worker_instruction(*args): """Create an object representing a ParallelInstruction protobuf. diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6907f6edc92d..596bb907b582 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -40,6 +40,10 @@ from apache_beam.runners.api import beam_fn_api_pb2 from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations + +# This module is experimental. No backwards-compatibility guarantees. + + try: from apache_beam.runners.worker import statesampler except ImportError: diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 28828c3b4925..b8917791b91e 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -28,6 +28,8 @@ from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler from apache_beam.runners.worker.sdk_worker import SdkHarness +# This module is experimental. No backwards-compatibility guarantees. + def main(unused_argv): """Main entry point for SDK Fn Harness.""" diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index 3bac3d9a2e4b..bdf9f4e71f5e 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -26,6 +26,8 @@ from apache_beam.io import iobase from apache_beam.transforms import window +# This module is experimental. No backwards-compatibility guarantees. + # Maximum number of reader threads for reading side input sources, per side # input. 
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index efd7f2db8fcd..88ace8c5ae89 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class StateSampler(object):
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index e37a387e6d18..d42115c5c23f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -227,7 +227,9 @@ def _fn_takes_side_inputs(fn):
 
 
 class CallableWrapperDoFn(DoFn):
-  """A DoFn (function) object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A DoFn (function) object wrapping a callable object.
 
   The purpose of this class is to conveniently wrap simple functions and use
   them in transforms.
@@ -410,7 +412,9 @@ def maybe_from_callable(fn):
 
 
 class CallableWrapperCombineFn(CombineFn):
-  """A CombineFn (function) object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A CombineFn (function) object wrapping a callable object.
 
   The purpose of this class is to conveniently wrap simple functions and use
   them in Combine transforms.
@@ -537,7 +541,9 @@ def partition_for(self, element, num_partitions, *args, **kwargs):
 
 
 class CallableWrapperPartitionFn(PartitionFn):
-  """A PartitionFn object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A PartitionFn object wrapping a callable object.
 
   Instances of this class wrap simple functions for use in Partition
   operations.
   """
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 526291c43fc9..fb79b19be982 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -125,7 +125,7 @@ def visit(self, node, pvalues=None):
     super(GetPValues, self).visit(node, pvalues)
 
 
-class ZipPValues(_PValueishTransform):
+class _ZipPValues(_PValueishTransform):
   """Pairs each PValue in a pvalueish with a value in a parallel out sibling.
 
   Sibling should have the same nested structure as pvalueish.  Leaves in
@@ -148,7 +148,7 @@ def visit(self, pvalueish, sibling, pairs=None, context=None):
     elif isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
       pairs.append((context, pvalueish, sibling))
     else:
-      super(ZipPValues, self).visit(pvalueish, sibling, pairs, context)
+      super(_ZipPValues, self).visit(pvalueish, sibling, pairs, context)
 
   def visit_list(self, pvalueish, sibling, pairs, context):
     if isinstance(sibling, (list, tuple)):
@@ -264,7 +264,7 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
               self.__class__, input_or_output))
     root_hint = (
        arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
-    for context, pvalue_, hint in ZipPValues().visit(pvalueish, root_hint):
+    for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
       if pvalue_.element_type is None:
         # TODO(robertwb): It's a bug that we ever get here. (typecheck)
         continue
@@ -341,7 +341,7 @@ def __rrshift__(self, label):
   def __or__(self, right):
     """Used to compose PTransforms, e.g., ptransform1 | ptransform2."""
     if isinstance(right, PTransform):
-      return ChainedPTransform(self, right)
+      return _ChainedPTransform(self, right)
     return NotImplemented
 
   def __ror__(self, left, label=None):
@@ -453,10 +453,10 @@ def from_runner_api_parameter(spec_parameter, unused_context):
     PTransform.from_runner_api_parameter)
 
 
-class ChainedPTransform(PTransform):
+class _ChainedPTransform(PTransform):
 
   def __init__(self, *parts):
-    super(ChainedPTransform, self).__init__(label=self._chain_label(parts))
+    super(_ChainedPTransform, self).__init__(label=self._chain_label(parts))
     self._parts = parts
 
   def _chain_label(self, parts):
@@ -466,7 +466,7 @@ def __or__(self, right):
     if isinstance(right, PTransform):
       # Create a flat list rather than a nested tree of composite
       # transforms for better monitoring, etc.
-      return ChainedPTransform(*(self._parts + (right,)))
+      return _ChainedPTransform(*(self._parts + (right,)))
     return NotImplemented
 
   def expand(self, pval):
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 97240d392d2c..2cb7ce32702d 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -47,7 +47,7 @@ class AccumulationMode(object):
   # RETRACTING = 3
 
 
-class StateTag(object):
+class _StateTag(object):
   """An identifier used to store and retrieve typed, combinable state.
 
   The given tag must be unique for this stage.  If CombineFn is None then
@@ -60,22 +60,22 @@ def __init__(self, tag):
     self.tag = tag
 
 
-class ValueStateTag(StateTag):
+class _ValueStateTag(_StateTag):
   """StateTag pointing to an element."""
 
   def __repr__(self):
     return 'ValueStateTag(%s)' % (self.tag)
 
   def with_prefix(self, prefix):
-    return ValueStateTag(prefix + self.tag)
+    return _ValueStateTag(prefix + self.tag)
 
 
-class CombiningValueStateTag(StateTag):
+class _CombiningValueStateTag(_StateTag):
   """StateTag pointing to an element, accumulated with a combiner."""
 
   # TODO(robertwb): Also store the coder (perhaps extracted from the combine_fn)
   def __init__(self, tag, combine_fn):
-    super(CombiningValueStateTag, self).__init__(tag)
+    super(_CombiningValueStateTag, self).__init__(tag)
     if not combine_fn:
       raise ValueError('combine_fn must be specified.')
     if not isinstance(combine_fn, core.CombineFn):
@@ -86,22 +86,22 @@ def __repr__(self):
     return 'CombiningValueStateTag(%s, %s)' % (self.tag, self.combine_fn)
 
   def with_prefix(self, prefix):
-    return CombiningValueStateTag(prefix + self.tag, self.combine_fn)
+    return _CombiningValueStateTag(prefix + self.tag, self.combine_fn)
 
 
-class ListStateTag(StateTag):
+class _ListStateTag(_StateTag):
   """StateTag pointing to a list of elements."""
 
   def __repr__(self):
     return 'ListStateTag(%s)' % self.tag
 
   def with_prefix(self, prefix):
-    return ListStateTag(prefix + self.tag)
+    return _ListStateTag(prefix + self.tag)
 
 
-class WatermarkHoldStateTag(StateTag):
+class _WatermarkHoldStateTag(_StateTag):
 
   def __init__(self, tag, timestamp_combiner_impl):
-    super(WatermarkHoldStateTag, self).__init__(tag)
+    super(_WatermarkHoldStateTag, self).__init__(tag)
     self.timestamp_combiner_impl = timestamp_combiner_impl
 
   def __repr__(self):
@@ -109,8 +109,8 @@ def __repr__(self):
                                            self.timestamp_combiner_impl)
 
   def with_prefix(self, prefix):
-    return WatermarkHoldStateTag(prefix + self.tag,
-                                 self.timestamp_combiner_impl)
+    return _WatermarkHoldStateTag(prefix + self.tag,
+                                  self.timestamp_combiner_impl)
 
 
 # pylint: disable=unused-argument
@@ -251,7 +251,7 @@ class AfterWatermark(TriggerFn):
     late: if not None, a speculative trigger to repeatedly evaluate after
       the watermark passes the end of the window
   """
-  LATE_TAG = CombiningValueStateTag('is_late', any)
+  LATE_TAG = _CombiningValueStateTag('is_late', any)
 
   def __init__(self, early=None, late=None):
     self.early = Repeatedly(early) if early else None
@@ -355,7 +355,7 @@ def to_runner_api(self, context):
 
 class AfterCount(TriggerFn):
   """Fire when there are at least count elements in this window pane."""
-  COUNT_TAG = CombiningValueStateTag('count', combiners.CountCombineFn())
+  COUNT_TAG = _CombiningValueStateTag('count', combiners.CountCombineFn())
 
   def __init__(self, count):
     self.count = count
@@ -432,7 +432,7 @@ def to_runner_api(self, context):
           subtrigger=self.underlying.to_runner_api(context)))
 
 
-class ParallelTriggerFn(TriggerFn):
+class _ParallelTriggerFn(TriggerFn):
 
   __metaclass__ = ABCMeta
 
@@ -506,7 +506,7 @@ def to_runner_api(self, context):
       raise NotImplementedError(self)
 
 
-class AfterAny(ParallelTriggerFn):
+class AfterAny(_ParallelTriggerFn):
   """Fires when any subtrigger fires.
 
   Also finishes when any subtrigger finishes.
@@ -514,7 +514,7 @@ class AfterAny(ParallelTriggerFn):
   combine_op = any
 
 
-class AfterAll(ParallelTriggerFn):
+class AfterAll(_ParallelTriggerFn):
   """Fires when all subtriggers have fired.
 
   Also finishes when all subtriggers have finished.
@@ -524,7 +524,7 @@ class AfterAll(ParallelTriggerFn):
 
 
 class AfterEach(TriggerFn):
-  INDEX_TAG = CombiningValueStateTag('index', (
+  INDEX_TAG = _CombiningValueStateTag('index', (
       lambda indices: 0 if not indices else max(indices)))
 
   def __init__(self, *triggers):
@@ -711,7 +711,7 @@ class MergeableStateAdapter(SimpleState):
   # TODO(robertwb): A similar indirection could be used for sliding windows
   # or other window_fns when a single element typically belongs to many
   # windows.
-  WINDOW_IDS = ValueStateTag('window_ids')
+  WINDOW_IDS = _ValueStateTag('window_ids')
 
   def __init__(self, raw_state):
     self.raw_state = raw_state
@@ -726,7 +726,7 @@ def clear_timer(self, window, name, time_domain):
       self.raw_state.clear_timer(window_id, name, time_domain)
 
   def add_state(self, window, tag, value):
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       raise ValueError(
           'Merging requested for non-mergeable state tag: %r.' % tag)
     self.raw_state.add_state(self._get_id(window), tag, value)
@@ -734,10 +734,10 @@ def add_state(self, window, tag, value):
   def get_state(self, window, tag):
     values = [self.raw_state.get_state(window_id, tag)
               for window_id in self._get_ids(window)]
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       raise ValueError(
           'Merging requested for non-mergeable state tag: %r.' % tag)
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       # TODO(robertwb): Strip combine_fn.extract_output from raw_state tag.
       if not values:
         accumulator = tag.combine_fn.create_accumulator()
@@ -747,9 +747,9 @@ def get_state(self, window, tag):
         accumulator = tag.combine_fn.merge_accumulators(values)
         # TODO(robertwb): Store the merged value in the first tag.
       return tag.combine_fn.extract_output(accumulator)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       return [v for vs in values for v in vs]
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)
@@ -904,15 +904,15 @@ class GeneralTriggerDriver(TriggerDriver):
 
   Suitable for all variants of Windowing.
   """
-  ELEMENTS = ListStateTag('elements')
-  TOMBSTONE = CombiningValueStateTag('tombstone', combiners.CountCombineFn())
+  ELEMENTS = _ListStateTag('elements')
+  TOMBSTONE = _CombiningValueStateTag('tombstone', combiners.CountCombineFn())
 
   def __init__(self, windowing):
     self.window_fn = windowing.windowfn
     self.timestamp_combiner_impl = TimestampCombiner.get_impl(
         windowing.timestamp_combiner, self.window_fn)
     # pylint: disable=invalid-name
-    self.WATERMARK_HOLD = WatermarkHoldStateTag(
+    self.WATERMARK_HOLD = _WatermarkHoldStateTag(
         'watermark', self.timestamp_combiner_impl)
     # pylint: enable=invalid-name
     self.trigger_fn = windowing.triggerfn
@@ -1035,7 +1035,7 @@ def __init__(self, defensive_copy=True):
     self.defensive_copy = defensive_copy
 
   def set_global_state(self, tag, value):
-    assert isinstance(tag, ValueStateTag)
+    assert isinstance(tag, _ValueStateTag)
     if self.defensive_copy:
       value = copy.deepcopy(value)
     self.global_state[tag.tag] = value
@@ -1055,26 +1055,26 @@ def get_window(self, window_id):
   def add_state(self, window, tag, value):
     if self.defensive_copy:
       value = copy.deepcopy(value)
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
      self.state[window][tag.tag] = value
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       self.state[window][tag.tag].append(value)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       self.state[window][tag.tag].append(value)
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       self.state[window][tag.tag].append(value)
     else:
       raise ValueError('Invalid tag.', tag)
 
   def get_state(self, window, tag):
     values = self.state[window][tag.tag]
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       return values
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       return tag.combine_fn.apply(values)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       return values
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)
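To make the tag dispatch in add_state/get_state above concrete: a combining tag appends raw values on write and only folds them through its combiner on read. A minimal standalone sketch of that behavior (hypothetical names, not Beam's actual classes):

    from collections import defaultdict

    class SumCombineFn(object):
      # Stand-in combiner exposing only the apply() used below.
      def apply(self, values):
        return sum(values)

    class CombiningTag(object):
      def __init__(self, tag, combine_fn):
        self.tag = tag
        self.combine_fn = combine_fn

    state = defaultdict(lambda: defaultdict(list))  # window -> tag -> values

    def add_state(window, tag, value):
      # Writes are cheap appends; no combining happens here.
      state[window][tag.tag].append(value)

    def get_state(window, tag):
      # Reads fold the buffered values through the combiner.
      return tag.combine_fn.apply(state[window][tag.tag])

    count_tag = CombiningTag('count', SumCombineFn())
    add_state('w1', count_tag, 1)
    add_state('w1', count_tag, 1)
    assert get_state('w1', count_tag) == 2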