/
core.py
2072 lines (1687 loc) · 74.9 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
from __future__ import absolute_import
import copy
import logging
import random
import re
import types
from builtins import map
from builtins import object
from builtins import range
from future.builtins import filter
from past.builtins import unicode
from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
from apache_beam.transforms import userstate
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import PTransformWithSideInputs
from apache_beam.transforms.userstate import StateSpec
from apache_beam.transforms.userstate import TimerSpec
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.typehints import KV
from apache_beam.typehints import Any
from apache_beam.typehints import Iterable
from apache_beam.typehints import Union
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.decorators import TypeCheckError
from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.decorators import get_type_hints
from apache_beam.typehints.decorators import getfullargspec
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.typehints.typehints import is_consistent_with
from apache_beam.utils import urns
__all__ = [
'DoFn',
'CombineFn',
'PartitionFn',
'ParDo',
'FlatMap',
'Map',
'Filter',
'CombineGlobally',
'CombinePerKey',
'CombineValues',
'GroupByKey',
'Partition',
'Windowing',
'WindowInto',
'Flatten',
'Create',
'Impulse',
]
# Type variables
T = typehints.TypeVariable('T')
K = typehints.TypeVariable('K')
V = typehints.TypeVariable('V')
class DoFnContext(object):
  """Base context type made available to all methods of a DoFn instance.

  It carries no state or behavior of its own.
  """
class DoFnProcessContext(DoFnContext):
  """Per-element context handed to DoFn.process() during execution.

  Experimental; no backwards-compatibility guarantees.

  A DoFn.process method typically reads ``context.element`` to obtain the
  element it has to process. The ``element``, ``timestamp`` and ``windows``
  attributes exist only while an element is set; ``set_element(None)`` removes
  them from the instance again.

  Attributes:
    label: label of the ParDo whose element is being processed.
    element: value of the element currently being processed.
    timestamp: timestamp of that element.
    windows: windows that element belongs to.
    state: a DoFnState object holding the runner's internal state. It is only
      stored here and is not used by the pipeline code.
  """

  def __init__(self, label, element=None, state=None):
    """Creates a processing context, optionally primed with an element.

    DoFnProcessContext objects are also used as inputs to PartitionFn
    instances.

    Args:
      label: label of the PCollection whose element is being processed.
      element: windowed value to expose through this context; when None, no
        per-element attributes are set.
      state: a DoFnState object (counters and other pipeline state) to pass to
        the DoFn object.
    """
    self.state = state
    self.label = label
    if element is not None:
      self.set_element(element)

  def set_element(self, windowed_value):
    """Exposes ``windowed_value`` on this context, or clears it when None.

    Args:
      windowed_value: object providing ``value``, ``timestamp`` and ``windows``
        attributes, or None to signal that no element is being processed.
    """
    if windowed_value is not None:
      self.element = windowed_value.value
      self.timestamp = windowed_value.timestamp
      self.windows = windowed_value.windows
      return

    # Not currently processing an element: drop the per-element attributes,
    # but only if an element had been set before.
    if hasattr(self, 'element'):
      for attribute in ('element', 'timestamp', 'windows'):
        delattr(self, attribute)
class ProcessContinuation(object):
  """Marker that a process method may produce as its last output element.

  Experimental; no backwards-compatibility guarantees.

  Producing it tells the runner that the current input element still has work
  left to be done.
  """

  def __init__(self, resume_delay=0):
    """Creates a ProcessContinuation.

    Args:
      resume_delay: minimum time, in seconds, that should elapse before the
        process() method is invoked again to resume the current element.
    """
    self.resume_delay = resume_delay

  @staticmethod
  def resume(resume_delay=0):
    """Convenience factory for a ``ProcessContinuation``.

    Args:
      resume_delay: delay after which processing of the current element should
        be resumed.

    Returns:
      a ``ProcessContinuation`` signalling to the runner that the current
      input element has not been fully processed and should be resumed later.
    """
    continuation = ProcessContinuation(resume_delay=resume_delay)
    return continuation
