[BEAM-7886] Make row coder a standard coder and implement in Python #9188

Merged: 7 commits, Nov 6, 2019

TheNeuralBit
Member

@TheNeuralBit TheNeuralBit commented Jul 30, 2019

  • Defines a URN, beam:coder:row:v1, to represent row coder as a standard coder.
  • Implements row coder as a standard coder in Java (previously it was a custom coder). It is now serialized to a portable representation based on the Schema specification defined in https://s.apache.org/beam-schemas
  • Adds very basic support for Beam Schemas in the Python SDK based on typing (detailed below)
  • Implements row coder in Python (currently only strings, integers, doubles, and lists are supported)
  • Adds some simple test cases for row coder to standard_coders.yaml to verify compatibility between the Java and Python implementations.

Beam Schemas in Python

As noted above, this PR adds basic support for Beam Schemas in Python. Currently this relies on Python's typing module as a native Python representation of Beam Schemas.

apache_beam.typehints.schemas includes two functions, typing_from_runner_api and typing_to_runner_api, which convert supported typing instances to and from portable schemas. Primitive types are mapped to numpy types (e.g. np.int32, np.double), Arrays are mapped to typing.List[T], Maps are mapped to typing.Mapping[K,V], and Rows are mapped to typing.NamedTuple. Logical types are not yet supported.
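The mapping described above can be illustrated with a small standalone sketch. It is not the code in apache_beam.typehints.schemas: the function name describe, the ATOMIC table, and the dict-based output are all hypothetical, and Python builtins stand in for the numpy types so the example needs only the standard library (Python 3.8+).

```python
import collections.abc
import typing
from typing import List, Mapping, NamedTuple, Optional

# Hypothetical atomic-type names for this sketch. The PR maps primitives to
# numpy types; builtins are used here only to avoid a dependency.
ATOMIC = {str: "STRING", int: "INT64", float: "DOUBLE", bool: "BOOLEAN", bytes: "BYTES"}


def describe(t):
    """Return a plain-dict description of a typing annotation."""
    if t in ATOMIC:
        return ATOMIC[t]
    origin, args = typing.get_origin(t), typing.get_args(t)
    # Optional[T] is Union[T, None]; describe it as a nullable T.
    if origin is typing.Union and len(args) == 2 and type(None) in args:
        inner = args[0] if args[1] is type(None) else args[1]
        return {"nullable": describe(inner)}
    if origin is list:
        return {"array": describe(args[0])}
    if origin is collections.abc.Mapping:
        return {"map": [describe(args[0]), describe(args[1])]}
    # NamedTuple classes are tuple subclasses that expose _fields.
    if isinstance(t, type) and issubclass(t, tuple) and hasattr(t, "_fields"):
        hints = typing.get_type_hints(t)
        return {"row": [(name, describe(hints[name])) for name in t._fields]}
    raise TypeError(f"unsupported type: {t!r}")


class Movie(NamedTuple):
    name: str
    year: Optional[int]
    genres: List[str]


print(describe(Movie))
```

The recursion mirrors the structure of the mapping: composite types (arrays, maps, rows) are described in terms of their element types, and anything outside the supported set is rejected rather than guessed at.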

With the changes in this PR it's possible to use Python's row coder for simple structured data types with code like the following:

import apache_beam as beam
from apache_beam import coders
from typing import NamedTuple
from typing import Optional
import numpy as np

class Movie(NamedTuple):
  name: np.unicode
  year: Optional[np.int16]

# The class/type annotation syntax doesn't work in Python 2. Instead you can use:
# Movie = NamedTuple('Movie', [('name', np.unicode), ('year', Optional[np.int16])])

coders.registry.register_coder(Movie, coders.RowCoder)

# Create a PCollection with a NamedTuple type to assign a Schema
movies = p | 'create movies' >> beam.Map(some_function).with_output_types(Movie)

# Retrieve the original type by accessing movies.element_type


@TheNeuralBit changed the title from "Make row coder a standard coder and implement in Python" to "WIP: Make row coder a standard coder and implement in Python" on Jul 30, 2019
@TheNeuralBit
Member Author

Run Portable_Python PreCommit

@TheNeuralBit force-pushed the row-coder-standard branch 2 times, most recently from 29fddd1 to f381123, on July 31, 2019 00:46
@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Run Java PreCommit

@TheNeuralBit changed the title from "WIP: Make row coder a standard coder and implement in Python" to "[BEAM-7886] Make row coder a standard coder and implement in Python" on Aug 2, 2019
Contributor

@robinyqiu robinyqiu left a comment

Did a partial review on the Java and portable side. Will review the Python side code later.

@TheNeuralBit force-pushed the row-coder-standard branch 2 times, most recently from 17bfdbb to 2571d89, on August 13, 2019 00:53
@TheNeuralBit
Member Author

R: @reuvenlax would you mind taking a look at the model changes and the Java changes? I can separate out the relevant commits in their own PR if that helps.

@reuvenlax
Contributor

taking a look

@udim
Member

udim commented Aug 21, 2019

I'm reviewing the Python changes

@TheNeuralBit
Member Author

Thank you both! Let me know if you have any questions

Member

@udim udim left a comment

Only a partial review. I need to look more closely at the files under typehints/.

sdks/python/apache_beam/coders/row_coder.py
sdks/python/apache_beam/coders/row_coder.py (outdated)
sdks/python/apache_beam/typehints/schemas.py (outdated)
sdks/python/apache_beam/coders/row_coder_test.py (outdated)
sdks/python/apache_beam/coders/row_coder.py (outdated)
sdks/python/apache_beam/coders/row_coder.py (outdated)
sdks/python/setup.py (outdated)
PRIMITIVE_TO_ATOMIC_TYPE.update({
  # In python 3, this is a no-op because str == unicode,
  # but in python 2 it overrides the bytes -> BYTES mapping.
  str: schema_pb2.AtomicType.STRING,
Member

Is there a test that does conversion to/from these types?
I suspect that in Python 2 a str such as '\xff' will fail because it's not valid UTF-8.
In other words, str should be invalid or converted to AtomicType.BYTES in Python 2.
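The concern about '\xff' can be reproduced in Python 3 by treating a Python 2 str as raw bytes. This is only an illustration of the UTF-8 validity issue; the helper is_valid_utf8 is a hypothetical name and nothing here is taken from the PR's code.

```python
def is_valid_utf8(data: bytes) -> bool:
    """Return True if the bytes can be decoded as UTF-8."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


# A Python 2 str literal '\xff' holds this single raw byte.
raw = b"\xff"
print(is_valid_utf8(raw))  # False: 0xff never appears in well-formed UTF-8

# Real text survives an encode/decode round trip.
text = "caf\u00e9"
encoded = text.encode("utf-8")
print(is_valid_utf8(encoded), encoded.decode("utf-8") == text)
```

A coder that assumes UTF-8 text would therefore have to either reject such a value or treat it as bytes, which is the choice discussed in the replies.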

Member Author

So the runtime types don't necessarily line up exactly with the typing representation of the schema. For example, even though a schema may have an attribute with an np.int* type, we still produce and consume int instances, and never use np.int* instances at runtime.

In this case, the typing might say str, but RowCoder uses StrUtf8Coder to produce/consume instances of past.builtins.unicode at runtime.

I agree this could be a little confusing for users. We discussed it on the ML and @robertwb suggested this approach:

In both Python 2 and Python 3 one would use str for STRING, it would decode to
past.builtins.unicode. This seems to capture the intent better than
mapping str to BYTES in Python 2 only.

There are tests over in row_coder_test.py, and in standard_coders.yaml/standard_coders_test.py

Contributor

Thinking about this some more, what about just rejecting str in Python 2 (forcing the user to say unicode)? We can loosen things if this becomes too cumbersome in the future (but going the other way is backwards incompatible).

Member

Do we really need to support Python 2? If this is going to be a burden in general, I would rather not add support for it.

Contributor

I think it'll be harder to consistently exclude support for Python 2 for all schema use.

Member Author

I think that makes sense. It doesn't seem too onerous to ask people to use unicode in Python 2. And that's a good point that loosening it later is a backwards-compatible change if we find out otherwise. I pushed a commit that does this: ecaf73c

sdks/python/apache_beam/coders/row_coder.py (outdated)
("aliases", typing.List[unicode]),
])

coders_registry.register_coder(Person, RowCoder)
Member

Is this required for regular use?

Member Author

@TheNeuralBit TheNeuralBit Aug 21, 2019

Yes, it is right now. I was hesitant to make RowCoder the default coder for any NamedTuple subclass, and I thought this was a good way to make it opt-in.
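The opt-in behaviour can be pictured with a toy registry. Everything in this sketch is hypothetical (ToyRegistry, RowLikeCoder, FallbackCoder); it is not Beam's coder registry, only an illustration of why explicit registration leaves other NamedTuple types unaffected.

```python
from typing import Dict, NamedTuple


class FallbackCoder:
    """Stand-in for whatever default coder an SDK would otherwise pick."""


class RowLikeCoder:
    """Stand-in for a schema-aware coder."""


class ToyRegistry:
    def __init__(self) -> None:
        self._coders: Dict[type, type] = {}

    def register_coder(self, typ: type, coder: type) -> None:
        self._coders[typ] = coder

    def get_coder(self, typ: type) -> type:
        # Only exact, explicitly registered types get the special coder.
        return self._coders.get(typ, FallbackCoder)


class Person(NamedTuple):
    name: str


class Point(NamedTuple):
    x: int


registry = ToyRegistry()
registry.register_coder(Person, RowLikeCoder)

print(registry.get_coder(Person).__name__)  # RowLikeCoder
print(registry.get_coder(Point).__name__)   # FallbackCoder
```

Because the lookup is keyed on the exact type, an existing pipeline that already uses NamedTuple values keeps its current coder unless the author registers the type.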

Contributor

+1

@reuvenlax
Contributor

Trying to think of a better name than PortableSchemaCoder, but I guess this is fine for now.

@aaltay
Member

aaltay commented Aug 23, 2019

Can we just use SchemaCoder as the name? I agree that this does not matter much. Would changing this in the future be difficult?

@reuvenlax
Contributor

reuvenlax commented Aug 23, 2019 via email

@TheNeuralBit
Member Author

Let me see if I can put together a patch that does that. I think I could just get rid of the current RowCoder and move its logic to SchemaCoder and RowCoderGenerator.

@TheNeuralBit
Member Author

Run Java PreCommit

@TheNeuralBit
Member Author

Apologies for letting this go stale. After BEAM-8111 I wanted to make sure we had some better test coverage on the Java side.

@udim, @aaltay, @robertwb, and/or @chadrik - would you mind taking another look at the Python changes now?

@kennknowles kennknowles requested a review from udim October 7, 2019 20:23
Contributor

@robertwb robertwb left a comment

Just a couple of comments, but this is looking pretty good to me.

return new CoderTranslator<RowCoder>() {
  @Override
  public List<? extends Coder<?>> getComponents(RowCoder from) {
    return ImmutableList.of();
Contributor

So for the time being, we're inlining everything, rather than using components. Was there a bug tracking doing better for this?

Member Author

Yes, right now there's just a fixed mapping from fieldtype to coder. There's no bug filed for using components; I was thinking that we would just continue inlining everything. Do you think we should plan on using components instead? What does that get us?

Contributor

For coders, if one had a coder T one was likely to have KV<K, T> for various K, an Iterable, WindowedValue for possibly several window types, and various other permutations. Coupled with the fact that leaf coders were often huge serialized blobs, sharing components made for some pretty significant savings.

Maybe this'll be less of an issue in the streaming world. I think it should not be a blocker assuming we'll be able to update this in the (short-term) future.
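The savings described above can be sketched by comparing two ways of serializing the same set of coders. The dictionary layout, the URN strings, and the id names below are placeholders invented for this illustration, not the portable Beam representation.

```python
import json

# Stand-in for a large serialized leaf coder.
leaf_payload = "x" * 10_000
leaf = {"urn": "custom", "payload": leaf_payload}

# Inlined: every composite coder carries its own copy of the leaf.
inlined = {
    "kv_str_T": {"urn": "kv", "components": [{"urn": "str"}, leaf]},
    "iterable_T": {"urn": "iterable", "components": [leaf]},
    "windowed_T": {"urn": "windowed_value", "components": [leaf]},
}

# By reference: the leaf is stored once and composites point to it by id.
by_ref = {
    "T": leaf,
    "str": {"urn": "str"},
    "kv_str_T": {"urn": "kv", "component_ids": ["str", "T"]},
    "iterable_T": {"urn": "iterable", "component_ids": ["T"]},
    "windowed_T": {"urn": "windowed_value", "component_ids": ["T"]},
}

inlined_size = len(json.dumps(inlined))
by_ref_size = len(json.dumps(by_ref))
print(inlined_size, by_ref_size)
```

With three composites over one leaf, the inlined form repeats the payload three times while the referenced form stores it once, so the gap grows with both the payload size and the number of composites.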

case FLOAT:
  return Float.parseFloat((String) value);
case DOUBLE:
  return Double.parseDouble((String) value);
Contributor

Why are these strings?

Member Author

I tried to be consistent with the YAML representations used for other coders in convertValues. We use strings for DoubleCoder there, presumably to avoid the possibility of running into precision errors. It looks like YAML does support floating point, but its spec explicitly states that it doesn't specify a required accuracy for implementations.

@lukecwik added the parseDouble line I linked in #8205, maybe he can clarify?
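The precision concern can be shown without any YAML library. The sketch below only demonstrates that whether a double survives a trip through text depends on how the text is produced; it makes no claim about how a particular YAML parser behaves.

```python
x = 0.1 + 0.2

# repr() emits the shortest decimal string that parses back to the same double.
exact_text = repr(x)
assert float(exact_text) == x

# A formatter that keeps only a few significant digits does not round-trip.
lossy_text = "%.6g" % x
assert float(lossy_text) != x

print(exact_text, lossy_text)
```

Keeping the value as a string in the test file and parsing it explicitly puts the conversion under the test author's control instead of the YAML implementation's.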

Contributor

YAML is a superset of JSON, which supports doubles (really, as its only numeric type). @lukecwik I'm curious, but again this is not a blocker.

Member Author

OK, I created BEAM-8437; let's take this conversation over there.

sdks/python/apache_beam/coders/row_coder.py (outdated)
sdks/python/apache_beam/coders/row_coder.py (outdated)

# Note that if this coder's schema has *fewer* attributes than the encoded
# value, we just need to ignore the additional values, which will occur
# here because we only decode as many values as we have coders for.
Contributor

I don't see how we can safely do this, as we have to pull the extra values off the stream, right?

Member Author

That's a good point. I was just trying to replicate what is implemented in Java's RowCoderGenerator. It seems like this would be an issue over there as well, unless something else is consuming the unread bytes? Could there be logic to do that when there's a length prefix?

I'd be fine with just leaving this out for now and filing a Jira if we can't get a satisfying answer.
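The stream-alignment problem raised in this thread can be reproduced with a toy encoding. The format below (one length byte followed by UTF-8 data per field) and all function names are assumptions made for this sketch; it is not Beam's row encoding.

```python
import io
from typing import List


def encode_row(fields: List[str]) -> bytes:
    """Toy format: each field is one length byte followed by UTF-8 data."""
    out = bytearray()
    for field in fields:
        data = field.encode("utf-8")
        out.append(len(data))  # fields must be shorter than 256 bytes
        out += data
    return bytes(out)


def decode_row(stream: io.BytesIO, n_fields: int) -> List[str]:
    """Decode exactly n_fields fields, leaving any further bytes unread."""
    row = []
    for _ in range(n_fields):
        size = stream.read(1)[0]
        row.append(stream.read(size).decode("utf-8"))
    return row


def encode_row_prefixed(fields: List[str]) -> bytes:
    body = encode_row(fields)
    return bytes([len(body)]) + body  # row-level length prefix


def decode_row_prefixed(stream: io.BytesIO, n_fields: int) -> List[str]:
    size = stream.read(1)[0]
    body = io.BytesIO(stream.read(size))  # consume the whole row first
    return decode_row(body, n_fields)


rows = [["a", "b", "c"], ["d", "e", "f"]]

# Without a row-level prefix, a 2-field decoder leaves "c" on the stream,
# so the next decode starts in the wrong place.
plain = io.BytesIO(b"".join(encode_row(r) for r in rows))
plain_decoded = [decode_row(plain, 2), decode_row(plain, 2)]

# With a row-level prefix, the extra field is skipped along with its row.
prefixed = io.BytesIO(b"".join(encode_row_prefixed(r) for r in rows))
prefixed_decoded = [decode_row_prefixed(prefixed, 2), decode_row_prefixed(prefixed, 2)]

print(plain_decoded)     # [['a', 'b'], ['c', 'd']]
print(prefixed_decoded)  # [['a', 'b'], ['d', 'e']]
```

In this toy format, ignoring trailing fields is only safe when something (here, the row-level length prefix) consumes the unread bytes before the next value is decoded.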

("aliases", typing.List[unicode]),
])

coders_registry.register_coder(Person, RowCoder)
Contributor

+1

sdks/python/apache_beam/coders/standard_coders_test.py (outdated)
@TheNeuralBit
Member Author

@robertwb could you take another look?

Contributor

@robertwb robertwb left a comment

LGTM, thanks.

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Finally got CI green! @robertwb is this ok to merge now?

@tweise
Contributor

tweise commented Nov 4, 2019

Excited to see this happening. Perhaps a good time to squash the fixup commits?

@robertwb
Contributor

robertwb commented Nov 4, 2019

Yes, go ahead and squash into meaningful commits and merge.

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

Run Python PreCommit

@TheNeuralBit
Member Author

@robertwb I squashed it down to reasonable commits, so I think this is ready to merge. I think the Python failures have been flakes; hopefully it will pass this time.

@reuvenlax
Contributor

I can merge this once Python passes

@reuvenlax reuvenlax merged commit 01726e9 into apache:master Nov 6, 2019
@TheNeuralBit
Member Author

🎉 Thanks everyone!

8 participants