Skip to content

Commit

Permalink
more tests a la Hojjat!
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Apr 26, 2019
1 parent 934e3f4 commit 3f92588
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
Expand Up @@ -302,7 +302,7 @@ public Node visitInsertValues(final InsertValuesContext context) {
targetLocation,
targetName,
columns,
visit(context.values().primaryExpression(), Expression.class));
visit(context.values().constant(), Expression.class));
}

@Override
Expand Down
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.metastore.model.KsqlTopic;
import io.confluent.ksql.metastore.model.StructuredDataSource;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.BooleanLiteral;
import io.confluent.ksql.parser.tree.DoubleLiteral;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.InsertValues;
Expand Down Expand Up @@ -84,6 +85,16 @@ public class InsertValuesExecutorTest {
.field("COL1", Schema.STRING_SCHEMA)
.build();

private static final Schema BIG_SCHEMA = SchemaBuilder.struct()
.field("ROWTIME", Schema.OPTIONAL_INT64_SCHEMA)
.field("ROWKEY", Schema.OPTIONAL_INT64_SCHEMA)
.field("INT", Schema.OPTIONAL_INT32_SCHEMA)
.field("COL0", Schema.OPTIONAL_INT64_SCHEMA) // named COL0 for auto-ROWKEY
.field("DOUBLE", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("BOOLEAN", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field("VARCHAR", Schema.OPTIONAL_STRING_SCHEMA)
.build();

private static final byte[] KEY = new byte[]{1};
private static final byte[] VALUE = new byte[]{2};

Expand Down Expand Up @@ -239,24 +250,64 @@ public void shouldFillInKeyFromRowKey() {
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleOutOfOrderSchema() {
// Given:
final ConfiguredStatement<InsertValues> statement = givenInsertValues(
ImmutableList.of("COL1", "COL0"),
ImmutableList.of(
new StringLiteral("str"),
new LongLiteral(2L))
);

// When:
new InsertValuesExecutor(() -> 1L).execute(statement, engine, serviceContext);

// Then:
verify(rowSerializer).serialize(TOPIC_NAME, new GenericRow(1L, 2L, 2L, "str"));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleAllSortsOfLiterals() {
// Given:
final ConfiguredStatement<InsertValues> statement = givenInsertValues(
ImmutableList.of("COL1", "COL0"),
ImmutableList.of(
new StringLiteral("str"),
new LongLiteral(2L))
);

// When:
new InsertValuesExecutor(() -> 1L).execute(statement, engine, serviceContext);

// Then:
verify(rowSerializer).serialize(TOPIC_NAME, new GenericRow(1L, 2L, 2L, "str"));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleNullKey() {
// Given:
givenDataSourceWithSchema(BIG_SCHEMA);
final ConfiguredStatement<InsertValues> statement = givenInsertValues(
SCHEMA.fields().stream().map(Field::name).collect(Collectors.toList()),
BIG_SCHEMA.fields().stream().map(Field::name).collect(Collectors.toList()),
ImmutableList.of(
new LongLiteral(1L),
new NullLiteral(),
new NullLiteral(),
new LongLiteral(2L),
new IntegerLiteral(0),
new LongLiteral(2),
new DoubleLiteral("3.0"),
new BooleanLiteral("TRUE"),
new StringLiteral("str"))
);

// When:
new InsertValuesExecutor(() -> 1L).execute(statement, engine, serviceContext);

// Then:
verify(rowSerializer).serialize(TOPIC_NAME, new GenericRow(1L, null, null, "str"));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, null, VALUE));
verify(rowSerializer).serialize(TOPIC_NAME, new GenericRow(1L, 2L, 0, 2L, 3.0, true, "str"));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
Expand Down

0 comments on commit 3f92588

Please sign in to comment.