class RestrictionProvider(object):
  """Generates and manipulates restrictions for Splittable ``DoFn``s.

  Implement this class to support Splittable ``DoFn``s in the Python SDK. See
  https://s.apache.org/splittable-do-fn for background on Splittable
  ``DoFn``s.

  A ``DoFn`` class is marked as a Splittable ``DoFn`` by giving its
  ``DoFn.process()`` method exactly one parameter whose default value is an
  instance of ``RestrictionProvider``.

  That instance must override:

  * create_tracker()
  * initial_restriction()

  and may optionally override the default implementations of:

  * restriction_coder()
  * split()

  ** Pausing and resuming processing of an element **

  A Splittable ``DoFn`` may return a ``ProcessContinuation`` object as the last
  element of the iterator returned by ``DoFn.process()``.

  When present, the ``ProcessContinuation`` tells the runner to re-invoke
  ``DoFn.process()`` later to resume the current element, and how that
  re-invocation should be performed. It must only ever be the last element of
  the iterator. When absent, the runner assumes the current input element has
  been fully processed.

  ** Updating output watermark **

  The ``DoFn.process()`` method of a Splittable ``DoFn`` may declare a
  parameter whose default value is ``DoFn.WatermarkReporterParam``. This asks
  the runner for a function through which the ``DoFn`` can report a
  (best-effort) lower bound on the timestamps of future output for the element
  being processed. For a ``DoFn`` with multiple outputs the watermark applies
  to all of them. The function takes a single argument: either a ``Timestamp``
  or an integer giving the watermark in seconds.
  """

  def create_tracker(self, restriction):
    """Builds a new ``RestrictionTracker`` for ``restriction``.

    Subclasses must override this; the base implementation always raises.

    Args:
      restriction: an object describing a restriction, as understood by the
        Splittable ``DoFn`` that uses this ``RestrictionProvider``. For
        example, a tuple holding a range of positions for a Splittable ``DoFn``
        that reads files based on byte positions.

    Returns:
      an object of type ``RestrictionTracker``.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError

  def initial_restriction(self, element):
    """Builds the initial restriction for ``element``.

    Subclasses must override this; the base implementation always raises.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError

  def split(self, element, restriction):
    """Splits ``restriction`` of ``element`` into sub-restrictions.

    Returns an iterator of restrictions. Reading the input element for each
    returned restriction should produce, in total, the same set of elements as
    reading it for the input restriction.

    The default implementation does not split: it yields ``restriction``
    unchanged as the only item.

    TODO(chamikara): give suitable hints for performing splitting, for example
    number of parts or size in bytes.
    """
    yield restriction

  def restriction_coder(self):
    """Returns the ``Coder`` used for this provider's restrictions.

    The default is whatever coder the registry associates with ``object``.

    Returns:
      an object of type ``Coder``.
    """
    registry = coders.registry
    return registry.get_coder(object)
def get_function_arguments(obj, func):
  """Returns the argument specification of the attribute ``func`` of ``obj``.

  When ``obj`` has an attribute named ``_inspect_<func>``, that attribute is
  called without arguments and its result is returned. Otherwise the result of
  ``getfullargspec`` applied to ``getattr(obj, func)`` is returned.

  Args:
    obj: object owning the function to inspect.
    func: name of the function attribute, as a string.
  """
  hook_name = '_inspect_%s' % func
  if not hasattr(obj, hook_name):
    # No custom inspection hook: fall back to plain introspection.
    return getfullargspec(getattr(obj, func))
  return getattr(obj, hook_name)()
class _DoFnParam(object):
"""DoFn parameter."""
def __init__(self, param_id):
self.param_id = param_id
def __eq__(self, other):
if type(self) == type(other):
return self.param_id == other.param_id
return False
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash(self.param_id)
def __repr__(self):
return self.param_id
class _StateDoFnParam(_DoFnParam):
  """DoFn parameter marker that refers to a piece of user state.

  The marker keeps the given spec in ``state_spec`` and derives its
  ``param_id`` from the spec's name.
  """

  def __init__(self, state_spec):
    """Creates the marker.

    Args:
      state_spec: the StateSpec this parameter refers to.

    Raises:
      ValueError: if state_spec is not a StateSpec instance.
    """
    if isinstance(state_spec, StateSpec):
      self.state_spec = state_spec
      self.param_id = 'StateParam(%s)' % state_spec.name
    else:
      raise ValueError("DoFn.StateParam expected StateSpec object.")
class _TimerDoFnParam(_DoFnParam):
  """DoFn parameter marker that refers to a timer.

  The marker keeps the given spec in ``timer_spec`` and derives its
  ``param_id`` from the spec's name.
  """

  def __init__(self, timer_spec):
    """Creates the marker.

    Args:
      timer_spec: the TimerSpec this parameter refers to.

    Raises:
      ValueError: if timer_spec is not a TimerSpec instance.
    """
    if isinstance(timer_spec, TimerSpec):
      self.timer_spec = timer_spec
      self.param_id = 'TimerParam(%s)' % timer_spec.name
    else:
      raise ValueError("DoFn.TimerParam expected TimerSpec object.")
