Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Joining on ROWKEY #2735

Merged
merged 9 commits into from
May 1, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,21 @@ public static Schema getSchemaFromType(final Type type, final String name, final

public static boolean matchFieldName(final Field field, final String fieldName) {
return field.name().equals(fieldName)
|| field.name().equals(fieldName.substring(fieldName.indexOf(FIELD_NAME_DELIMITER) + 1));
|| field.name().equals(getFieldNameWithNoAlias(fieldName));
}

/**
 * Tests whether the {@code actual} field name satisfies the {@code required} field name.
 *
 * <p>The names match when they are equal, or when {@code required} equals {@code actual} after
 * {@code actual} has been passed through {@code getFieldNameWithNoAlias}. In other words, if
 * {@code required} is not aliased and {@code actual} is, the alias is stripped from
 * {@code actual} to allow a match. The alias is never stripped from {@code required}.
 *
 * @param actual the field name to be checked
 * @param required the required field name
 * @return {@code true} on a match, {@code false} otherwise
 */
public static boolean isFieldName(final String actual, final String required) {
  // Exact match: nothing needs stripping.
  if (required.equals(actual)) {
    return true;
  }

  // Otherwise compare against the actual name with any alias prefix removed.
  final String unaliasedActual = getFieldNameWithNoAlias(actual);
  return required.equals(unaliasedActual);
}

public static Field buildAliasedField(final String alias, final Field field) {
Expand Down Expand Up @@ -365,12 +379,16 @@ private static org.apache.avro.Schema unionWithNull(final org.apache.avro.Schema

public static String getFieldNameWithNoAlias(final Field field) {
final String name = field.name();
final int idx = name.indexOf(FIELD_NAME_DELIMITER);
return getFieldNameWithNoAlias(name);
}

public static String getFieldNameWithNoAlias(final String fieldName) {
final int idx = fieldName.indexOf(FIELD_NAME_DELIMITER);
if (idx < 0) {
return name;
return fieldName;
}

return name.substring(idx + 1);
return fieldName.substring(idx + 1);
}

public static boolean areEqualSchemas(final Schema schema1, final Schema schema2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,28 @@ public void shouldMatchNameWithAlias() {
assertThat(SchemaUtil.matchFieldName(field, "bar.foo"), is(true));
}

@Test
public void shouldMatchFieldNameOnExactMatch() {
  // An unaliased name matches the identical unaliased name.
  final boolean plainMatches = SchemaUtil.isFieldName("bob", "bob");
  assertThat(plainMatches, is(true));

  // An aliased name matches the identical aliased name.
  final boolean aliasedMatches = SchemaUtil.isFieldName("aliased.bob", "aliased.bob");
  assertThat(aliasedMatches, is(true));
}

@Test
public void shouldMatchFieldNameEvenIfActualAliased() {
  // When: the actual name carries an alias but the required name does not.
  final boolean matches = SchemaUtil.isFieldName("aliased.bob", "bob");

  // Then:
  assertThat(matches, is(true));
}

@Test
public void shouldNotMatchFieldNamesOnMismatch() {
  // A different unaliased name does not match.
  final boolean plainMatches = SchemaUtil.isFieldName("different", "bob");
  assertThat(plainMatches, is(false));

  // A different name does not match once its alias is stripped either.
  final boolean aliasedMatches = SchemaUtil.isFieldName("aliased.different", "bob");
  assertThat(aliasedMatches, is(false));
}

@Test
public void shouldNotMatchFieldNamesIfRequiredIsAliased() {
  // When: only the required name carries an alias.
  final boolean matches = SchemaUtil.isFieldName("bob", "aliased.bob");

  // Then: the alias on the required name is not stripped, so there is no match.
  assertThat(matches, is(false));
}

@Test
public void shouldGetTheCorrectFieldName() {
final Optional<Field> field = SchemaUtil.getFieldByName(schema, "orderid".toUpperCase());
Expand Down Expand Up @@ -521,18 +543,46 @@ public void shouldThrowOnUnknownSchemaType() {
SchemaUtil.getSchemaTypeAsSqlType(Schema.Type.BYTES);
}

@Test
public void shouldStripAliasFromField() {
  // Given: a field whose name is prefixed with an alias.
  final Field aliasedField =
      new Field("alias.some-field-name", 1, Schema.OPTIONAL_STRING_SCHEMA);

  // When:
  final String unaliasedName = SchemaUtil.getFieldNameWithNoAlias(aliasedField);

  // Then: only the part after the alias remains.
  assertThat(unaliasedName, is("some-field-name"));
}

@Test
public void shouldReturnFieldWithoutAliasAsIs() {
  // Given: a field whose name has no alias prefix.
  final Field plainField =
      new Field("some-field-name", 1, Schema.OPTIONAL_STRING_SCHEMA);

  // When:
  final String unaliasedName = SchemaUtil.getFieldNameWithNoAlias(plainField);

  // Then: the name comes back unchanged.
  assertThat(unaliasedName, is("some-field-name"));
}

@Test
public void shouldStripAliasFromFieldName() {
final Schema schemaWithAlias = SchemaUtil.buildSchemaWithAlias(schema, "alias");
assertThat("Invalid field name",
SchemaUtil.getFieldNameWithNoAlias(schemaWithAlias.fields().get(0)),
equalTo(schema.fields().get(0).name()));
// When:
final String result = SchemaUtil.getFieldNameWithNoAlias("some-alias.some-field-name");

// Then:
assertThat(result, is("some-field-name"));
}

@Test
public void shouldReturnFieldNameWithoutAliasAsIs() {
assertThat("Invalid field name", SchemaUtil.getFieldNameWithNoAlias(schema.fields().get(0)),
equalTo(schema.fields().get(0).name()));
// When:
final String result = SchemaUtil.getFieldNameWithNoAlias("some-field-name");

// Then:
assertThat(result, is("some-field-name"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,20 +402,18 @@ protected Node visitJoin(final Join node, final Void context) {
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn
.getExpression();

final Pair<String, String> leftSide = fetchKeyFieldName(
final Field leftJoinField = getJoinField(
comparisonExpression,
leftAlias,
leftDataSource.getSchema()
);
final Pair<String, String> rightSide = fetchKeyFieldName(

final Field rightJoinField = getJoinField(
comparisonExpression,
rightAlias,
rightDataSource.getSchema()
);

final String leftKeyFieldName = leftSide.getRight();
final String rightKeyFieldName = rightSide.getRight();

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
throw new KsqlException("Only equality join criteria is supported.");
}
Expand All @@ -441,8 +439,8 @@ protected Node visitJoin(final Join node, final Void context) {
joinType,
leftSourceKafkaTopicNode,
rightSourceKafkaTopicNode,
leftKeyFieldName,
rightKeyFieldName,
leftJoinField.name(),
rightJoinField.name(),
leftAlias,
rightAlias,
node.getWithinExpression().orElse(null),
Expand Down Expand Up @@ -472,68 +470,69 @@ private JoinNode.JoinType getJoinType(final Join node) {
return joinType;
}

/**
* From the join criteria expression fetch the field corresponding to the given source alias.
*/
private Pair<String, String> fetchKeyFieldName(
private Field getJoinField(
final ComparisonExpression comparisonExpression,
final String sourceAlias,
final Schema sourceSchema
) {
Pair<String, String> keyInfo = fetchFieldNameFromExpr(
Optional<Field> joinField = getJoinFieldFromExpr(
comparisonExpression.getLeft(),
sourceAlias,
sourceSchema
);
if (keyInfo == null) {
keyInfo = fetchFieldNameFromExpr(

if (!joinField.isPresent()) {
joinField = getJoinFieldFromExpr(
comparisonExpression.getRight(),
sourceAlias,
sourceSchema
);
}
if (keyInfo == null) {
throw new KsqlException(
String.format(
"%s : Invalid join criteria %s. Could not find a join criteria operand for %s. ",
comparisonExpression.getLocation().map(Objects::toString).orElse(""),
comparisonExpression, sourceAlias
)
);
}
return keyInfo;

return joinField
.orElseThrow(() -> new KsqlException(
String.format(
"%s : Invalid join criteria %s. Could not find a join criteria operand for %s. ",
comparisonExpression.getLocation().map(Objects::toString).orElse(""),
comparisonExpression, sourceAlias
)
));
}

/**
* Given an expression and the source alias, detects if the expression type is
* DereferenceExpression or QualifiedNameReference and if the variable prefix matches the source
* alias.
*/
private Pair<String, String> fetchFieldNameFromExpr(
final Expression expression, final String sourceAlias,
private Optional<Field> getJoinFieldFromExpr(
final Expression expression,
final String sourceAlias,
final Schema sourceSchema
) {
if (expression instanceof DereferenceExpression) {
final DereferenceExpression dereferenceExpression =
(DereferenceExpression) expression;
final String sourceAliasVal = dereferenceExpression.getBase().toString();
if (sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
final String fieldName = dereferenceExpression.getFieldName();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAliasVal, fieldName);
}
}
} else if (expression instanceof QualifiedNameReference) {
final QualifiedNameReference qualifiedNameReference =
(QualifiedNameReference) expression;
final String fieldName = qualifiedNameReference.getName().getSuffix();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAlias, fieldName);
final DereferenceExpression dereferenceExpr = (DereferenceExpression) expression;

final String sourceAliasVal = dereferenceExpr.getBase().toString();
if (!sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
return Optional.empty();
}

final String fieldName = dereferenceExpr.getFieldName();
return getJoinFieldFromSource(fieldName, sourceAlias, sourceSchema);
}
return null;

if (expression instanceof QualifiedNameReference) {
final QualifiedNameReference qualifiedNameRef = (QualifiedNameReference) expression;

final String fieldName = qualifiedNameRef.getName().getSuffix();
return getJoinFieldFromSource(fieldName, sourceAlias, sourceSchema);
}
return Optional.empty();
}

/**
 * Looks up {@code fieldName} in {@code sourceSchema} and, if it is found, returns the field
 * produced by {@code SchemaUtil.buildAliasedField} for {@code sourceAlias}.
 *
 * @param fieldName the name of the field to look up in the schema
 * @param sourceAlias the alias passed to {@code SchemaUtil.buildAliasedField}
 * @param sourceSchema the schema to search
 * @return the aliased field, or empty if the schema lookup finds nothing
 */
private Optional<Field> getJoinFieldFromSource(
    final String fieldName,
    final String sourceAlias,
    final Schema sourceSchema
) {
  final Optional<Field> schemaField = SchemaUtil.getFieldByName(sourceSchema, fieldName);
  return schemaField.map(found -> SchemaUtil.buildAliasedField(sourceAlias, found));
}

@Override
protected Node visitAliasedRelation(final AliasedRelation node, final Void context) {
Expand Down
Loading