fix: improve error handling of invalid Avro identifier (#6239)
* fix: improve error message on invalid Avro identifier

Fixes an issue where using a column name in ksqlDB that was incompatible with Avro, e.g. one that starts with a digit, either wasn't detected at the time the statement was issued (in the case of C* statements) or returned an ambiguous error message (in the case of C*AS statements).

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
big-andy-coates committed Sep 17, 2020
1 parent 0f04061 commit 8dd3942
Showing 3 changed files with 128 additions and 1 deletion.
@@ -660,6 +660,33 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Avro only supports MAPs with STRING keys"
}
},
{
"name": "should return error on field names incompatible with avro - C*",
"statements": [
"CREATE STREAM INPUT (`1AvroDoesNotLikeFieldsThatStartWithNumbers` DOUBLE) WITH (kafka_topic='input_topic', value_format='AVRO');"
],
"topics": [
{
"name": "input_topic",
"format": "AVRO"
}
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Schema is not compatible with Avro: Illegal initial character: 1AvroDoesNotLikeFieldsThatStartWithNumbers"
}
},
{
"name": "should return error on field names incompatible with avro - C*AS",
"statements": [
"CREATE STREAM INPUT (v DOUBLE) WITH (kafka_topic='input_topic', value_format='AVRO');",
"CREATE STREAM OUTPUT AS SELECT v AS `1AvroDoesNotLikeFieldsThatStartWithNumbers` FROM INPUT;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Schema is not compatible with Avro: Illegal initial character: 1AvroDoesNotLikeFieldsThatStartWithNumbers"
}
}
]
}
@@ -27,9 +27,11 @@
import io.confluent.ksql.serde.connect.ConnectFormat;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.avro.SchemaParseException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
@@ -74,6 +76,9 @@ protected <T> Serde<T> getConnectSerde(
      final Supplier<SchemaRegistryClient> srFactory,
      final Class<T> targetType
  ) {
    // Ensure schema is compatible by converting to Avro schema:
    fromConnectSchema(connectSchema, FormatInfo.of(AvroFormat.NAME, formatProps));

    final String schemaFullName = getSchemaName(formatProps);

    return new KsqlAvroSerdeFactory(schemaFullName)
@@ -95,7 +100,11 @@ protected ParsedSchema fromConnectSchema(
    final Schema avroCompatibleSchema = AvroSchemas
        .getAvroCompatibleConnectSchema(schema, schemaFullName);

    return new AvroSchema(avroData.fromConnectSchema(avroCompatibleSchema));
    try {
      return new AvroSchema(avroData.fromConnectSchema(avroCompatibleSchema));
    } catch (final SchemaParseException e) {
      throw new KsqlException("Schema is not compatible with Avro: " + e.getMessage(), e);
    }
  }

  private static String getSchemaName(final Map<String, String> properties) {
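The SchemaParseException handled above comes from the Avro library's identifier validation, which the Connect-to-Avro conversion triggers when a field name is not a legal Avro name. The following is a minimal standalone sketch of that underlying behaviour, not part of this commit; it assumes the kafka-connect-avro-converter and Avro libraries are on the classpath, and the class name and record name are made up for illustration.

import io.confluent.connect.avro.AvroData;
import org.apache.avro.SchemaParseException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class AvroNameCheckSketch {

  public static void main(final String[] args) {
    // A Connect schema whose field name starts with a digit: legal in Connect,
    // but not a valid Avro identifier.
    final Schema connectSchema = SchemaBuilder.struct()
        .name("ExampleRecord") // arbitrary record name for this sketch
        .field("1AintRight", Schema.OPTIONAL_FLOAT64_SCHEMA)
        .build();

    try {
      // Roughly the conversion AvroFormat performs internally:
      new AvroData(1).fromConnectSchema(connectSchema);
    } catch (final SchemaParseException e) {
      // Expected to print something like: Illegal initial character: 1AintRight
      System.out.println(e.getMessage());
    }
  }
}

AvroFormat now wraps that exception in a KsqlException prefixed with "Schema is not compatible with Avro: ", which is the message asserted by the new unit tests below and the functional test cases above.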
@@ -0,0 +1,91 @@
package io.confluent.ksql.serde.avro;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.EnabledSerdeFeatures;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class AvroFormatTest {

  @Mock
  private KsqlConfig config;
  @Mock
  private Supplier<SchemaRegistryClient> srFactory;
  @Mock
  private FormatInfo formatInfo;

  private AvroFormat format;
  private Map<String, String> formatProps;

  @Before
  public void setUp() {
    format = new AvroFormat();
    formatProps = new HashMap<>();

    when(formatInfo.getProperties()).thenReturn(formatProps);
  }

  @Test
  public void shouldThrowWhenCreatingSerdeIfSchemaContainsInvalidAvroNames() {
    // Given:
    final PersistenceSchema schema = PersistenceSchema.from(
        ImmutableList.of(column("1AintRight")),
        EnabledSerdeFeatures.of()
    );

    // When:
    final Exception e = assertThrows(
        KsqlException.class,
        () -> format.getSerde(schema, formatProps, config, srFactory)
    );

    // Then:
    assertThat(e.getMessage(), is("Schema is not compatible with Avro: Illegal initial character: 1AintRight"));
  }

  @Test
  public void shouldThrowWhenBuildingAvroSchemaIfSchemaContainsInvalidAvroNames() {
    // Given:
    final PersistenceSchema schema = PersistenceSchema.from(
        ImmutableList.of(column("2Bad")),
        EnabledSerdeFeatures.of()
    );

    // When:
    final Exception e = assertThrows(
        KsqlException.class,
        () -> format.toParsedSchema(schema, formatInfo)
    );

    // Then:
    assertThat(e.getMessage(), is("Schema is not compatible with Avro: Illegal initial character: 2Bad"));
  }

  private static SimpleColumn column(final String name) {
    final SimpleColumn column = mock(SimpleColumn.class);
    when(column.name()).thenReturn(ColumnName.of(name));
    when(column.type()).thenReturn(SqlTypes.BIGINT);
    return column;
  }
}