class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
  """Base class for the per-element logic executed by a transform.

  ParDo is such a transform: it takes an object of type DoFn and applies it to
  all elements of a PCollection object.

  To obtain a concrete DoFn, either subclass DoFn and define the desired
  behavior (start_bundle/finish_bundle and process), or wrap a callable object
  using the CallableWrapperDoFn class.
  """

  # Marker defaults that can be used in the signature of .process().
  ElementParam = _DoFnParam('ElementParam')
  SideInputParam = _DoFnParam('SideInputParam')
  TimestampParam = _DoFnParam('TimestampParam')
  WindowParam = _DoFnParam('WindowParam')
  WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')

  DoFnProcessParams = [
      ElementParam,
      SideInputParam,
      TimestampParam,
      WindowParam,
      WatermarkReporterParam,
  ]

  # Markers giving access to state and timers. These are not restricted to the
  # .process() method. Usage: DoFn.StateParam(state_spec),
  # DoFn.TimerParam(timer_spec).
  StateParam = _StateDoFnParam
  TimerParam = _TimerDoFnParam

  @staticmethod
  def from_callable(fn):
    """Wraps the callable ``fn`` in a CallableWrapperDoFn."""
    return CallableWrapperDoFn(fn)

  def default_label(self):
    """Returns the name of this object's class, used as the default label."""
    return self.__class__.__name__

  def process(self, element, *args, **kwargs):
    """Processes a single element; subclasses are expected to override this.

    ``DoFnRunner`` invokes this for each element of an input ``PCollection``.

    The following values, when used as parameter defaults, tell the
    ``DoFnRunner`` what to pass for that parameter:

    ``DoFn.ElementParam``: element to be processed.
    ``DoFn.SideInputParam``: a side input that may be used when processing.
    ``DoFn.TimestampParam``: timestamp of the input element.
    ``DoFn.WindowParam``: ``Window`` the input element belongs to.
    A ``RestrictionProvider`` instance: an ``iobase.RestrictionTracker`` will
    be provided here to allow treatment as a Splittable ``DoFn``.
    ``DoFn.WatermarkReporterParam``: a function that can be used to report the
    output watermark of Splittable ``DoFn`` implementations.

    Args:
      element: The element to be processed
      *args: side inputs
      **kwargs: other keyword arguments.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError

  def start_bundle(self):
    """Hook called before a bundle of elements is processed on a worker.

    Elements to be processed are split into bundles and distributed to
    workers. A worker calls this method before it calls process() on the first
    element of its bundle. The default implementation does nothing.
    """
    pass

  def finish_bundle(self):
    """Hook called after a bundle of elements is processed on a worker.

    The default implementation does nothing.
    """
    pass

  def get_function_arguments(self, func):
    """Returns the argument specification of this object's method ``func``."""
    return get_function_arguments(self, func)

  # TODO(sourabhbajaj): Do we want to remove the responsibility of these from
  # the DoFn or maybe the runner
  def infer_output_type(self, input_type):
    """Infers the output type of process() for the given input type."""
    # TODO(robertwb): Side inputs types.
    # TODO(robertwb): Assert compatibility with input type hint?
    inferred = trivial_inference.infer_return_type(self.process, [input_type])
    return self._strip_output_annotations(inferred)

  def _strip_output_annotations(self, type_hint):
    """Replaces output-wrapper types (or iterables of them) with ``Any``."""
    wrapper_types = (TimestampedValue, WindowedValue, pvalue.TaggedOutput)
    # TODO(robertwb): These should be parameterized types that the
    # type inferencer understands.
    is_wrapper = (
        type_hint in wrapper_types
        or trivial_inference.element_type(type_hint) in wrapper_types)
    return Any if is_wrapper else type_hint

  def _process_argspec_fn(self):
    """Returns the Python callable that will eventually be invoked.

    Ideally this is the user-level function that is called with the main and
    (if any) side inputs; it is used to relate the type hint parameters to the
    input parameters (e.g., by argument name).
    """
    return self.process

  def is_process_bounded(self):
    """Checks whether ``self.process`` is a method bound to an instance."""
    process_fn = self.process
    if not isinstance(process_fn, types.MethodType):
      return False  # Not a method
    receiver = process_fn.__self__
    if receiver is None:
      return False  # Method is not bound
    # A receiver whose class is ``type`` or a subclass of it is itself a
    # class, which means process is a classmethod.
    return not issubclass(receiver.__class__, type)

  urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_DOFN)
