Skip to content

Commit

Permalink
test: serialize/deserialize plans from qtt (#4080)
Browse files Browse the repository at this point in the history
Extends qtt to build the query plan and deserialize/serialize it
before executing. Should be enough to make sure plans are serializable
and executable until we improve qtt.
  • Loading branch information
rodesai committed Dec 10, 2019
1 parent 8dbfbb7 commit 4dd76ac
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.parser.json.KsqlParserSerializationModule;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;

public final class PlanJsonMapper {
private PlanJsonMapper() {
Expand All @@ -33,7 +36,10 @@ public static ObjectMapper create() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModules(
new Jdk8Module(),
new JavaTimeModule()
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
);
mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.execution.context.QueryContext;
import java.io.IOException;
import org.junit.Test;

public class QueryContextTest {
private static final ObjectMapper MAPPER = new ObjectMapper();

private final QueryContext.Stacker contextStacker = new QueryContext.Stacker().push("node");
private final QueryContext queryContext = contextStacker.getQueryContext();

Expand All @@ -51,4 +56,26 @@ public void shouldGenerateNewContextOnPush() {
assertQueryContext(childContext, "node", "child");
assertQueryContext(grandchildContext, "node", "child", "grandchild");
}

@Test
public void shouldSerializeCorrectly() throws IOException {
// Given:
final QueryContext context = contextStacker.push("child").getQueryContext();

// When:
final String serialized = MAPPER.writeValueAsString(context);

// Then:
assertThat(serialized, is("\"node/child\""));
}

@Test
public void shouldDeserializeCorrectly() throws IOException {
// When:
final QueryContext deserialized = MAPPER.readValue("\"node/child\"", QueryContext.class);

// Then:
final QueryContext expected = contextStacker.push("child").getQueryContext();
assertThat(deserialized, is(expected));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.execution.context;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand Down Expand Up @@ -43,12 +44,12 @@ private QueryContext(List<String> context) {
}
}

@SuppressWarnings("unused")// Invoked via reflection by Jackson
@JsonCreator
private QueryContext(final String context) {
private QueryContext(String context) {
this(ImmutableList.copyOf(context.split(DELIMITER)));
}

@JsonIgnore
public List<String> getContext() {
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.engine.SqlFormatInjector;
import io.confluent.ksql.engine.StubInsertValuesExecutor;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
Expand All @@ -41,12 +44,14 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector;
import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.tools.stubs.StubKafkaService;
import io.confluent.ksql.test.utils.SerdeUtil;
Expand All @@ -55,6 +60,7 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -71,6 +77,8 @@
public final class TestExecutorUtil {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final ObjectMapper PLAN_MAPPER = PlanJsonMapper.create();

private TestExecutorUtil() {
}

Expand Down Expand Up @@ -275,7 +283,6 @@ private static List<PersistentQueryAndSortedSources> execute(
.collect(Collectors.toList());
}


@SuppressWarnings({"rawtypes", "unchecked"})
private static ExecuteResultAndSortedSources execute(
final KsqlExecutionContext executionContext,
Expand Down Expand Up @@ -308,7 +315,7 @@ private static ExecuteResultAndSortedSources execute(

final ExecuteResult executeResult;
try {
executeResult = executionContext.execute(executionContext.getServiceContext(), reformatted);
executeResult = executeConfiguredStatement(executionContext, reformatted);
} catch (final KsqlStatementException statementException) {
// use the original statement text in the exception so that tests
// can easily check that the failed statement is the input statement
Expand Down Expand Up @@ -339,6 +346,31 @@ private static ExecuteResultAndSortedSources execute(
Optional.empty());
}

@SuppressWarnings("unchecked")
private static ExecuteResult executeConfiguredStatement(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> stmt) {
final ConfiguredKsqlPlan configuredPlan;
try {
configuredPlan = buildConfiguredPlan(executionContext, stmt);
} catch (final IOException e) {
throw new TestFrameworkException("Error (de)serializing plan: " + e.getMessage(), e);
}
return executionContext.execute(executionContext.getServiceContext(), configuredPlan);
}

private static ConfiguredKsqlPlan buildConfiguredPlan(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> stmt
) throws IOException {
final KsqlPlan plan = executionContext.plan(executionContext.getServiceContext(), stmt);
final String serialized = PLAN_MAPPER.writeValueAsString(plan);
return ConfiguredKsqlPlan.of(
PLAN_MAPPER.readValue(serialized, KsqlPlan.class),
stmt.getOverrides(),
stmt.getConfig());
}

private static Optional<Long> getWindowSize(final Query query) {
return query.getWindow().flatMap(window -> window
.getKsqlWindowExpression()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;

public class KsqlTypesDeserializationModule extends SimpleModule {

public KsqlTypesDeserializationModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
public KsqlTypesDeserializationModule(boolean withImplicitColumns) {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer(withImplicitColumns));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
Expand All @@ -25,6 +25,11 @@
import java.io.IOException;

final class LogicalSchemaDeserializer extends JsonDeserializer<LogicalSchema> {
final boolean withImplicitColumns;

LogicalSchemaDeserializer(final boolean withImplicitColumns) {
this.withImplicitColumns = withImplicitColumns;
}

@Override
public LogicalSchema deserialize(
Expand All @@ -36,6 +41,6 @@ public LogicalSchema deserialize(

final TableElements tableElements = SchemaParser.parse(text, TypeRegistry.EMPTY);

return tableElements.toLogicalSchema(false);
return tableElements.toLogicalSchema(withImplicitColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -32,7 +32,7 @@ public class LogicalSchemaDeserializerTest {

@BeforeClass
public static void classSetUp() {
MAPPER.registerModule(new TestModule());
MAPPER.registerModule(new TestModule(false));
}

@Test
Expand Down Expand Up @@ -83,10 +83,29 @@ public void shouldDeserializeSchemaWithKeyAfterValue() throws Exception {
.build()));
}

@Test
public void shouldAddImplicitColumns() throws Exception {
// Given:
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new TestModule(true));
final String json = "\"`v0` INTEGER\"";

// When:
final LogicalSchema schema = mapper.readValue(json, LogicalSchema.class);

// Then:
assertThat(schema, is(LogicalSchema.builder()
.valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER)
.build()));
}

private static class TestModule extends SimpleModule {

private TestModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
private TestModule(boolean withImplicitColumns) {
addDeserializer(
LogicalSchema.class,
new LogicalSchemaDeserializer(withImplicitColumns)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
Expand All @@ -35,7 +35,7 @@ public final class KsqlClient implements AutoCloseable {
static {
JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule());
JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule(false));
}

private final Client httpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.Arrays;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class TableRowsEntityTest {
MAPPER = new ObjectMapper();
MAPPER.registerModule(new Jdk8Module());
MAPPER.registerModule(new KsqlTypesSerializationModule());
MAPPER.registerModule(new KsqlTypesDeserializationModule());
MAPPER.registerModule(new KsqlTypesDeserializationModule(false));
}

@Test(expected = IllegalArgumentException.class)
Expand Down

0 comments on commit 4dd76ac

Please sign in to comment.