# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for testing Beam pipelines."""
from __future__ import absolute_import
import collections
import glob
import tempfile
from builtins import object
from apache_beam import pvalue
from apache_beam.transforms import window
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.core import Map
from apache_beam.transforms.core import ParDo
from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.util import CoGroupByKey
from apache_beam.utils.annotations import experimental
__all__ = [
    'assert_that',
    'contains_in_any_order',
    'equal_to',
    'equal_to_per_window',
    'is_empty',
    # open_shards is internal and has no backwards compatibility guarantees.
    'open_shards',
    ]
class BeamAssertException(Exception):
  """Exception raised by matcher classes used by assert_that transform."""
# Used for reifying timestamps and windows for assert_that matchers.
# Fields: value (the element), timestamp (the element's event timestamp),
# windows (the list of windows the element was assigned to).
TestWindowedValue = collections.namedtuple(
    'TestWindowedValue', 'value timestamp windows')
def contains_in_any_order(iterable):
  """Creates an object that matches another iterable if they both have the
  same count of items.

  Arguments:
    iterable: An iterable of hashable objects.
  """
  class InAnyOrder(object):
    def __init__(self, iterable):
      self._counter = collections.Counter(iterable)

    def __eq__(self, other):
      return self._counter == collections.Counter(other)

    def __ne__(self, other):
      # TODO(BEAM-5949): Needed for Python 2 compatibility.
      return not self == other

    def __hash__(self):
      # A Counter is a dict subclass and therefore unhashable, so hash a
      # frozenset of its items instead. Equal counters yield equal
      # frozensets, keeping __hash__ consistent with __eq__.
      return hash(frozenset(self._counter.items()))

    def __repr__(self):
      return "InAnyOrder(%s)" % self._counter

  return InAnyOrder(iterable)
def equal_to_per_window(expected_window_to_elements):
  """Matcher used by assert_that to check on values for specific windows.

  Arguments:
    expected_window_to_elements: A dictionary where the keys are the windows
      to check and the values are the elements associated with each window.

  Returns:
    A matcher taking an (elements_in_window, window) pair; it raises
    BeamAssertException when the actual elements are not a subset of the
    expected elements for that window. Windows absent from the dictionary
    are ignored.
  """
  def matcher(elements):
    actual_elements_in_window, window = elements
    if window in expected_window_to_elements:
      expected_elements_in_window = list(
          expected_window_to_elements[window])
      sorted_expected = sorted(expected_elements_in_window)
      sorted_actual = sorted(actual_elements_in_window)
      if sorted_expected != sorted_actual:
        # Results for the same window don't necessarily come all
        # at once. Hence the same actual window may contain only
        # subsets of the expected elements for the window.
        # For example, in the presence of early triggers.
        if not all(elem in sorted_expected for elem in sorted_actual):
          raise BeamAssertException(
              'Failed assert: %r not in %r' % (sorted_actual, sorted_expected))
  return matcher
def equal_to(expected):
  """Returns a matcher asserting that a PCollection equals ``expected``.

  Both the expected and actual values are sorted before comparison, since
  what we compare are PCollections for which there is no guaranteed order.
  The sorting does not go beyond the top level, therefore [1, 2] and [2, 1]
  are considered equal while [[1, 2]] and [[2, 1]] are not.
  """
  expected = list(expected)

  def _equal(actual):
    want = sorted(expected)
    got = sorted(actual)
    if want == got:
      return
    raise BeamAssertException(
        'Failed assert: %r == %r' % (want, got))

  return _equal
def is_empty():
  """Returns a matcher asserting that a PCollection contains no elements."""
  def _empty(actual):
    materialized = list(actual)
    if not materialized:
      return
    raise BeamAssertException(
        'Failed assert: [] == %r' % materialized)

  return _empty
def assert_that(actual, matcher, label='assert_that',
                reify_windows=False, use_global_window=True):
  """A PTransform that checks a PCollection has an expected value.

  Note that assert_that should be used only for testing pipelines since the
  check relies on materializing the entire PCollection being checked.

  Args:
    actual: A PCollection.
    matcher: A matcher function taking as argument the actual value of a
      materialized PCollection. The matcher validates this actual value
      against expectations and raises BeamAssertException if they are not
      met.
    label: Optional string label. This is needed in case several assert_that
      transforms are introduced in the same pipeline.
    reify_windows: If True, matcher is passed a list of TestWindowedValue.
    use_global_window: If False, matcher is passed a dictionary of
      (k, v) = (window, elements in the window).
  """
  assert isinstance(actual, pvalue.PCollection)

  class ReifyTimestampWindow(DoFn):
    def process(self, element, timestamp=DoFn.TimestampParam,
                window=DoFn.WindowParam):
      # This returns TestWindowedValue instead of
      # beam.utils.windowed_value.WindowedValue because ParDo will extract
      # the timestamp and window out of the latter.
      return [TestWindowedValue(element, timestamp, [window])]

  class AddWindow(DoFn):
    def process(self, element, window=DoFn.WindowParam):
      yield element, window

  class AssertThat(PTransform):
    def expand(self, pcoll):
      if reify_windows:
        pcoll = pcoll | ParDo(ReifyTimestampWindow())

      # Keyed singleton forces a wait for the whole input via CoGroupByKey.
      keyed_singleton = pcoll.pipeline | Create([(None, None)])

      if use_global_window:
        pcoll = pcoll | WindowInto(window.GlobalWindows())

      keyed_actual = pcoll | "ToVoidKey" >> Map(lambda v: (None, v))

      plain_actual = ((keyed_singleton, keyed_actual)
                      | "Group" >> CoGroupByKey()
                      | "Unkey" >> Map(lambda k_values: k_values[1][1]))

      if not use_global_window:
        plain_actual = plain_actual | "AddWindow" >> ParDo(AddWindow())

      plain_actual = plain_actual | "Match" >> Map(matcher)

    def default_label(self):
      return label

  actual | AssertThat()  # pylint: disable=expression-not-assigned
def open_shards(glob_pattern):
  """Returns a composite file of all shards matching the given glob pattern.

  Args:
    glob_pattern: A glob pattern (e.g. ``'out-*-of-*'``) matching shard files.

  Returns:
    A file object opened in binary read mode containing the concatenated
    contents of all matching shards. The backing temporary file is not
    deleted automatically; the caller is responsible for closing the file.
  """
  with tempfile.NamedTemporaryFile(delete=False) as out_file:
    for shard in glob.glob(glob_pattern):
      # Read shards as bytes so there is no text/binary mode mismatch with
      # the binary temporary file (text-mode reads break on Python 3 here).
      with open(shard, 'rb') as in_file:
        out_file.write(in_file.read())
    concatenated_file_name = out_file.name
  return open(concatenated_file_name, 'rb')