def _fn_takes_side_inputs(fn):
try:
argspec = getfullargspec(fn)
except TypeError:
# We can't tell; maybe it does.
return True
is_bound = isinstance(fn, types.MethodType) and fn.__self__ is not None
try:
varkw = argspec.varkw
kwonlyargs = argspec.kwonlyargs
except AttributeError: # Python 2
varkw = argspec.keywords
kwonlyargs = []
return (len(argspec.args) + len(kwonlyargs) > 1 + is_bound or
argspec.varargs or varkw)
class CallableWrapperDoFn(DoFn):
  """For internal use only; no backwards-compatibility guarantees.

  A DoFn whose process() is supplied by a wrapped callable object.

  This class exists to make it convenient to use simple functions in
  transforms.
  """

  def __init__(self, fn):
    """Creates a CallableWrapperDoFn around ``fn``.

    Args:
      fn: A callable object.

    Raises:
      TypeError: if fn parameter is not a callable type.
    """
    if not callable(fn):
      raise TypeError('Expected a callable object instead of: %r' % fn)

    self._fn = fn
    function_types = (
        types.BuiltinFunctionType, types.MethodType, types.FunctionType)
    if not isinstance(fn, function_types):
      # For cases such as set / list where fn is callable but not a function
      self.process = lambda element: fn(element)
    else:
      self.process = fn

    super(CallableWrapperDoFn, self).__init__()

  def display_data(self):
    """Returns display data naming the wrapped callable."""
    # A callable that has a name is most likely a function, so its name is
    # shown. Otherwise it may be an instance of a callable class, and its
    # class is shown instead.
    shown_value = getattr(self._fn, '__name__', self._fn.__class__)
    return {
        'fn': DisplayDataItem(shown_value, label='Transform Function'),
    }

  def __repr__(self):
    return 'CallableWrapperDoFn(%s)' % self._fn

  def default_type_hints(self):
    """Returns the wrapped callable's type hints, adapted for flattening."""
    hints = get_type_hints(self._fn)
    if not hints.output_types:
      return hints

    # If the fn was a DoFn annotated with a type-hint that hinted a return
    # type compatible with Iterable[Any], then we strip off the outer
    # container type due to the 'flatten' portion of FlatMap.
    # TODO(robertwb): Should we require an iterable specification for FlatMap?
    args, kwargs = hints.output_types
    if len(args) == 1 and is_consistent_with(args[0], Iterable[Any]):
      hints = hints.copy()
      hints.set_output_types(element_type(args[0]), **kwargs)
    return hints

  def infer_output_type(self, input_type):
    """Infers the output type of the wrapped callable for ``input_type``."""
    inferred = trivial_inference.infer_return_type(self._fn, [input_type])
    return self._strip_output_annotations(inferred)

  def _process_argspec_fn(self):
    """Returns the callable whose argspec describes process().

    This is the wrapped callable's ``_argspec_fn`` attribute when it has one,
    and the wrapped callable itself otherwise.
    """
    return getattr(self._fn, '_argspec_fn', self._fn)

  def _inspect_process(self):
    """Returns the argument specification used when inspecting process()."""
    argspec_fn = self._process_argspec_fn()
    return getfullargspec(argspec_fn)
