Skip to content

Commit

Permalink
fix: Map invalid casts to null. (#9336)
Browse files Browse the repository at this point in the history
* fix: Map invalid casts to null.

Addresses #9311
  • Loading branch information
jnh5y committed Jul 26, 2022
1 parent 438d2d6 commit 36608cf
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,10 @@ public Pair<String, SqlType> visitCast(
) {
final Pair<String, SqlType> expr = process(node.getExpression(), context);
final SqlType to = node.getType().getSqlType();
return Pair.of(genCastCode(expr, to), to);
final String javaType = SchemaConverters.sqlToJavaConverter()
.toJavaType(node.getType().getSqlType()).getTypeName();
final String code = evaluateOrReturnNull(genCastCode(expr,to), to.toString(), javaType);
return Pair.of(code, to);
}

@Override
Expand Down Expand Up @@ -1195,6 +1198,25 @@ private String evaluateOrReturnNull(final String s, final String type) {
}
}

/**
 * Optionally wraps a generated Java expression in a typed, exception-catching {@code Supplier}.
 *
 * <p>When {@code KsqlConfig.KSQL_NESTED_ERROR_HANDLING_CONFIG} is disabled the expression is
 * returned untouched. Otherwise the emitted code evaluates the expression inside the
 * {@code try} block of an anonymous {@code Supplier<javaType>}; on any {@code Exception} it
 * logs a record processing error that mentions {@code type} and then returns
 * {@code defaultValue} cast to {@code javaType}.
 *
 * <p>NOTE(review): the emitted code refers to {@code logger}, {@code row} and
 * {@code defaultValue}; these are presumably bound wherever the generated expression is
 * compiled - confirm against the evaluator.
 *
 * @param s the generated Java expression to guard
 * @param type label inserted into the "Error processing ..." log message
 * @param javaType Java type name used as the supplier's type argument and for the cast
 * @return the guarded expression, or {@code s} itself when nested error handling is off
 */
private String evaluateOrReturnNull(final String s, final String type, final String javaType) {
  if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_NESTED_ERROR_HANDLING_CONFIG)) {
    return s;
  }

  final String supplierName = Supplier.class.getSimpleName();
  final StringBuilder guarded = new StringBuilder();

  // Anonymous supplier header, typed with the target Java type.
  guarded.append(" (new ").append(supplierName).append("<").append(javaType).append(">() {");
  guarded.append("@Override public ").append(javaType).append(" get() {");

  // Happy path: evaluate the wrapped expression.
  guarded.append(" try {");
  guarded.append(" return ").append(s).append(";");

  // Failure path: log the error (unwrapping reflective invocation failures) and fall back.
  guarded.append(" } catch (Exception e) {");
  guarded.append(" logger.error(RecordProcessingError.recordProcessingError(");
  guarded.append(" \"Error processing ").append(type).append("\",");
  guarded.append(" e instanceof InvocationTargetException? e.getCause() : e,");
  guarded.append(" row));");
  guarded.append(" return (").append(javaType).append(") defaultValue;");
  guarded.append(" }");

  // Close the anonymous class and invoke it immediately.
  guarded.append("}}).get()");
  return guarded.toString();
}

@Override
public Pair<String, SqlType> visitBetweenPredicate(
final BetweenPredicate node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import io.confluent.ksql.execution.expression.tree.LambdaFunctionCall;
import io.confluent.ksql.execution.expression.tree.LambdaVariable;
import io.confluent.ksql.execution.expression.tree.LikePredicate;
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.SearchedCaseExpression;
import io.confluent.ksql.execution.expression.tree.SimpleCaseExpression;
Expand All @@ -85,7 +84,6 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -258,24 +256,6 @@ public void shouldProcessStructExpressionWithDereferencesCorrectly() {
+ "logger.error(RecordProcessingError.recordProcessingError( \"Error processing struct field\", e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue; }}}).get()).put(\"col2\", (new Supplier<Object>() {@Override public Object get() { try { return ((Double) (COL5 == null ? null : ((java.util.Map)COL5).get(\"key1\"))); } catch (Exception e) { logger.error(RecordProcessingError.recordProcessingError( \"Error processing struct field\", e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue; }}}).get())) == null ? null : ((Struct)new Struct(schema0).put(\"col1\", (new Supplier<Object>() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { logger.error(RecordProcessingError.recordProcessingError( \"Error processing struct field\", e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue; }}}).get()).put(\"col2\", (new Supplier<Object>() {@Override public Object get() { try { return ((Double) (COL5 == null ? null : ((java.util.Map)COL5).get(\"key1\"))); } catch (Exception e) { logger.error(RecordProcessingError.recordProcessingError( \"Error processing struct field\", e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue; }}}).get())).get(\"col2\")))"));
}

