
[BEAM-5918] Add Cast transform for Rows #6888

Merged: 6 commits merged into apache:master on Nov 5, 2018

Conversation

@kanterov (Member) commented Oct 30, 2018:

Casts rows from one schema into another. It implements:

  • widening values (e.g., int -> long), to be extended with more conversions
  • narrowing values (e.g., int -> short), to be extended with more conversions
  • ignoring nullability (nullable=true -> nullable=false)
  • weakening nullability (nullable=false -> nullable=true)
  • projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32))

It would be very useful for Row-based IOs. For instance, BeamBigQueryTable could be implemented with org.apache.beam.sdk.schemas.utils.AvroUtils and Cast, which would make it more flexible; at the moment it is very restrictive about the schema.

Another example is reading Avro GenericRecord as a user-provided POJO, BEAM-5807.

I want to get an initial round of feedback before polishing the Javadoc, API, etc.
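For illustration, a minimal sketch of how the transform could be applied, assuming the widening/narrowing entry points discussed in this PR; the schemas, field names, and the input collection are hypothetical:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Cast;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Widen rows with schema (a: Int32, b: Int32) to (a: Int64, b: Int64).
// int -> long is lossless, so the cast is validated at pipeline-construction
// time and cannot fail at runtime.
Schema widenedSchema =
    Schema.builder().addInt64Field("a").addInt64Field("b").build();
PCollection<Row> widened = input.apply(Cast.widening(widenedSchema));

// The reverse direction (long -> int) is narrowing and must be requested
// explicitly, since it can throw on overflow at runtime:
// PCollection<Row> narrowed = widened.apply(Cast.narrowing(originalSchema));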




@kanterov changed the title from "[BEAM-5918] Add Cast transform for Rows" to "[BEAM-5918] [WIP] Add Cast transform for Rows" on Oct 30, 2018
@kanterov (Member, Author) commented:

I want to rework the part that resolves how to convert between schemas.

@kennknowles (Member) left a comment:

Very useful! A few initial comments on the approach. I hope my high-level idea is clear.

}

@VisibleForTesting
static Row castRow(Row input, Schema inputSchema, Schema outputSchema) {
@kennknowles (Member):

I think that cast(value, srcType, targetType) would be a generically useful utility method to make public; then this can be a convenience transform that just maps it over the rows and perhaps has a "dead letter" output for failures (overflow and null cases).
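A minimal sketch of what that dead-letter shape could look like, assuming the castRow utility above throws on a failed cast; the tags, the DoFn, and the in-scope input/outputSchema variables are illustrative, not part of this PR:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<Row> castOk = new TupleTag<Row>() {};
final TupleTag<Row> castFailed = new TupleTag<Row>() {};

PCollectionTuple result =
    input.apply(
        ParDo.of(
                new DoFn<Row, Row>() {
                  @ProcessElement
                  public void process(@Element Row row, MultiOutputReceiver out) {
                    try {
                      // castRow is the utility under discussion; outputSchema is
                      // captured from the transform's configuration.
                      out.get(castOk).output(castRow(row, row.getSchema(), outputSchema));
                    } catch (RuntimeException e) {
                      // Overflow or unexpected null: route to the dead-letter output.
                      out.get(castFailed).output(row);
                    }
                  }
                })
            .withOutputTags(castOk, TupleTagList.of(castFailed)));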

@kanterov (Member, Author):

@kennknowles I was thinking about it, but I'm not sure it's stable enough. What is the policy regarding version compatibility?

@kennknowles (Member):

All of the schema classes are still @Experimental. I don't think I would commit them to full compatibility until we have the portable semantics fleshed out. I also think the approach to metadata needs some attention; in particular, SQL should not use the metadata but trust its compilation process, at which point users probably don't need the metadata at all. Of course, we should save breaking changes until we have worked through all of this.

/** Can cast non-nullable fields to nullable. */
WEAKEN,
/** Can cast nullable fields to non-nullable and back. */
IGNORE
@kennknowles (Member):

The term IGNORE might be misleading: this would be a failed cast if the input is null. FWIW, in this type system you can consider NULLABLE t to be a supertype of t, so it is perfectly analogous to the Type enum.

}

/** Configures casting of primitive types. */
public enum Type {
@kennknowles (Member):

Do you need all the knobs? I would imagine that if the user requests to cast from A to B then there is just one good interpretation, so all these enums would not be needed.

@kanterov (Member, Author):

There is a need, but I'm not sure it's organized properly. The motivation is to separate the different kinds of conversions and to provide validation, so that unsafe code that can crash at runtime isn't run accidentally.

@kennknowles (Member):

I believe that when casting from type S to type T you can statically determine whether error handling is required, and what sort.

@kanterov (Member, Author):

Yes, that was the intention: explicit opt-in for unsafe operations; otherwise, the PTransform throws an exception during expand.
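A minimal sketch of how such a static classification could look; the CastSafety enum, the INTEGRALS list, and the classify helper are hypothetical, not part of this PR:

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;

/** Hypothetical classification of what error handling a cast would need. */
enum CastSafety {
  SAFE, // e.g., INT32 -> INT64: always succeeds
  MAY_OVERFLOW, // e.g., INT64 -> INT32: needs a runtime range check
  MAY_BE_NULL // e.g., nullable -> non-nullable: needs a runtime null check
}

// Integral types ordered by width; a cast within this list is safe exactly
// when it moves rightwards or stays in place.
static final List<TypeName> INTEGRALS =
    Arrays.asList(TypeName.BYTE, TypeName.INT16, TypeName.INT32, TypeName.INT64);

// Simplified: checks nullability first, then integral widening.
static CastSafety classify(FieldType src, FieldType dst) {
  if (src.getNullable() && !dst.getNullable()) {
    return CastSafety.MAY_BE_NULL;
  }
  int s = INTEGRALS.indexOf(src.getTypeName());
  int t = INTEGRALS.indexOf(dst.getTypeName());
  if (s >= 0 && t >= 0) {
    return t >= s ? CastSafety.SAFE : CastSafety.MAY_OVERFLOW;
  }
  // Everything else: safe only for identical types in this sketch.
  return src.getTypeName().equals(dst.getTypeName())
      ? CastSafety.SAFE
      : CastSafety.MAY_OVERFLOW;
}

A transform could then fail at expand time unless the user had opted in to every non-SAFE conversion the schema pair requires.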


// TODO extend this list

static final Map<KV<TypeName, TypeName>, Converter> TYPE_WIDEN =
@kennknowles (Member):

The set of types is not dynamic, so I would suggest just using a nested switch instead of a map for what to do for each type constructor.

@kanterov (Member, Author):

The keys are used to check whether a conversion exists between two types.
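For comparison, a minimal sketch of the nested-switch alternative being suggested; the widen helper is hypothetical and covers only a couple of pairs:

import org.apache.beam.sdk.schemas.Schema.TypeName;

// Returns the widened value, or null if no widening conversion exists for
// this pair of types. The nested switch makes the supported pairs explicit,
// at the cost of not being queryable the way a map's key set is.
static Number widen(Number value, TypeName input, TypeName output) {
  switch (input) {
    case INT16:
      switch (output) {
        case INT32:
          return value.intValue();
        case INT64:
          return value.longValue();
        default:
          return null;
      }
    case INT32:
      switch (output) {
        case INT64:
          return value.longValue();
        case DOUBLE:
          return value.doubleValue();
        default:
          return null;
      }
    default:
      return null; // no widening conversions from this input type
  }
}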

@kanterov (Member, Author) commented Oct 30, 2018:

@kennknowles thanks! I'm thinking about getting rid of the enumerations and using providers with a registry instead, something like:

interface Provider {
  Optional<Conversion> get(ConversionRegistry registry, FieldType inputType, FieldType outputType);
}

The motivation is to be able to compose "custom" conversions, for instance, STRING to DATETIME, as well as in-house types with special behavior that can't be put into Beam.
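As an illustration, a hypothetical STRING-to-DATETIME provider against the proposed interface; Provider, Conversion, ConversionRegistry, and Conversion.of belong to the sketch above, not to merged code:

import java.util.Optional;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.joda.time.Instant;

class StringToDatetimeProvider implements Provider {
  @Override
  public Optional<Conversion> get(
      ConversionRegistry registry, FieldType inputType, FieldType outputType) {
    if (inputType.getTypeName() == TypeName.STRING
        && outputType.getTypeName() == TypeName.DATETIME) {
      // Conversion.of is hypothetical: it would wrap a per-value function.
      return Optional.of(Conversion.of(value -> Instant.parse((String) value)));
    }
    return Optional.empty(); // this provider handles no other pairs
  }
}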

@kennknowles (Member) commented:

I'm slightly more worried about fancy conversions like parsing. It is just a bit more of a big design decision. Casting to add/remove nullability or to narrow/widen integer types is simpler in my mind.

@kanterov (Member, Author) commented Oct 30, 2018:

@kennknowles yes, I agree, it's very controversial, but there are cases where it makes a lot of sense, for instance, BigQuery exports:

  • BQ: DATE, Avro: string
  • BQ: DATETIME, Avro: string
  • BQ: TIMESTAMP, Avro: long, logicalType=timestamp-micros

These need to be converted to Row. The idea is not to have a global registry but to override it per transform. For instance:

Cast
  .to(...)
  .with(StringToDateConversion.of())
  .with(...)
  .build()
  .apply(...)

@kanterov (Member, Author) commented:

Another approach to the problem would be implementing functional-style traversals over Schemas and Rows, and implementing casting using them. They could then also be used to implement parsing of "special" fields if needed.

@kanterov changed the title from "[BEAM-5918] [WIP] Add Cast transform for Rows" to "[BEAM-5918] Add Cast transform for Rows" on Oct 31, 2018
@kanterov (Member, Author) commented:

@kennknowles Thanks for the feedback. I've simplified the implementation a lot. Please take a look.

* <p>Row widening:
*
* <ul>
* <li>wider schema to schema with a subset of fields
@kanterov (Member, Author):

This doesn't match the definition of widening; it should probably be only in narrowing.

@kanterov (Member, Author) commented Nov 5, 2018:

@kennknowles Gentle ping; or perhaps there is somebody else who can help with the review? @akedin

@kennknowles (Member) commented:

Sorry for the delay - taking another look.

@kennknowles (Member) left a comment:

I'm actually happy with this as a good step, and I don't want to hold it up in review; I didn't see any blocking issues. I have some strong opinions about how we have to develop the type system, but nothing worth blocking this PR over, so progress can move along.

return other.isSupertypeOf(this);
}

public boolean isSupertypeOf(TypeName other) {
@kennknowles (Member):

In #6861 nullability is added for array elements and map values. It isn't expressed in the most natural "type system" way, but we should move towards treating a nullable T as OPTIONAL<T> with automatic coercion to T rather than treating it as just a T with nullability as a side condition. It requires a fairly significant refactor to do so. Just something to keep in mind. It would affect this sub/supertype check.


boolean supertype = outputType.isSupertypeOf(inputType);

if (isIntegral(inputType) && isDecimal(outputType)) {
@kennknowles (Member):

It can happen later, but I think the authoritative catalog of what is a subtype/supertype and what can be coerced should live as top-level concepts, with the transform being just a quick helper that applies them.

In particular, it took me a second to understand why there are two branches here. The first branch is "allowed automatic coercion" and the final return below is the usual "no action required" subtyping.

@kanterov (Member, Author):

Agreed, it's misleading. I will restructure the condition like:

if (isIntegral(inputType) && isDecimal(outputType)) {
  return Collections.emptyList();
}

if (supertype) {
  return Collections.emptyList();
}

return cantCast;
}

*
* <ul>
* <li>wider schema to schema with a subset of fields
* <li>non-nullable fields to nullable fields
@kennknowles (Member):

This one should be in widening.

* <ul>
* <li>integral type to another integral type
* <li>BYTE or INT16 to FLOAT, DOUBLE or DECIMAL
* <li>INT32 to DOUBLE
@kennknowles (Member):

What about INT64? And why only some can go to DECIMAL? And adding nullability is widening too.

return output.build();
}

public static Number castNumber(Number value, TypeName input, TypeName output) {
@kennknowles (Member):

These methods are nice. Very straightforward and easy to verify.
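For readers outside the diff, a simplified reconstruction of what a helper in this style looks like; this is a sketch, not the exact merged code:

import org.apache.beam.sdk.schemas.Schema.TypeName;

// Dispatches on the output type and lets java.lang.Number perform the
// primitive conversion. Assumes validation has already established that
// (input, output) is a castable pair; input is unused in this sketch.
public static Number castNumber(Number value, TypeName input, TypeName output) {
  switch (output) {
    case BYTE:
      return value.byteValue();
    case INT16:
      return value.shortValue();
    case INT32:
      return value.intValue();
    case INT64:
      return value.longValue();
    case FLOAT:
      return value.floatValue();
    case DOUBLE:
      return value.doubleValue();
    default:
      throw new IllegalArgumentException("Unsupported numeric cast to " + output);
  }
}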

*
* <p>Values returned by `accept` are accumulated.
*/
public abstract class SchemaZipFold<T> implements Serializable {
@kennknowles (Member):

I understand this class, but maybe others won't love it versus nested switch statements / static recursive functions. Out of curiosity, doesn't it add some allocation cost?

@kanterov (Member, Author):

It isn't optimized to have few allocations or be fast, because it's called a couple of times during graph construction, and, in my understanding, performance isn't a concern here.

The reason I extracted zip and fold is that otherwise it's hard to see the actual narrowing/widening logic. I agree that it stands a bit apart from the rest of the codebase. One alternative could be creating something like a ZippedSchemas class.
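To make the trade-off concrete, a self-contained sketch of the zip-and-fold idea, written as a plain static function rather than the PR's SchemaZipFold class; the mismatchedFields helper is hypothetical:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;

// Zip two schemas field-by-field and accumulate the names of fields whose
// types differ; the narrowing/widening checks are folds of this shape with
// richer accumulators.
static List<String> mismatchedFields(Schema left, Schema right) {
  List<String> mismatches = new ArrayList<>();
  for (Field leftField : left.getFields()) {
    if (!right.hasField(leftField.getName())) {
      continue; // missing fields are a projection concern, handled elsewhere
    }
    Field rightField = right.getField(leftField.getName());
    if (!leftField.getType().equals(rightField.getType())) {
      mismatches.add(leftField.getName());
    }
  }
  return mismatches;
}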

@kennknowles (Member) commented:

I will wait a short while in case you want to make any last changes, like touching up the Javadoc; then I will go ahead and merge, and further work can happen in smaller follow-up PRs.

@kennknowles merged commit f7a19e6 into apache:master on Nov 5, 2018
@kennknowles (Member) commented:

Merged, but if you had other changes in progress, just open another PR from the branch.

@kennknowles (Member) commented:

Ah, sorry, I neglected the squashing.