class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
  """Base class for the custom logic used by a Combine transform.

  A CombineFn describes how multiple values in all or part of a PCollection
  are merged into a single value. It provides essentially the same kind of
  information as the arguments to the Python "reduce" builtin (except for the
  input argument, which is an instance of CombineFnProcessContext). Combining
  proceeds as follows:

  1. Input values are partitioned into one or more batches.
  2. For each batch, create_accumulator is invoked to create a fresh initial
     "accumulator" value representing the combination of zero values.
  3. For each input value in the batch, add_input is invoked to combine more
     values with the accumulator for that batch.
  4. Once all accumulators have had all the input values of their batches
     added to them, merge_accumulators is invoked to combine accumulators from
     separate batches into a single combined output accumulator value. This
     operation is invoked repeatedly, until only one accumulator value is
     left.
  5. extract_output is invoked on the final accumulator to get the output
     value.

  Note: If this **CombineFn** is used with a transform that has defaults,
  **apply** will be called with an empty list at expansion time to get the
  default value.
  """

  def default_label(self):
    """Returns the name of this object's class, used as the default label."""
    return self.__class__.__name__

  def create_accumulator(self, *args, **kwargs):
    """Returns a fresh, empty accumulator for the combine operation.

    Args:
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError(str(self))

  def add_input(self, accumulator, element, *args, **kwargs):
    """Returns the result of folding ``element`` into ``accumulator``.

    CombineFn implementors must override add_input.

    Args:
      accumulator: the current accumulator
      element: the element to add
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError(str(self))

  def add_inputs(self, accumulator, elements, *args, **kwargs):
    """Returns the result of folding every element into ``accumulator``.

    This hook exists for implementations that can add elements in bulk more
    efficiently. The default simply calls add_input once per element, in
    iteration order.

    Args:
      accumulator: the current accumulator
      elements: the elements to add
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.
    """
    folded = accumulator
    for item in elements:
      folded = self.add_input(folded, item, *args, **kwargs)
    return folded

  def merge_accumulators(self, accumulators, *args, **kwargs):
    """Returns the result of merging several accumulators into one.

    Args:
      accumulators: the accumulators to merge
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError(str(self))

  def extract_output(self, accumulator, *args, **kwargs):
    """Returns the output value obtained from ``accumulator``.

    Args:
      accumulator: the final accumulator value computed by this CombineFn
        for the entire input key or PCollection.
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError(str(self))

  def apply(self, elements, *args, **kwargs):
    """Returns the result of applying this CombineFn to the input values.

    A new accumulator is created, all elements are added to it, and the output
    is extracted from the result.

    Args:
      elements: the set of values to combine.
      *args: Additional arguments and side inputs.
      **kwargs: Additional arguments and side inputs.
    """
    accumulator = self.create_accumulator(*args, **kwargs)
    accumulator = self.add_inputs(accumulator, elements, *args, **kwargs)
    return self.extract_output(accumulator, *args, **kwargs)

  def for_input_type(self, input_type):
    """Returns a specialized implementation of self, if it exists.

    Otherwise, returns self. This base implementation always returns self.

    Args:
      input_type: the type of input elements.
    """
    return self

  @staticmethod
  def from_callable(fn):
    """Wraps the callable ``fn`` in a CallableWrapperCombineFn."""
    return CallableWrapperCombineFn(fn)

  @staticmethod
  def maybe_from_callable(fn):
    """Returns ``fn`` if it is a CombineFn, else wraps the callable ``fn``.

    Raises:
      TypeError: if fn is neither a CombineFn nor callable.
    """
    if isinstance(fn, CombineFn):
      return fn
    if not callable(fn):
      raise TypeError('Expected a CombineFn or callable, got %r' % fn)
    return CallableWrapperCombineFn(fn)

  def get_accumulator_coder(self):
    """Returns the coder the registry associates with ``object``."""
    registry = coders.registry
    return registry.get_coder(object)

  urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_COMBINE_FN)
