Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDAP-16708 add API for autojoiner #12185

Merged
merged 1 commit into from
May 19, 2020

Conversation

albertshau
Copy link
Contributor

Add a new set of classes for AutoJoiner, which can be used instead
of the current Joiner interface. This new API leaves all of the
implementation details up to the application, which will allow the
app to perform the join in better ways. For example, in the Spark
program, it will allow using broadcast joins.

Plugin developers are responsible for returning a JoinDefinition
based on information about incoming stages.

This change includes the JoinDefinition as well as all the classes
required to create a definition. It also includes validation logic
to make sure the plugin cannot create a definition that tries to
join on a field that doesn't exist, or tries to join on fields
that have mismatched types, or any other type of error.

@albertshau albertshau force-pushed the feature_release/CDAP-16708-new-joiner-api branch from e5f8bb5 to ad1ec9a Compare May 19, 2020 00:15
return keys;
}

public boolean shouldDropNullKeys() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For boolean field, use isDropNullKeys().

throw new InvalidJoinConditionException(String.format(
"Join key for stage '%s' is invalid. Field '%s' does not exist in the stage.",
joinStageName, keysCopy.iterator().next()));
} else if (keysCopy.size() > 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have else after a throw inside the if block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an else-if, since it's also possible the set is empty, in which case it's valid. I can restructure this a little to make it more clear

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it's cleaner to write:

if (keyCopy.size() == 1) {
  throw
}
if (keyCopy.size() > 1) {
  throw
}

public class JoinStage {
private final String stageName;
private final Schema schema;
private final boolean isRequired;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call it required (so that the getter is isRequired).

private final String stageName;
private final Schema schema;
private final boolean isRequired;
private final boolean shouldBroadcast;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call it broadcast.

@albertshau
Copy link
Contributor Author

Copy link
Contributor

@chtyim chtyim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just couple minor comments.

* The name of a field and an optional alias to rename it to.
*/
@Beta
public class Field {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to call this JoinField.

return this;
}

public Builder setShouldDropNullKeys(boolean dropNullKeys) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better match the method name with the variable. setDropNullKeys.

// this means there are stages with different number of fields.
// it's the equivalent of trying to join on A.id = B.id and A.name = [null]
if (numFieldsToStages.size() > 1) {
Iterator<Integer> keyIter = numFieldsToStages.keySet().iterator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to iterate through the numFioeldsToStages to use all stages to form the error message. E.g. if the map is like this:

{
  1: ["s1"],
  2: ["s2"],
  3: ["s3"]
}

It's better to show all the stages with the number of join fields that they declared.

}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra new line.

Iterator<String> fieldIter = otherKey.getFields().iterator();
for (String key1Field : key1.getFields()) {
// guaranteed to be non-null because of earlier validation
Schema key1Schema = schema1.getField(key1Field).getSchema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good refactor these three lines as a private method:

  Schema.Type key1Type = getFieldType(schema1, key1Field);
  Schema.Type otherKeyType = getFieldType(otherSchema, fieldIter.next());
  if (key1Type != otherKeyType) {
     ...
  }

Add a new set of classes for AutoJoiner, which can be used instead
of the current Joiner interface. This new API leaves all of the
implementation details up to the application, which will allow the
app to perform the join in better ways. For example, in the Spark
program, it will allow using broadcast joins.

Plugin developers are responsible for returning a JoinDefinition
based on information about incoming stages.

This change includes the JoinDefinition as well as all the classes
required to create a definition. It also includes validation logic
to make sure the plugin cannot create a definition that tries to
join on a field that doesn't exist, or tries to join on fields
that have mismatched types, or any other type of error.
@albertshau albertshau force-pushed the feature_release/CDAP-16708-new-joiner-api branch from 519f129 to a568b1c Compare May 19, 2020 21:09
@chtyim chtyim added the 6.1 label May 19, 2020
Copy link
Contributor

@CuriousVini CuriousVini left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just one question

String otherField = fieldIter.next();
Schema otherFieldSchema = getNonNullableFieldSchema(otherSchema, otherField);

if (key1Schema.getType() != otherFieldSchema.getType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also compare logical types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that and wavered back and forth.

In the end, I decided to allow it because it is actually able to join if the physical types are the same, regardless of the logical types. It probably makes sense for the plugin to check this and disallow, but didn't want to restrict it in the app.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. sg 👍

*
* <pre>
* {@code
* JoinDefinition define(AutoJoinerContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

@albertshau albertshau merged commit b80e40a into release/6.1 May 19, 2020
@albertshau albertshau deleted the feature_release/CDAP-16708-new-joiner-api branch May 19, 2020 23:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants