Skip to content

Commit

Permalink
feat: add basic support for key syntax (#3034)
Browse files Browse the repository at this point in the history
* feat: add basic support for key syntax

Adds ability to explicitly include key fields in C* statements so long as:
- the key field is named `ROWKEY`
- it of type `STRING`.

i.e. you can now do:

`CREATE STREAM FOO AS (ROWKEY STRING KEY, ...) ...;`
  • Loading branch information
big-andy-coates committed Jul 14, 2019
1 parent 1290b82 commit ca6478a
Show file tree
Hide file tree
Showing 42 changed files with 1,075 additions and 382 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -117,6 +118,11 @@ private InputStream getSchemaStream() {

@State(Scope.Thread)
public static class SerdeState {

private static final org.apache.kafka.connect.data.Schema KEY_SCHEMA = SchemaBuilder.struct()
.field(SchemaUtil.ROWKEY_NAME, org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA)
.build();

Serializer<GenericRow> serializer;
Deserializer<GenericRow> deserializer;
GenericRow row;
Expand Down Expand Up @@ -165,9 +171,14 @@ private static org.apache.kafka.connect.data.Schema convertFieldNamesToUppercase
private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema
) {
final PhysicalSchema physicalSchema = PhysicalSchema.from(
LogicalSchema.of(KEY_SCHEMA, schema),
SerdeOption.none()
);

return GenericRowSerDe.from(
new KsqlJsonSerdeFactory(),
PhysicalSchema.from(LogicalSchema.of(schema), SerdeOption.none()),
physicalSchema,
new KsqlConfig(Collections.emptyMap()),
() -> null,
"benchmark",
Expand All @@ -178,9 +189,15 @@ private static Serde<GenericRow> getAvroSerde(
final org.apache.kafka.connect.data.Schema schema
) {
final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

final PhysicalSchema physicalSchema = PhysicalSchema.from(
LogicalSchema.of(KEY_SCHEMA, schema),
SerdeOption.none()
);

return GenericRowSerDe.from(
new KsqlAvroSerdeFactory("benchmarkSchema"),
PhysicalSchema.from(LogicalSchema.of(schema), SerdeOption.none()),
physicalSchema,
new KsqlConfig(Collections.emptyMap()),
() -> schemaRegistryClient,
"benchmark",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public final class LogicalSchema {
.field(SchemaUtil.ROWTIME_NAME, Schema.OPTIONAL_INT64_SCHEMA)
.build();

private static final Schema KEY_SCHEMA = SchemaBuilder
private static final Schema IMPLICIT_KEY_SCHEMA = SchemaBuilder
.struct()
.field(SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.build();
Expand All @@ -90,7 +90,14 @@ public final class LogicalSchema {
private final ConnectSchema valueSchema;

public static LogicalSchema of(final Schema valueSchema) {
return new LogicalSchema(METADATA_SCHEMA, KEY_SCHEMA, valueSchema, Optional.empty());
return LogicalSchema.of(IMPLICIT_KEY_SCHEMA, valueSchema);
}

public static LogicalSchema of(
final Schema keySchema,
final Schema valueSchema
) {
return new LogicalSchema(METADATA_SCHEMA, keySchema, valueSchema, Optional.empty());
}

private LogicalSchema(
Expand All @@ -105,6 +112,10 @@ private LogicalSchema(
this.alias = requireNonNull(alias, "alias");
}

public ConnectSchema keySchema() {
return keySchema;
}

public ConnectSchema valueSchema() {
return valueSchema;
}
Expand Down Expand Up @@ -237,6 +248,13 @@ public LogicalSchema withoutAlias() {
);
}

/**
* @return {@code true} is aliased, {@code false} otherwise.
*/
public boolean isAliased() {
return alias.isPresent();
}

/**
* Copies metadata and key fields to the value schema.
*
Expand Down Expand Up @@ -310,12 +328,13 @@ public boolean equals(final Object o) {
return false;
}
final LogicalSchema that = (LogicalSchema) o;
return schemasAreEqual(valueSchema, that.valueSchema);
return Objects.equals(keySchema, that.keySchema)
&& Objects.equals(valueSchema, that.valueSchema);
}

@Override
public int hashCode() {
return Objects.hash(valueSchema);
return Objects.hash(keySchema, valueSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public final class SchemaUtil {
public static final String ROWKEY_NAME = "ROWKEY";
public static final String ROWTIME_NAME = "ROWTIME";

public static final int ROWTIME_INDEX = 0;
public static final int ROWKEY_INDEX = 1;
private static final Map<Type, Supplier<SchemaBuilder>> typeToSchema
= ImmutableMap.<Type, Supplier<SchemaBuilder>>builder()
Expand Down
Loading

0 comments on commit ca6478a

Please sign in to comment.