class CallableWrapperCombineFn(CombineFn):
  """For internal use only; no backwards-compatibility guarantees.

  A CombineFn (function) object wrapping a callable object.

  The purpose of this class is to conveniently wrap simple functions and use
  them in Combine transforms. The accumulator is either the ``_EMPTY``
  sentinel, meaning nothing has been combined yet, or the most recent result
  of the wrapped callable.
  """

  # Sentinel accumulator meaning "no input has been combined yet".
  # NOTE(review): it is recognised by identity only; confirm that accumulators
  # holding it are never copied or serialized between calls.
  _EMPTY = object()

  def __init__(self, fn):
    """Initializes a CallableFn object wrapping a callable.

    Args:
      fn: A callable object that reduces elements of an iterable to a single
        value (like the builtins sum and max). This callable must be capable
        of receiving the kind of values it generates as output in its input,
        and for best results, its operation must be commutative and
        associative.

    Raises:
      TypeError: if fn parameter is not a callable type.
    """
    if not callable(fn):
      raise TypeError('Expected a callable object instead of: %r' % fn)

    super(CallableWrapperCombineFn, self).__init__()
    self._fn = fn

  def display_data(self):
    """Returns display data exposing the wrapped callable."""
    return {'fn_dd': self._fn}

  def __repr__(self):
    return "CallableWrapperCombineFn(%s)" % self._fn

  def create_accumulator(self, *args, **kwargs):
    """Returns the ``_EMPTY`` sentinel; the arguments are ignored."""
    return self._EMPTY

  def add_input(self, accumulator, element, *args, **kwargs):
    """Combines ``element`` with ``accumulator`` via the wrapped callable.

    The first element added to an empty accumulator becomes the accumulator
    as-is, without calling the wrapped callable.
    """
    if accumulator is self._EMPTY:
      return element
    return self._fn([accumulator, element], *args, **kwargs)

  def add_inputs(self, accumulator, elements, *args, **kwargs):
    """Combines all ``elements`` with ``accumulator`` in a single call."""
    if accumulator is self._EMPTY:
      return self._fn(elements, *args, **kwargs)
    elif isinstance(elements, (list, tuple)):
      return self._fn([accumulator] + list(elements), *args, **kwargs)

    # Arbitrary iterable: chain lazily instead of materializing it.
    def union():
      yield accumulator
      for e in elements:
        yield e

    return self._fn(union(), *args, **kwargs)

  def merge_accumulators(self, accumulators, *args, **kwargs):
    """Combines the non-empty accumulators via the wrapped callable."""
    filter_fn = lambda x: x is not self._EMPTY

    # Each __iter__ call filters ``accumulators`` afresh, so the wrapped
    # callable may iterate over its argument more than once.
    class ReiterableNonEmptyAccumulators(object):
      def __iter__(self):
        return filter(filter_fn, accumulators)

    # It's (weakly) assumed that self._fn is associative.
    return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs)

  def extract_output(self, accumulator, *args, **kwargs):
    """Returns the combined value held by ``accumulator``.

    For an empty accumulator the wrapped callable is applied to an empty
    tuple. The extra arguments and side inputs are forwarded to it, exactly as
    in every other call into the wrapped callable, so that callables requiring
    them also work on empty input.
    """
    if accumulator is self._EMPTY:
      return self._fn((), *args, **kwargs)
    return accumulator

  def default_type_hints(self):
    """Derives CombineFn type hints from those of the wrapped callable.

    Raises:
      TypeError: if the input type is not given positionally and cannot be
        taken from a single keyword hint.
      TypeCheckError: if the first input type is not compatible with
        Iterable[Any].
    """
    fn_hints = get_type_hints(self._fn)
    if fn_hints.input_types is None:
      return fn_hints

    # fn(Iterable[V]) -> V becomes CombineFn(V) -> V
    input_args, input_kwargs = fn_hints.input_types
    if not input_args:
      if len(input_kwargs) == 1:
        # A lone keyword hint is treated as the positional input hint.
        input_args, input_kwargs = tuple(input_kwargs.values()), {}
      else:
        raise TypeError('Combiner input type must be specified positionally.')
    if not is_consistent_with(input_args[0], Iterable[Any]):
      raise TypeCheckError(
          'All functions for a Combine PTransform must accept a '
          'single argument compatible with: Iterable[Any]. '
          'Instead a function with input type: %s was received.'
          % input_args[0])
    input_args = (element_type(input_args[0]),) + input_args[1:]
    # TODO(robertwb): Assert output type is consistent with input type?
    hints = fn_hints.copy()
    hints.set_input_types(*input_args, **input_kwargs)
    return hints

  def for_input_type(self, input_type):
    """Returns a specialized combiner for known (callable, type) pairs.

    Falls back to ``self`` when no specialized implementation is listed.
    """
    # Avoid circular imports.
    from apache_beam.transforms import cy_combiners
    if self._fn is any:
      return cy_combiners.AnyCombineFn()
    elif self._fn is all:
      return cy_combiners.AllCombineFn()
    else:
      known_types = {
          (sum, int): cy_combiners.SumInt64Fn(),
          (min, int): cy_combiners.MinInt64Fn(),
          (max, int): cy_combiners.MaxInt64Fn(),
          (sum, float): cy_combiners.SumFloatFn(),
          (min, float): cy_combiners.MinFloatFn(),
          (max, float): cy_combiners.MaxFloatFn(),
      }
      return known_types.get((self._fn, input_type), self)
class PartitionFn(WithTypeHints):
  """Base class for the logic used by a Partition transform.

  A PartitionFn decides into which of several partitions, indexed by an
  integer, each individual value of a PCollection is placed.
  """

  def default_label(self):
    """Returns the name of this object's class, used as the default label."""
    return self.__class__.__name__

  def partition_for(self, element, num_partitions, *args, **kwargs):
    """Specifies which partition will receive this element.

    This base implementation does nothing and returns None; subclasses supply
    the actual assignment.

    Args:
      element: An element of the input PCollection.
      num_partitions: Number of partitions, i.e., output PCollections.
      *args: optional parameters and side inputs.
      **kwargs: optional parameters and side inputs.

    Returns:
      An integer in [0, num_partitions).
    """
    return None
