Skip to content

Commit

Permalink
fix: add deserializer for SqlType (#4830)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Mar 19, 2020
1 parent fa84576 commit eed9912
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;

public final class KsqlTypesSerializationModule extends SimpleModule {

public KsqlTypesSerializationModule() {
addSerializer(LogicalSchema.class, new LogicalSchemaSerializer());
addSerializer(SqlType.class, new SqlTypeSchemaSerializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.io.IOException;

public class SqlTypeSchemaSerializer extends JsonSerializer<SqlType> {

@Override
public void serialize(
final SqlType value,
final JsonGenerator gen,
final SerializerProvider serializers
) throws IOException {
gen.writeString(value.toString(FormatOptions.none()));
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.confluent.ksql.execution.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import org.junit.Test;

public class PlanJsonMapperTest {
Expand All @@ -29,4 +31,11 @@ public void shouldEnableFailOnNullProperties() {
public void shouldEnableFailOnInvalidSubtype() {
assertThat(MAPPER.isEnabled(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE), is(true));
}

@Test
public void shouldHaveTypeMapperRegistered() {
assertThat(
MAPPER.getRegisteredModuleIds(),
hasItem(new KsqlTypesDeserializationModule(false).getTypeId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;

public class KsqlTypesDeserializationModule extends SimpleModule {

public KsqlTypesDeserializationModule(final boolean withImplicitColumns) {
public KsqlTypesDeserializationModule(
final boolean withImplicitColumns
) {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer(withImplicitColumns));
addDeserializer(SqlType.class, new SqlTypeDeserializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.schema.ksql.SqlTypeParser;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.io.IOException;

public class SqlTypeDeserializer extends JsonDeserializer<SqlType> {

public SqlTypeDeserializer() {
}

@Override
public SqlType deserialize(
final JsonParser p,
final DeserializationContext ctxt
) throws IOException {
final String text = p.readValueAs(String.class);
return SqlTypeParser.create(TypeRegistry.EMPTY).parse(text).getSqlType();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.json;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlMap;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

public class KsqlTypesSerdeModuleTest {

private static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new KsqlTypesDeserializationModule(false))
.registerModule(new KsqlTypesSerializationModule());

@Test
public void shouldSerDeSqlPrimitiveTypes() throws JsonProcessingException {
// Given:
final SqlType[] types = new SqlType[]{
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING
};

for (final SqlType type : types) {
// When:
final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(type), SqlType.class);

// Then
assertThat(out, is(type));
}
}

@Test
public void shouldSerDeSqlArrayTypes() throws JsonProcessingException {
// Given:
final SqlType[] types = new SqlType[]{
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING
};

for (final SqlType type : types) {
// When:
final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(SqlArray.of(type)), SqlType.class);

// Then
assertThat(out, is(SqlArray.of(type)));
}
}

@Test
public void shouldSerDeSqlMapTypes() throws JsonProcessingException {
// Given:
final SqlType[] types = new SqlType[]{
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING
};

for (final SqlType type : types) {
// When:
final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(SqlMap.of(type)), SqlType.class);

// Then
assertThat(out, is(SqlMap.of(type)));
}
}

@Test
public void shouldSerDeStructType() throws JsonProcessingException {
// Given:
SqlStruct struct = SqlStruct.builder().field("foo", SqlArray.of(SqlTypes.STRING)).build();

// When:
final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(struct), SqlType.class);

// Then:
assertThat(out, is(struct));
}
}

0 comments on commit eed9912

Please sign in to comment.