/**
 * Checks that processing a CAST of COL0 to INTEGER yields the same code that
 * {@code CastEvaluator.generateCode} produces for a BIGINT to INTEGER cast of "COL0".
 */
@Test
public void shouldCreateCorrectCastJavaExpression() {
  // Given: COL0 wrapped in a cast whose target type is INTEGER.
  final io.confluent.ksql.execution.expression.tree.Type integerType =
      new io.confluent.ksql.execution.expression.tree.Type(SqlPrimitiveType.of("INTEGER"));
  final Expression cast = new Cast(COL0, integerType);

  // When:
  final String generated = sqlToJavaVisitor.process(cast);

  // Then: the visitor output must equal the evaluator's code for BIGINT -> INTEGER.
  assertThat(
      generated,
      is(CastEvaluator.generateCode("COL0", SqlTypes.BIGINT, SqlTypes.INTEGER, ksqlConfig)));
}

@Test
public void shouldPostfixFunctionInstancesWithUniqueId() {
// Given:
Expand Down Expand Up @@ -348,11 +328,13 @@ public void shouldImplicitlyCastFunctionCallParameters() {
);

// Then:
final String doubleCast = CastEvaluator.generateCode(
"new BigDecimal(\"1.2\")", SqlTypes.decimal(2, 1), SqlTypes.DOUBLE, ksqlConfig);
final String doubleCast = sqlToJavaVisitor.process(new Cast(
new DecimalLiteral(new BigDecimal("1.2")),
new io.confluent.ksql.execution.expression.tree.Type(SqlTypes.DOUBLE)));

final String longCast = CastEvaluator.generateCode(
"1", SqlTypes.INTEGER, SqlTypes.BIGINT, ksqlConfig);
final String longCast = sqlToJavaVisitor.process(new Cast(
new IntegerLiteral(1),
new io.confluent.ksql.execution.expression.tree.Type(SqlTypes.BIGINT)));

assertThat(javaExpression, is(
"((String) FOO_0.evaluate(" +doubleCast + ", " + longCast + "))"
Expand Down Expand Up @@ -380,11 +362,13 @@ public void shouldImplicitlyCastFunctionCallParametersVariadic() {
);

// Then:
final String doubleCast = CastEvaluator.generateCode(
"new BigDecimal(\"1.2\")", SqlTypes.decimal(2, 1), SqlTypes.DOUBLE, ksqlConfig);
final String doubleCast = sqlToJavaVisitor.process(new Cast(
new DecimalLiteral(new BigDecimal("1.2")),
new io.confluent.ksql.execution.expression.tree.Type(SqlTypes.DOUBLE)));

final String longCast = CastEvaluator.generateCode(
"1", SqlTypes.INTEGER, SqlTypes.BIGINT, ksqlConfig);
final String longCast = sqlToJavaVisitor.process(new Cast(
new IntegerLiteral(1),
new io.confluent.ksql.execution.expression.tree.Type(SqlTypes.BIGINT)));

assertThat(javaExpression, is(
"((String) FOO_0.evaluate(" +doubleCast + ", " + longCast + ", " + longCast + "))"
Expand Down Expand Up @@ -1314,24 +1298,32 @@ public void shouldGenerateCorrectCodeForNestedLambdas() {
// Then
assertThat(
javaExpression, equalTo(
"(((Double) nested_0.evaluate(COL4, ((Double)NullSafe.apply(0,new Function() {\n"
"(((Double) nested_0.evaluate(COL4, (new Supplier<java.lang.Double>() " +
"{@Override public java.lang.Double get() { " +
"try { return ((Double)NullSafe.apply(0,new Function() {\n"
+ " @Override\n"
+ " public Object apply(Object arg1) {\n"
+ " final Integer val = (Integer) arg1;\n"
+ " return val.doubleValue();\n"
+ " }\n"
+ "})), new BiFunction() {\n"
+ "})); } catch (Exception e) { logger.error(RecordProcessingError.recordProcessingError( " +
"\"Error processing DOUBLE\", e instanceof InvocationTargetException? e.getCause() : e, " +
"row)); return (java.lang.Double) defaultValue; }}}).get(), new BiFunction() {\n"
+ " @Override\n"
+ " public Object apply(Object arg1, Object arg2) {\n"
+ " final Double A = (Double) arg1;\n"
+ " final Integer B = (Integer) arg2;\n"
+ " return (((Double) nested_1.evaluate(COL4, ((Double)NullSafe.apply(0,new Function() {\n"
+ " return (((Double) nested_1.evaluate(COL4, (new Supplier<java.lang.Double>() " +
"{@Override public java.lang.Double get() { try { " +
"return ((Double)NullSafe.apply(0,new Function() {\n"
+ " @Override\n"
+ " public Object apply(Object arg1) {\n"
+ " final Integer val = (Integer) arg1;\n"
+ " return val.doubleValue();\n"
+ " }\n"
+ "})), new BiFunction() {\n"
+ "})); } catch (Exception e) { logger.error(RecordProcessingError.recordProcessingError( " +
"\"Error processing DOUBLE\", e instanceof InvocationTargetException? e.getCause() : e, " +
"row)); return (java.lang.Double) defaultValue; }}}).get(), new BiFunction() {\n"
+ " @Override\n"
+ " public Object apply(Object arg1, Object arg2) {\n"
+ " final Double Q = (Double) arg1;\n"
Expand Down Expand Up @@ -1377,9 +1369,4 @@ private void givenUdf(
final UdfMetadata metadata = mock(UdfMetadata.class);
when(factory.getMetadata()).thenReturn(metadata);
}

/**
 * Renders the statements expected inside a generated catch block: a call that logs a record
 * processing error labelled with {@code type}, followed by {@code return defaultValue;}.
 *
 * @param type label substituted into the "Error processing %s" message
 * @return the rendered catch-block body as a single line of Java source
 */
private String onException(final String type) {
  final String template =
      "logger.error(RecordProcessingError.recordProcessingError( \"Error processing %s\", "
          + "e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue;";
  return String.format(template, type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, F0 STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `F0` STRING",
"timestampColumn" : null,
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"windowInfo" : null,
"orReplace" : false,
"isSource" : false
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n IFNULL(CAST(PARSE_DATE(TEST.F0, 'yyyy-MM-dd') AS STRING), '1900-01-01') VAL\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `VAL` STRING",
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"windowInfo" : null,
"orReplace" : false,
"isSource" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"timestampColumn" : null,
"sourceSchema" : "`ID` STRING KEY, `F0` STRING",
"pseudoColumnVersion" : 1
},
"keyColumnNames" : [ "ID" ],
"selectedKeys" : null,
"selectExpressions" : [ "IFNULL(CAST(PARSE_DATE(F0, 'yyyy-MM-dd') AS STRING), '1900-01-01') AS VAL" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"topicName" : "OUTPUT",
"timestampColumn" : null
},
"queryId" : "CSAS_OUTPUT_0",
"runtimeId" : null
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"metric.reporters" : "",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.connect.basic.auth.credentials.reload" : "false",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.stream.enabled" : "true",
"ksql.query.push.v2.interpreter.enabled" : "true",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.variable.substitution.enable" : "true",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.query.push.v2.alos.enabled" : "true",
"ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.query.pull.range.scan.enabled" : "true",
"ksql.transient.query.cleanup.service.initial.delay.seconds" : "600",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.lambdas.enabled" : "true",
"ksql.source.table.materialization.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.websocket.connection.max.timeout.ms" : "0",
"ksql.persistence.wrap.single.values" : null,
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.connect.basic.auth.credentials.source" : "NONE",
"ksql.schema.registry.url" : "schema_registry.url:0",
"ksql.properties.overrides.denylist" : "",
"ksql.service.id" : "some.ksql.service.id",
"ksql.query.push.v2.max.catchup.consumers" : "5",
"ksql.assert.topic.default.timeout.ms" : "1000",
"ksql.query.push.v2.enabled" : "false",
"ksql.transient.query.cleanup.service.enable" : "true",
"ksql.query.push.v2.metrics.enabled" : "true",
"ksql.rowpartition.rowoffset.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.endpoint.migrate.query" : "true",
"ksql.query.push.v2.registry.installed" : "false",
"ksql.streams.num.stream.threads" : "4",
"ksql.metrics.tags.custom" : "",
"ksql.query.push.v2.catchup.consumer.msg.window" : "50",
"ksql.runtime.feature.shared.enabled" : "false",
"ksql.udf.collect.metrics" : "false",
"ksql.new.query.planner.enabled" : "false",
"ksql.connect.request.headers.plugin" : null,
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.headers.columns.enabled" : "true",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.connect.request.timeout.ms" : "5000",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.query.cleanup.shutdown.timeout.ms" : "30000",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.create.or.replace.enabled" : "true",
"ksql.shared.runtimes.count" : "2",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.transient.query.cleanup.service.period.seconds" : "600",
"ksql.suppress.enabled" : "true",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.connect.basic.auth.credentials.file" : "",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.query.push.v2.new.latest.delay.ms" : "5000",
"ksql.query.push.v2.latest.reset.age.ms" : "30000",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.json_sr.converter.deserializer.enabled" : "true",
"ksql.assert.schema.default.timeout.ms" : "1000",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.continuation.tokens.enabled" : "false",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.query.pull.thread.pool.size" : "50",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading

0 comments on commit 36608cf

Please sign in to comment.