class CallableWrapperPartitionFn(PartitionFn):
  """For internal use only; no backwards-compatibility guarantees.

  A PartitionFn whose partitioning decision is delegated to a wrapped
  callable, so that simple functions can be used in Partition operations.
  """

  def __init__(self, fn):
    """Creates a PartitionFn around the callable ``fn``.

    Args:
      fn: A callable object, which should accept the following arguments:
        element - element to assign to a partition.
        num_partitions - number of output partitions.
        and may accept additional arguments and side inputs.

    Raises:
      TypeError: if fn is not a callable type.
    """
    if callable(fn):
      self._fn = fn
    else:
      raise TypeError('Expected a callable object instead of: %r' % fn)

  def partition_for(self, element, num_partitions, *args, **kwargs):
    """Returns whatever the wrapped callable returns for these arguments."""
    partition = self._fn(element, num_partitions, *args, **kwargs)
    return partition
class ParDo(PTransformWithSideInputs):
"""A :class:`ParDo` transform.
Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a
:class:`DoFn` to each element and returning the accumulated results into an
output :class:`~apache_beam.pvalue.PCollection`. The type of the elements is
not fixed as long as the :class:`DoFn` can deal with it. In reality the type
is restrained to some extent because the elements sometimes must be persisted
to external storage. See the :meth:`.expand()` method comments for a
detailed description of all possible arguments.
Note that the :class:`DoFn` must return an iterable for each element of the
input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to
use the ``yield`` keyword in the process method.
Args:
pcoll (~apache_beam.pvalue.PCollection):
a :class:`~apache_beam.pvalue.PCollection` to be processed.
fn (DoFn): a :class:`DoFn` object to be applied to each element
of **pcoll** argument.
*args: positional arguments passed to the :class:`DoFn` object.
**kwargs: keyword arguments passed to the :class:`DoFn` object.
Note that the positional and keyword arguments will be processed in order
to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as
side inputs to the transform. During pipeline execution whenever the
:class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets
called) the :class:`~apache_beam.pvalue.PCollection` arguments will be
replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the
exact positions where they appear in the argument lists.
"""
def __init__(self, fn, *args, **kwargs):
    """Initializes the transform and validates the function it wraps.

    Args:
      fn: forwarded to the parent initializer; afterwards ``self.fn`` must
        be a :class:`DoFn` instance.
      *args: positional arguments forwarded to the parent initializer.
      **kwargs: keyword arguments forwarded to the parent initializer.

    Raises:
      TypeError: if ``self.fn`` is not a :class:`DoFn` instance.
    """
    super(ParDo, self).__init__(fn, *args, **kwargs)
    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
    self.dofn = self.fn
    self.output_tags = set()

    if isinstance(self.fn, DoFn):
        # Constructing the signature doubles as validation of the DoFn.
        from apache_beam.runners.common import DoFnSignature
        self._signature = DoFnSignature(self.fn)
    else:
        raise TypeError('ParDo must be called with a DoFn instance.')
def default_type_hints(self):
    """Returns the type hints reported by the wrapped ``fn``."""
    dofn = self.fn
    return dofn.get_type_hints()
def infer_output_type(self, input_type):
    """Infers the element type produced by this transform.

    Args:
      input_type: the input type passed to ``fn.infer_output_type``.

    Returns:
      The result of ``trivial_inference.element_type`` applied to the
      output type that ``fn`` infers for ``input_type``.
    """
    to_element_type = trivial_inference.element_type
    fn_output_type = self.fn.infer_output_type(input_type)
    return to_element_type(fn_output_type)
def make_fn(self, fn):
    """Returns ``fn`` as a :class:`DoFn`.

    A :class:`DoFn` instance is returned unchanged; anything else is
    wrapped in a ``CallableWrapperDoFn``.
    """
    return fn if isinstance(fn, DoFn) else CallableWrapperDoFn(fn)
def _process_argspec_fn(self):
return self.fn._process_argspec_fn()
def display_data(self):
    """Returns the display data entries for this transform.

    Returns:
      A dict with ``'fn'`` mapped to a ``DisplayDataItem`` built from the
      class of ``fn`` (labelled 'Transform Function') and ``'fn_dd'``
      mapped to ``fn`` itself.
    """
    entries = {}
    entries['fn'] = DisplayDataItem(
        self.fn.__class__, label='Transform Function')
    entries['fn_dd'] = self.fn
    return entries
