Skip to content

Commit

Permalink
fix: include header columns when injecting schemas (#9023)
Browse files Browse the repository at this point in the history
* fix: include header columns when injecting schemas

* unused import

* update plans
  • Loading branch information
Zara Lim committed Apr 13, 2022
1 parent e012892 commit 4b5feb0
Show file tree
Hide file tree
Showing 7 changed files with 577 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -525,7 +524,11 @@ private static TableElements buildElements(
final Optional<SchemaAndId> keySchema,
final Optional<SchemaAndId> valueSchema
) {
final List<TableElement> elements = new ArrayList<>();
final List<TableElement> elements = preparedStatement.getStatement()
.getElements()
.stream()
.filter(tableElement -> tableElement.getConstraints().isHeaders())
.collect(Collectors.toList());

if (keySchema.isPresent()) {
final ColumnConstraints constraints = getKeyConstraints(preparedStatement.getStatement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,16 @@ public class DefaultSchemaInjectorTest {
private static final ColumnConstraints PRIMARY_KEY_CONSTRAINT =
new ColumnConstraints.Builder().primaryKey().build();

private static final ColumnConstraints HEADER_CONSTRAINT =
new ColumnConstraints.Builder().header("header").build();

private static final TableElements SOME_KEY_ELEMENTS_STREAM = TableElements.of(
new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING), KEY_CONSTRAINT));
private static final TableElements HEADER_ELEMENTS = TableElements.of(
new TableElement(ColumnName.of("head"), new Type(SqlTypes.BYTES), HEADER_CONSTRAINT));
private static final TableElements HEADER_AND_VALUE = TableElements.of(
new TableElement(ColumnName.of("head"), new Type(SqlTypes.BYTES), HEADER_CONSTRAINT),
new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING)));
private static final TableElements SOME_KEY_ELEMENTS_TABLE = TableElements.of(
new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING), PRIMARY_KEY_CONSTRAINT));
private static final TableElements SOME_VALUE_ELEMENTS = TableElements.of(
Expand Down Expand Up @@ -570,6 +578,106 @@ public void shouldInjectKeyAndMaintainValueColumnsForCt() {
));
}

@Test
public void shouldInjectKeyAndValuesForCs() {
// Given:
givenKeyAndValueInferenceSupported();
when(cs.getElements()).thenReturn(HEADER_ELEMENTS);

// When:
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getElements(),
is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_STREAM, INFERRED_KSQL_VALUE_SCHEMA)));
assertThat(result.getStatementText(), is(
"CREATE STREAM `cs` ("
+ "`head` BYTES HEADER('header'), "
+ "`key` STRING KEY, "
+ "`intField` INTEGER, "
+ "`bigIntField` BIGINT, "
+ "`doubleField` DOUBLE, "
+ "`stringField` STRING, "
+ "`booleanField` BOOLEAN, "
+ "`arrayField` ARRAY<INTEGER>, "
+ "`mapField` MAP<STRING, BIGINT>, "
+ "`structField` STRUCT<`s0` BIGINT>, "
+ "`decimalField` DECIMAL(4, 2)) "
+ "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');"
));
}

@Test
public void shouldInjectKeyAndValuesForCt() {
// Given:
givenKeyAndValueInferenceSupported();
when(ct.getElements()).thenReturn(HEADER_ELEMENTS);

// When:
final ConfiguredStatement<CreateTable> result = injector.inject(ctStatement);

// Then:
assertThat(result.getStatement().getElements(),
is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_TABLE, INFERRED_KSQL_VALUE_SCHEMA)));
assertThat(result.getStatementText(), is(
"CREATE TABLE `ct` ("
+ "`head` BYTES HEADER('header'), "
+ "`key` STRING PRIMARY KEY, "
+ "`intField` INTEGER, "
+ "`bigIntField` BIGINT, "
+ "`doubleField` DOUBLE, "
+ "`stringField` STRING, "
+ "`booleanField` BOOLEAN, "
+ "`arrayField` ARRAY<INTEGER>, "
+ "`mapField` MAP<STRING, BIGINT>, "
+ "`structField` STRUCT<`s0` BIGINT>, "
+ "`decimalField` DECIMAL(4, 2)) "
+ "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');"
));
}

@Test
public void shouldInjectValuesAndMaintainKeysAndHeadersForCs() {
// Given:
givenKeyAndValueInferenceSupported();
when(cs.getElements()).thenReturn(HEADER_AND_VALUE);

// When:
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getElements(),
is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_STREAM, SOME_VALUE_ELEMENTS)));
assertThat(result.getStatementText(), is(
"CREATE STREAM `cs` ("
+ "`head` BYTES HEADER('header'), "
+ "`key` STRING KEY, "
+ "`bob` STRING) "
+ "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');"
));
}

@Test
public void shouldInjectValuesAndMaintainKeysAndHeadersForCt() {
// Given:
givenKeyAndValueInferenceSupported();
when(ct.getElements()).thenReturn(HEADER_AND_VALUE);

// When:
final ConfiguredStatement<CreateTable> result = injector.inject(ctStatement);

// Then:
assertThat(result.getStatement().getElements(),
is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_TABLE, SOME_VALUE_ELEMENTS)));
assertThat(result.getStatementText(), is(
"CREATE TABLE `ct` ("
+ "`head` BYTES HEADER('header'), "
+ "`key` STRING PRIMARY KEY, "
+ "`bob` STRING) "
+ "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');"
));
}

@Test
public void shouldInjectKeyForCsas() {
// Given:
Expand Down Expand Up @@ -1224,4 +1332,17 @@ private static TableElements combineElements(
.collect(Collectors.toList())
);
}

private static TableElements combineElements(
final TableElements headerElems,
final TableElements keyElems,
final TableElements valueElems
) {
Stream.of(headerElems.stream(), keyElems.stream(), valueElems.stream()).flatMap(s -> s);
return TableElements.of(
Stream.of(headerElems.stream(), keyElems.stream(), valueElems.stream())
.flatMap(s -> s)
.collect(Collectors.toList())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.test.model.TestHeader;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -181,6 +183,10 @@ private static boolean compareText(final Object actualValue, final JsonNode expe
if (actualValue instanceof BigDecimal) {
return new BigDecimal(expected.asText()).equals(actualValue);
}
if (actualValue instanceof ByteBuffer) {
return expected.asText().equals(
Base64.getEncoder().encodeToString(((ByteBuffer) actualValue).array()));
}
return false;
}

Expand Down
Loading

0 comments on commit 4b5feb0

Please sign in to comment.