Conversation

@ttanay
Contributor

@ttanay ttanay commented Apr 2, 2019

Added Latest PTransform and Combine Fns for the Python SDK.
Latest PTransform is used to compute the element(s) with the
latest timestamp from a PCollection.
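
For readers unfamiliar with the transform, a minimal usage sketch (not part of the PR description; the element values and timestamps below are illustrative):

import apache_beam as beam
from apache_beam.transforms import combiners
from apache_beam.transforms.window import TimestampedValue

with beam.Pipeline() as p:
  latest = (p
            | beam.Create([1, 2, 3])
            # Attach a timestamp to each element (here, x * 100).
            | beam.Map(lambda x: TimestampedValue(x, x * 100))
            # Latest.Globally should emit the element with the latest timestamp: 3.
            | combiners.Latest.Globally())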


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

[Build status badge matrix: Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners]

Pre-Commit Tests Status (on master branch)

[Build status badge matrix: Java, Python, Go, and Website pre-commit jobs, non-portable and portable]

See .test-infra/jenkins/README for the trigger phrases, statuses, and links of all Jenkins jobs.

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Run Portable_Python PreCommit

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Run Python PreCommit

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Run RAT PreCommit

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Run Python_PVR_Flink PreCommit

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Run Portable_Python PreCommit

@aaltay
Member

aaltay commented Apr 3, 2019

R: @robinyqiu

@robinyqiu
Contributor

Hi Tanay! Thanks for the contribution. Currently the Python PreCommit tests are all failing. I can review the code while we wait for them to pass.

@ttanay
Contributor Author

ttanay commented Apr 3, 2019

Hi @robinyqiu! Sorry I was AFK. I'll fix the failing Python Pre-Commit before the review. I'll ping you once the tests pass.

@ttanay ttanay force-pushed the beam-6695 branch 2 times, most recently from abb6583 to 8d9b97a on April 4, 2019 03:25
@ttanay
Contributor Author

ttanay commented Apr 4, 2019

Hi @robinyqiu, the tests have passed. It is ready for review :)

Added Latest PTransform and Combine Fns for the Python SDK.
Latest PTransform is used to compute the element(s) with the
latest timestamp from a PCollection.
@ttanay
Contributor Author

ttanay commented Apr 4, 2019

Observation from working on this:
A PCollection of TimestampedValues can be created with a ParDo PTransform whose DoFn wraps each value and its timestamp in a TimestampedValue object. This approach is used in the Java SDK.
However, this is not possible in the Python SDK: when the runner evaluates the ParDo, it converts each TimestampedValue element into a WindowedValue with the same value and timestamp as the TimestampedValue object.
But the value carried by that WindowedValue is not the TimestampedValue object itself, which it should be.

I have a PR on my fork with a failing test to illustrate this: ttanay#6

I'm not sure whether a PCollection of TimestampedValues would be of much use; I found a (value, timestamp) tuple to be a useful replacement for it.
But there may be other cases where it is needed.
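
For reference, a minimal sketch of the (value, timestamp) workaround described above (the DoFn name is hypothetical, not from this PR):

import apache_beam as beam


class PairWithTimestamp(beam.DoFn):
  """Hypothetical helper: emit each element together with its timestamp."""

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # A plain (value, timestamp) tuple stands in for a TimestampedValue element.
    yield (element, timestamp)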

class LatestTest(unittest.TestCase):

  def test_globally(self):
    l = [window.GlobalWindows.windowed_value(1, 100),
Contributor

Maybe shuffle the example inputs a bit to make them look more random? Like [(2, 100), (3, 300), (1, 200)]? Same below.

Contributor Author

Sure! I'll make the change.

    output = self.fn.extract_output(accumulator)
    self.assertEquals(output, 1)

  def test_input_type_violation(self):
Contributor

I think this test should go to LatestTest instead of LatestCombineFnTest?

Contributor Author

The test test_input_type_violation checks the type imposed on the input to the CombineFn here.
The test test_per_key_type_violation checks that the input to the DoFn add_timestamp of class PerKey is a KV pair, here.

I think test_per_key_type_violation can be written better to show the same behaviour. I'll make that change.

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Run Portable_Python PreCommit

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Run RAT PreCommit

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Run Python_PVR_Flink PreCommit

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Run Python PreCommit

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Run Python PreCommit

@ttanay
Contributor Author

ttanay commented Apr 6, 2019

Hey @robinyqiu I have made the changes. PTAL. 🚀

  @staticmethod
  def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
    _check_instance_type(KV[T, T], element)
    (K, V) = element
Contributor

Maybe consider changing these local variables to (k, v) since K, V are type variables already defined.

Member

Or, in a similar vein to element, timestamp, etc., use key, value = element. Also, I believe you do not need the parentheses on the left.

Contributor Author

Thank you @robinyqiu and @aaltay. I'll make the change.

            | core.CombinePerKey(LatestCombineFn()))


@with_input_types(KV[T, T])
Contributor

Using KV as the type hint seems a bit weird to me because the input is really an (element, timestamp) tuple. Maybe we can just use something like Tuple[E, T] as the input type and E as the output type. For more about Python type hints, see: https://beam.apache.org/documentation/sdks/python-type-safety/

Contributor Author

I agree. It should be a Tuple rather than a KV.

Also, only the types int, float, long, Timestamp and Duration can be interpreted as timestamps[1], so that they can be compared to window.MIN_TIMESTAMP in the accumulator. So typing the timestamp as T would not be restrictive enough.
Does it make sense to create a new type - TimestampT/TimestampType = Union[int, float, long, Timestamp, Duration] - in typehints.py or the combiners module? This would make the input type Tuple[T, TimestampType].

[1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/timestamp.py#L71
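
A rough sketch of that proposal (naming and placement are tentative; on Python 2, long would also be part of the union):

from apache_beam.transforms import core
from apache_beam.typehints import Tuple
from apache_beam.typehints import TypeVariable
from apache_beam.typehints import Union
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp

T = TypeVariable('T')
# Tentative alias for the types that can be interpreted as timestamps.
TimestampType = Union[int, float, Timestamp, Duration]


@with_input_types(Tuple[T, TimestampType])
@with_output_types(T)
class LatestCombineFn(core.CombineFn):
  """Sketch only: keep the (value, timestamp) pair with the largest timestamp."""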

Contributor

That's a good suggestion. We can add the definition in this module.
Why is Duration also allowed?

Contributor Author

Both Timestamp and Duration have a micros attribute, which is used to compare the two.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/timestamp.py#L161-L165
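
A small illustration of that (not code from the PR):

from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp

# Both classes expose .micros, which the comparison logic relies on, so either
# kind of value can be compared against the accumulator's initial MIN_TIMESTAMP.
assert Timestamp(seconds=200) > MIN_TIMESTAMP
assert Duration(seconds=200).micros == Timestamp(seconds=200).micros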

from apache_beam.typehints import Union
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.typehints.decorators import _check_instance_type
Contributor

Functions whose names start with underscore are intended to be used only internally. So we shouldn't import _check_instance_type from another package.

Contributor Author

I tried using the with_input_types decorator, but it was comparing the typehint against WindowedValue, whereas the type of the element inside the DoFn was a tuple - the WindowedValue object's value attribute.

Contributor

with_input_types won't work here. Are there other public functions available that do the check? I still believe importing private functions is not a good approach.

Contributor Author

There are no public functions that do the same or something similar, except for with_input_types, AFAIK.

I think the problem here is that we want to check the type of the element, which differs between pipeline construction time and runtime.
While creating the PCollection, the type of the elements is WindowedValue, but inside the DoFn, the type of the element is Tuple.

If the input is a WindowedValue object, any DoFn will receive the WindowedValue's value attribute as its element parameter. I think the type evaluated at pipeline construction time should be the same.
If that were the case, then with_input_types would work as intended here.

  @staticmethod
  def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
    _check_instance_type(KV[T, T], element)
    _check_instance_type(Tuple[T, T], element)
Contributor

I believe you have accidentally changed KV to Tuple here; I think here we do want to check that the input has the KV[K, V] type.

  def expand(self, pcoll):
    return (pcoll
            | core.ParDo(self.add_timestamp)
            .with_output_types(Tuple[T, Tuple[T, TimestampType]])
Contributor

Same here. I believe it should be KV[K, Tuple[T, TimestampType]]

Contributor Author

Sorry. My bad. I'll change it.

@ttanay
Contributor Author

ttanay commented Apr 25, 2019

Hi @robinyqiu
I've made the changes. PTAL 😄

@robinyqiu
Contributor

LGTM, and thank you very much for making these changes!

@ttanay
Contributor Author

ttanay commented Apr 26, 2019

Thank you @robinyqiu! 😄

      pc = p | Create(l_int)
      _ = pc | beam.ParDo(combine.Latest.PerKey.add_timestamp)

    with self.assertRaises(TypeCheckError):
Member

What is being tested here? Why would passing KV values here result in an assert?

Contributor Author

This was to test that the _check_instance_type call used above raises an exception. The last positional argument to window.GlobalWindows.windowed_value is the timestamp of each element. So, the input PCollections are ones that violate the KV constraint.
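
(For context, a small illustration of what windowed_value builds; not code from the PR:)

from apache_beam.transforms import window

# The second positional argument is the element's timestamp, so this is a
# WindowedValue whose value is the bare int 1 (not a KV pair) at timestamp 100.
wv = window.GlobalWindows.windowed_value(1, 100)
print(wv.value)      # 1
print(wv.timestamp)  # a Timestamp of 100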


  @staticmethod
  def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
    _check_instance_type(KV[K, V], element)
Member

You can drop this. The reason is that this validation will happen during pipeline construction; the best thing to do at the transform level is to use typehints as much as possible to express what the transform is doing. They are limited, but it is better than introducing one-off checks inside transforms.

See the Top transform for an example: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py#L314

(This also avoids using an internal function.)
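
A minimal sketch of that suggestion, assuming the LatestCombineFn added in this PR (decorator placement mirrors the Top transform; not the final code):

from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.typehints import KV
from apache_beam.typehints import TypeVariable
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types

K = TypeVariable('K')
V = TypeVariable('V')


@with_input_types(KV[K, V])
@with_output_types(KV[K, V])
class PerKey(ptransform.PTransform):
  """Sketch: the type contract lives on the transform, not inside the DoFn."""

  def expand(self, pcoll):
    # LatestCombineFn here refers to the CombineFn introduced in this PR.
    return pcoll | core.CombinePerKey(LatestCombineFn())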

Contributor Author

Thank you @aaltay! I'll go through it and make the necessary changes.

@ttanay
Contributor Author

ttanay commented May 2, 2019

@aaltay @robinyqiu
I agree that _check_instance_type is not the way to go.

The need for type validation inside the static method add_timestamp of PerKey exists because, when using with_input_types, the type evaluation does not handle PCollections of WindowedValue correctly for this case - it does not evaluate the value attribute of the WindowedValue.
E.g., this is from the test test_per_key:

# Elements to create PCollection from
elem_list = [window.GlobalWindows.windowed_value(('a', 1), 300),
             window.GlobalWindows.windowed_value(('b', 3), 100),
             window.GlobalWindows.windowed_value(('a', 2), 200)]

with TestPipeline() as p:
  pc = p | Create(elem_list)
  latest = pc | combine.Latest.PerKey()
  assert_that(latest, equal_to([('a', 1), ('b', 3)]))

In the static method add_timestamp of the Latest PTransform, the type of pc is evaluated as WindowedValue when using with_input_types, whereas it should be KV, since the value attribute of the WindowedValue object is a KV (('a', 1)) - which is what happens when using _check_instance_type.

Is this expected behaviour? If not, I'd love to fix this. It would solve the problem.

@aaltay
Member

aaltay commented May 2, 2019

Try this instead:
elem_list = [('a', TimestampedValue(1, 300)), ('b', TimestampedValue(1, 100)), ('a', TimestampedValue(2, 200))]

In your example, the inputs you are passing are not of type Tuple[K, V]. For example, whatever test you come up with, it should be valid to do Create(elem_list) | GroupByKey(), and that input will work with the PerKey transform.

I will defer to @robinyqiu for the remainder of the review.
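
A sketch of that sanity check (a hypothetical test shape, not from the PR):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.window import TimestampedValue

elem_list = [('a', TimestampedValue(1, 300)),
             ('b', TimestampedValue(1, 100)),
             ('a', TimestampedValue(2, 200))]

# If the input really has the KV shape, grouping it by key must also be valid.
with TestPipeline() as p:
  _ = p | beam.Create(elem_list) | beam.GroupByKey()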

@ttanay
Contributor Author

ttanay commented May 4, 2019

Thank you @aaltay 😄

I tried using TimestampedValue, but I need the timestamp to be accessible as TimestampParam in the DoFn add_timestamp, and it doesn't seem to do that :/

@robinyqiu I've removed the call to _check_instance_type and the related test, test_per_key_add_timestamp_element_type_violation, as suggested. 🤘

@aaltay
Member

aaltay commented May 6, 2019

Thank you @ttanay for your patience.

TimestampedValue is supposed to pack its timestamp. I expected that TimestampParam would work with that; if not, that might be a bug.
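
For reference, the documented pattern this expectation is based on (a sketch; whether it also holds for TimestampedValues emitted directly by Create was the open question):

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


class LogTimestamp(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # The timestamp packed into the TimestampedValue upstream should be
    # visible here through DoFn.TimestampParam.
    yield (element, timestamp)


with beam.Pipeline() as p:
  _ = (p
       | beam.Create([1, 2, 3])
       | beam.Map(lambda x: TimestampedValue(x, x * 100))
       | beam.ParDo(LogTimestamp()))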

@ttanay
Contributor Author

ttanay commented May 9, 2019

Hi @aaltay, sorry for the late reply.
I'll investigate, create a JIRA for it, and work on a fix.

@robinyqiu
Contributor

robinyqiu commented May 15, 2019

Hi @ttanay, I spent some time investigating the issues. I am sharing my findings here and I hope they will be helpful.

  1. I agree with what @aaltay said here.

In your example, the inputs you are passing are not in type Tuple[K, V]. For example, whatever test you come up with, it should be valid to do Create(elem_list) | GroupByKey(), and that input will work on PerKey transform.

In other words, the action item is that we should add type annotations here

@with_input_types(T)
@with_output_types(T)
class Globally(ptransform.PTransform): ...

and here

@with_input_types(KV[K, V])
@with_output_types(KV[K, V])
class PerKey(ptransform.PTransform): ...

This is the intended behavior for these transforms. I know that the current tests will break after we add these annotations, but I think the right thing to do is to make these changes and fix the tests (specifically, how we create a PCollection of timestamped values/KVs using Create).

  2. So how can we create a PCollection of KVs that have timestamps for testing? A very simple fix is to add a Map(lambda x: x) transform after Create. With this change you can also use TimestampedValue instead of WindowedValue here.

I have tried it and the following test passed:

  def test_per_key(self):
    l = [window.TimestampedValue(('a', 1), 300),
         window.TimestampedValue(('b', 3), 100),
         window.TimestampedValue(('a', 2), 200)]
    with TestPipeline() as p:
      pc = p | Create(l) | Map(lambda x: x)
      latest = pc | combine.Latest.PerKey()
      assert_that(latest, equal_to([('a', 1), ('b', 3)]))
  3. The reason why this hack makes the type checking work is related to some underlying implementation details. To put it simply, both WindowedValue and TimestampedValue are special objects that can be properly handled by a DoFn, but Create won't treat them differently from other object types. By inserting a Map (which is a DoFn under the hood) after Create, we force the data to go through a DoFn, so the output will be a PCollection of only the element type, instead of the TimestampedValue(element, timestamp) type. Hope this explanation makes sense to you. Let me know if you have questions.

@ttanay
Contributor Author

ttanay commented May 19, 2019

Thank you @robinyqiu! The action points and the hack were really helpful; I now understand why the hack was necessary. This approach to type-checking looks much cleaner!

  def test_globally_empty(self):
    l = []
    with TestPipeline() as p:
      pc = p | Create(l)
Contributor

I think we should add an explicit Map here as well, similar to what we did below, for the sake of consistency.

(This test passes now only because the assert_that function adds an implicit Map transform to the pipeline.)

         window.GlobalWindows.windowed_value(('b', 3), 100),
         window.GlobalWindows.windowed_value(('a', 2), 200)]
    with TestPipeline() as p:
      pc = p | Create(l) | Map(lambda x: x)
Contributor

@robinyqiu robinyqiu May 20, 2019

Maybe we should add a short comment here to explain why this hack is needed. Same above.

class LatestTest(unittest.TestCase):

  def test_globally(self):
    l = [window.GlobalWindows.windowed_value(3, 100),
Contributor

Consider using TimestampedValue instead of windowed_value? I think it will make the code easier to understand. Same below.

@ttanay
Contributor Author

ttanay commented May 21, 2019

Made the changes. @robinyqiu PTAL :)

@robinyqiu
Contributor

Thank you @ttanay for your patience!

@aaltay

@aaltay
Member

aaltay commented May 22, 2019

Thank you @ttanay and @robinyqiu. This was a great collaboration between you two!

@aaltay aaltay merged commit 766a765 into apache:master May 22, 2019