def expand(self, pcoll):
    """Applies this transform to ``pcoll``.

    For a stateful DoFn the key coder is checked first: it is taken from the
    coder of the input element type hint when that hint is set and is not
    ``Any``, and from the coder registered for ``Any`` otherwise. A warning
    is logged when that key coder is not deterministic.

    Args:
      pcoll: the input PCollection.

    Returns:
      A new ``pvalue.PCollection`` on the same pipeline as ``pcoll``.

    Raises:
      ValueError: if the DoFn is stateful and the coder for the hinted
        element type is not a key-value coder.
    """
    if not self._signature.is_stateful_dofn():
        return pvalue.PCollection(pcoll.pipeline)

    # Stateful DoFn: work out which coder will be used for the keys.
    element_hint = pcoll.element_type
    if element_hint and element_hint != typehints.Any:
        element_coder = coders.registry.get_coder(element_hint)
        if not element_coder.is_kv_coder():
            raise ValueError(
                'Input elements to the transform %s with stateful DoFn must be '
                'key-value pairs.' % self)
        key_coder = element_coder.key_coder()
    else:
        # No usable hint, so fall back to the coder registered for Any.
        key_coder = coders.registry.get_coder(typehints.Any)

    deterministic = key_coder.is_deterministic()
    if not deterministic:
        logging.warning(
            'Key coder %s for transform %s with stateful DoFn may not '
            'be deterministic. This may cause incorrect behavior for complex '
            'key types. Consider adding an input type hint for this transform.',
            key_coder, self)

    return pvalue.PCollection(pcoll.pipeline)
def with_outputs(self, *tags, **main_kw):
    """Wraps this :class:`ParDo` so that its outputs can be accessed by tag.

    Args:
      *tags: the output tags to declare; may be empty.
      **main_kw: either empty or a single ``main`` keyword naming the tag
        to be used for the main output. When absent the main tag is None.

    Returns:
      A ``_MultiParDo`` built from this transform, the given tags and the
      main tag.

    Raises:
      ValueError: if **main_kw** contains any key other than ``'main'``.
    """
    main_tag = main_kw.pop('main', None)
    # Anything still left after removing 'main' is an unsupported keyword.
    unexpected = list(main_kw)
    if unexpected:
        raise ValueError('Unexpected keyword arguments: %s' % unexpected)
    return _MultiParDo(self, tags, main_tag)
def _pardo_fn_data(self):
si_tags_and_types = None
windowing = None
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
def to_runner_api_parameter(self, context):
    """Builds the runner API representation of this transform.

    The tuple returned by ``_pardo_fn_data()`` is pickled and embedded as the
    payload of the DoFn function spec. State specs, timer specs and side
    inputs are each converted with their own ``to_runner_api(context)``.

    Args:
      context: provides ``default_environment_id()`` and is passed to every
        nested ``to_runner_api`` call.

    Returns:
      A ``(urn, payload)`` tuple holding the PAR_DO primitive urn and a
      ``beam_runner_api_pb2.ParDoPayload``.
    """
    assert isinstance(self, ParDo), \
        "expected instance of ParDo, but got %s" % self.__class__
    pickled_fn_data = pickler.dumps(self._pardo_fn_data())
    state_specs, timer_specs = userstate.get_dofn_specs(self.fn)

    urn = common_urns.primitives.PAR_DO.urn
    do_fn = beam_runner_api_pb2.SdkFunctionSpec(
        environment_id=context.default_environment_id(),
        spec=beam_runner_api_pb2.FunctionSpec(
            urn=python_urns.PICKLED_DOFN_INFO,
            payload=pickled_fn_data))
    state_protos = {
        spec.name: spec.to_runner_api(context) for spec in state_specs}
    timer_protos = {
        spec.name: spec.to_runner_api(context) for spec in timer_specs}
    # It'd be nice to name these according to their actual
    # names/positions in the original argument list, but such a
    # transformation is currently irreversible given how
    # remove_objects_from_args and insert_values_in_args
    # are currently implemented.
    side_input_protos = {
        "side%s" % ix: si.to_runner_api(context)
        for ix, si in enumerate(self.side_inputs)}

    payload = beam_runner_api_pb2.ParDoPayload(
        do_fn=do_fn,
        state_specs=state_protos,
        timer_specs=timer_protos,
        side_inputs=side_input_protos)
    return (urn, payload)
@PTransform.register_urn(
common_urns.primitives.PAR_DO.urn, beam_runner_api_pb2.ParDoPayload)
def from_runner_api_parameter(pardo_payload, context):
assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO
fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
pardo_payload.do_fn.spec.payload)
if si_tags_and_types:
raise NotImplementedError('explicit side input data')
elif windowing:
raise NotImplementedError('explicit windowing')
result = ParDo(fn, *args, **kwargs)
# This is an ordered list stored as a dict (see the comments in
# to_runner_api_parameter above).
indexed_side_inputs = [