From 1a042cd2bb9babc52a7aa071638aa408a1e47b73 Mon Sep 17 00:00:00 2001 From: Steven Zhang <35498506+stevenpyzhang@users.noreply.github.com> Date: Fri, 26 Feb 2021 00:28:54 -0800 Subject: [PATCH] feat: implement correct logic for nested lambdas and more complex lambda expressions (#7056) * feat: implement correct logic for nested lambdas and more complex lambda expressions * qtt * update qtt and add more comments to clarify code --- .../ksql/execution/codegen/CodeGenRunner.java | 26 +- .../execution/codegen/SqlToJavaVisitor.java | 22 +- .../execution/codegen/TypeContextUtil.java | 51 ++++ .../execution/util/ExpressionTypeManager.java | 16 +- .../codegen/SqlToJavaVisitorTest.java | 182 ++++++++++++-- .../util/ExpressionTypeManagerTest.java | 72 ++++++ .../7.0.0_1614300463493/plan.json | 145 +++++++++++ .../7.0.0_1614300463493/spec.json | 147 ++++++++++++ .../7.0.0_1614300463493/topology | 13 + .../7.0.0_1614300463439/plan.json | 145 +++++++++++ .../7.0.0_1614300463439/spec.json | 227 ++++++++++++++++++ .../7.0.0_1614300463439/topology | 13 + .../complex-lambda.json | 43 ++++ 13 files changed, 1069 insertions(+), 33 deletions(-) create mode 100644 ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/TypeContextUtil.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/topology create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/complex-lambda.json diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java index 799f29f248e7..ebf140a60100 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java @@ -183,23 +183,34 @@ public Void visitLikePredicate(final LikePredicate node, final TypeContext conte @Override public Void visitFunctionCall(final FunctionCall node, final TypeContext context) { - final List argumentTypes = new ArrayList<>(); final FunctionName functionName = node.getName(); + + // this context gets updated as we process non lambda arguments + final TypeContext currentTypeContext = context.getCopy(); + + final List argumentTypes = new ArrayList<>(); + final List typeContextsForChildren = new ArrayList<>(); final boolean hasLambda = node.hasLambdaFunctionCallArguments(); for (final Expression argExpr : node.getArguments()) { - final TypeContext childContext = context.getCopy(); + final TypeContext childContext = TypeContextUtil.contextForExpression( + argExpr, context, currentTypeContext + ); + typeContextsForChildren.add(childContext); + + // pass a copy of the context to the type checker so that type checking in one + // expression subtree doesn't interfere with type checking in another one final SqlType resolvedArgType = - expressionTypeManager.getExpressionSqlType(argExpr, childContext); + expressionTypeManager.getExpressionSqlType(argExpr, childContext.getCopy()); if (argExpr instanceof LambdaFunctionCall) { argumentTypes.add( SqlArgument.of( - SqlLambda.of(context.getLambdaInputTypes(), childContext.getSqlType()))); + SqlLambda.of(currentTypeContext.getLambdaInputTypes(), resolvedArgType))); } else { argumentTypes.add(SqlArgument.of(resolvedArgType)); // for lambdas - we save the type information to resolve the lambda generics if (hasLambda) { - context.visitType(resolvedArgType); + currentTypeContext.visitType(resolvedArgType); } } } @@ -211,8 +222,9 @@ public Void visitFunctionCall(final FunctionCall node, final TypeContext context function.newInstance(ksqlConfig) ); - for (final Expression argExpr : node.getArguments()) { - process(argExpr, context.getCopy()); + final List arguments = node.getArguments(); + for (int i = 0; i < arguments.size(); i++) { + process(arguments.get(i), typeContextsForChildren.get(i)); } return null; } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 501b16817ecc..5aed03b17bce 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -452,21 +452,33 @@ public Pair visitFunctionCall( final String instanceName = funNameToCodeName.apply(functionName); final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName()); + + // this context gets updated as we process non lambda arguments + final TypeContext currentTypeContext = context.getCopy(); + final List argumentSchemas = new ArrayList<>(); + final List typeContextsForChildren = new ArrayList<>(); final boolean hasLambda = node.hasLambdaFunctionCallArguments(); + for (final Expression argExpr : node.getArguments()) { - final TypeContext childContext = context.getCopy(); + final TypeContext childContext = TypeContextUtil.contextForExpression( + argExpr, context, currentTypeContext + ); + typeContextsForChildren.add(childContext); + + // pass a copy of the context to the type checker so that type checking in one + // expression subtree doesn't interfere with type checking in another one final SqlType resolvedArgType = - expressionTypeManager.getExpressionSqlType(argExpr, childContext); + expressionTypeManager.getExpressionSqlType(argExpr, childContext.getCopy()); if (argExpr instanceof LambdaFunctionCall) { argumentSchemas.add( SqlArgument.of( - SqlLambda.of(context.getLambdaInputTypes(), childContext.getSqlType()))); + SqlLambda.of(currentTypeContext.getLambdaInputTypes(), resolvedArgType))); } else { argumentSchemas.add(SqlArgument.of(resolvedArgType)); // for lambdas - we save the type information to resolve the lambda generics if (hasLambda) { - context.visitType(resolvedArgType); + currentTypeContext.visitType(resolvedArgType); } } } @@ -494,7 +506,7 @@ public Pair visitFunctionCall( } joiner.add( - process(convertArgument(arg, sqlType, paramType), context.getCopy()) + process(convertArgument(arg, sqlType, paramType), typeContextsForChildren.get(i)) .getLeft()); } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/TypeContextUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/TypeContextUtil.java new file mode 100644 index 000000000000..40a79179b0d1 --- /dev/null +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/TypeContextUtil.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.codegen; + +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.LambdaFunctionCall; + +public final class TypeContextUtil { + private TypeContextUtil() { + + } + + /** + * Returns a copy of the appropriate context to use when processing an expression subtree. + * A copy is required to prevent different subtrees from getting a context that's been + * modified by another subtree. For non-lambdas we want to use the parent context because + * there may be valid overlapping lambda parameter names in different child nodes. For + * lambdas, we want to use the updateContext which has the type information the lambda + * expression body needs. + * + * @param expression the current expression we're processing + * @param parentContext the context passed into the parent node of the expression + * @param updatedContext the context that has been updated as we processed other child + * nodes + * @return a copy of either parent or current type context to be passed to the child + */ + public static TypeContext contextForExpression( + final Expression expression, + final TypeContext parentContext, + final TypeContext updatedContext + ) { + if (expression instanceof LambdaFunctionCall) { + return updatedContext.getCopy(); + } else { + return parentContext.getCopy(); + } + } +} diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 688a150838f6..07d52d62932f 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.codegen.TypeContext; +import io.confluent.ksql.execution.codegen.TypeContextUtil; import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; @@ -473,23 +474,30 @@ public Void visitFunctionCall( final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName()); + // this context gets updated as we process non lambda arguments + final TypeContext currentTypeContext = expressionTypeContext.getCopy(); + final List argTypes = new ArrayList<>(); final boolean hasLambda = node.hasLambdaFunctionCallArguments(); for (final Expression expression : node.getArguments()) { - final TypeContext childContext = expressionTypeContext.getCopy(); + final TypeContext childContext = TypeContextUtil.contextForExpression( + expression, expressionTypeContext, currentTypeContext + ); process(expression, childContext); final SqlType resolvedArgType = childContext.getSqlType(); + if (expression instanceof LambdaFunctionCall) { argTypes.add( SqlArgument.of( - SqlLambda.of(expressionTypeContext.getLambdaInputTypes(), - childContext.getSqlType()))); + SqlLambda.of( + currentTypeContext.getLambdaInputTypes(), + resolvedArgType))); } else { argTypes.add(SqlArgument.of(resolvedArgType)); // for lambdas - we save the type information to resolve the lambda generics if (hasLambda) { - expressionTypeContext.visitType(resolvedArgType); + currentTypeContext.visitType(resolvedArgType); } } } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index 74ddbe887910..c1c128a656d3 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -30,7 +30,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.mock; @@ -65,7 +64,6 @@ import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.execution.expression.tree.SubscriptExpression; import io.confluent.ksql.execution.expression.tree.TimeLiteral; -import io.confluent.ksql.execution.expression.tree.TimestampLiteral; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.WhenClause; import io.confluent.ksql.function.FunctionRegistry; @@ -74,18 +72,16 @@ import io.confluent.ksql.function.types.ArrayType; import io.confluent.ksql.function.types.GenericType; import io.confluent.ksql.function.types.LambdaType; -import io.confluent.ksql.function.types.MapType; -import io.confluent.ksql.function.types.ParamType; import io.confluent.ksql.function.types.ParamTypes; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.Operator; import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.KsqlConfig; import java.math.BigDecimal; -import java.sql.Timestamp; import java.util.Collections; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -243,10 +239,10 @@ public void shouldPostfixFunctionInstancesWithUniqueId() { final KsqlScalarFunction ssFunction = mock(KsqlScalarFunction.class); final UdfFactory catFactory = mock(UdfFactory.class); final KsqlScalarFunction catFunction = mock(KsqlScalarFunction.class); - givenUdf("SUBSTRING", ssFactory, ssFunction); + givenUdf("SUBSTRING", ssFactory, ssFunction, SqlTypes.STRING); when(ssFunction.parameters()) .thenReturn(ImmutableList.of(ParamTypes.STRING, ParamTypes.INTEGER, ParamTypes.INTEGER)); - givenUdf("CONCAT", catFactory, catFunction); + givenUdf("CONCAT", catFactory, catFunction, SqlTypes.STRING); when(catFunction.parameters()) .thenReturn(ImmutableList.of(ParamTypes.STRING, ParamTypes.STRING)); final FunctionName ssName = FunctionName.of("SUBSTRING"); @@ -284,7 +280,7 @@ public void shouldImplicitlyCastFunctionCallParameters() { // Given: final UdfFactory udfFactory = mock(UdfFactory.class); final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); - givenUdf("FOO", udfFactory, udf); + givenUdf("FOO", udfFactory, udf, SqlTypes.STRING); when(udf.parameters()).thenReturn(ImmutableList.of(ParamTypes.DOUBLE, ParamTypes.LONG)); // When: @@ -312,7 +308,7 @@ public void shouldImplicitlyCastFunctionCallParametersVariadic() { // Given: final UdfFactory udfFactory = mock(UdfFactory.class); final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); - givenUdf("FOO", udfFactory, udf); + givenUdf("FOO", udfFactory, udf, SqlTypes.STRING); when(udf.parameters()).thenReturn(ImmutableList.of(ParamTypes.DOUBLE, ArrayType.of(ParamTypes.LONG))); when(udf.isVariadic()).thenReturn(true); @@ -344,7 +340,7 @@ public void shouldHandleFunctionCallsWithGenerics() { // Given: final UdfFactory udfFactory = mock(UdfFactory.class); final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); - givenUdf("FOO", udfFactory, udf); + givenUdf("FOO", udfFactory, udf, SqlTypes.STRING); when(udf.parameters()).thenReturn(ImmutableList.of(GenericType.of("T"), GenericType.of("T"))); // When: @@ -893,12 +889,12 @@ public void shouldGenerateCorrectCodeForInPredicate() { } @Test - public void shouldGenerateCorrectCodeForTransformLambdaExpression() { + public void shouldGenerateCorrectCodeForLambdaExpression() { // Given: final UdfFactory udfFactory = mock(UdfFactory.class); final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); - givenUdf("ABS", udfFactory, udf); - givenUdf("TRANSFORM", udfFactory, udf); + givenUdf("ABS", udfFactory, udf, SqlTypes.STRING); + givenUdf("TRANSFORM", udfFactory, udf, SqlTypes.STRING); when(udf.parameters()). thenReturn(ImmutableList.of( ArrayType.of(ParamTypes.DOUBLE), @@ -925,11 +921,11 @@ javaExpression, equalTo( } @Test - public void shouldGenerateCorrectCodeForReduceLambdaExpression() { + public void shouldGenerateCorrectCodeForLambdaExpressionWithTwoArguments() { // Given: final UdfFactory udfFactory = mock(UdfFactory.class); final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); - givenUdf("REDUCE", udfFactory, udf); + givenUdf("REDUCE", udfFactory, udf, SqlTypes.STRING); when(udf.parameters()). thenReturn(ImmutableList.of( ArrayType.of(ParamTypes.DOUBLE), @@ -967,6 +963,155 @@ javaExpression, equalTo( " }\n" + "}))")); } + + @Test + public void shouldGenerateCorrectCodeForFunctionWithMultipleLambdas() { + // Given: + final UdfFactory udfFactory = mock(UdfFactory.class); + final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); + givenUdf("function", udfFactory, udf, SqlTypes.STRING); + when(udf.parameters()). + thenReturn(ImmutableList.of( + ArrayType.of(ParamTypes.DOUBLE), + ParamTypes.STRING, + LambdaType.of( + ImmutableList.of(ParamTypes.DOUBLE, ParamTypes.STRING), + ParamTypes.DOUBLE), + LambdaType.of( + ImmutableList.of(ParamTypes.DOUBLE, ParamTypes.STRING), + ParamTypes.STRING) + )); + + final Expression expression = new FunctionCall ( + FunctionName.of("function"), + ImmutableList.of( + ARRAYCOL, + COL1, + new LambdaFunctionCall( + ImmutableList.of("X", "S"), + new ArithmeticBinaryExpression( + Operator.ADD, + new LambdaVariable("X"), + new LambdaVariable("X")) + ), + new LambdaFunctionCall( + ImmutableList.of("X", "S"), + new SearchedCaseExpression( + ImmutableList.of( + new WhenClause( + new ComparisonExpression( + ComparisonExpression.Type.LESS_THAN, new LambdaVariable("X"), new IntegerLiteral(10)), + new StringLiteral("test") + ), + new WhenClause( + new ComparisonExpression( + ComparisonExpression.Type.LESS_THAN, new LambdaVariable("X"), new IntegerLiteral(100)), + new StringLiteral("test2") + ) + ), + Optional.of(new LambdaVariable("S")) + ) + ))); + + // When: + final String javaExpression = sqlToJavaVisitor.process(expression); + + // Then + assertThat( + javaExpression, equalTo("((String) function_0.evaluate(COL4, COL1, new BiFunction() {\n" + + " @Override\n" + + " public Object apply(Object arg1, Object arg2) {\n" + + " final Double X = (Double) arg1;\n" + + " final String S = (String) arg2;\n" + + " return (X + X);\n" + + " }\n" + + "}, new BiFunction() {\n" + + " @Override\n" + + " public Object apply(Object arg1, Object arg2) {\n" + + " final Double X = (Double) arg1;\n" + + " final String S = (String) arg2;\n" + + " return ((java.lang.String)SearchedCaseFunction.searchedCaseFunction(ImmutableList.copyOf(Arrays.asList( SearchedCaseFunction.whenClause( new Supplier() { @Override public Boolean get() { return ((((Object)(X)) == null || ((Object)(10)) == null) ? false : (X < 10)); }}, new Supplier() { @Override public java.lang.String get() { return \"test\"; }}), SearchedCaseFunction.whenClause( new Supplier() { @Override public Boolean get() { return ((((Object)(X)) == null || ((Object)(100)) == null) ? false : (X < 100)); }}, new Supplier() { @Override public java.lang.String get() { return \"test2\"; }}))), new Supplier() { @Override public java.lang.String get() { return S; }}));\n" + + " }\n" + + "}))")); + } + + @Test + public void shouldGenerateCorrectCodeForNestedLambdas() { + // Given: + final UdfFactory udfFactory = mock(UdfFactory.class); + final KsqlScalarFunction udf = mock(KsqlScalarFunction.class); + givenUdf("nested", udfFactory, udf, SqlTypes.DOUBLE); + when(udf.parameters()). + thenReturn(ImmutableList.of( + ArrayType.of(ParamTypes.DOUBLE), + ParamTypes.DOUBLE, + LambdaType.of( + ImmutableList.of(ParamTypes.DOUBLE, ParamTypes.DOUBLE), + ParamTypes.DOUBLE)) + ); + + final Expression expression = new ArithmeticBinaryExpression( + Operator.ADD, + new FunctionCall( + FunctionName.of("nested"), + ImmutableList.of( + ARRAYCOL, + new IntegerLiteral(0), + new LambdaFunctionCall( + ImmutableList.of("A", "B"), + new ArithmeticBinaryExpression( + Operator.ADD, + new FunctionCall( + FunctionName.of("nested"), + ImmutableList.of( + ARRAYCOL, + new IntegerLiteral(0), + new LambdaFunctionCall( + ImmutableList.of("Q", "V"), + new ArithmeticBinaryExpression( + Operator.ADD, + new LambdaVariable("Q"), + new LambdaVariable("V")) + ))), + new LambdaVariable("B")) + ))), + new IntegerLiteral(5) + ); + + // When: + final String javaExpression = sqlToJavaVisitor.process(expression); + + // Then + assertThat( + javaExpression, equalTo( + "(((Double) nested_0.evaluate(COL4, (Double)NullSafe.apply(0,new Function() {\n" + + " @Override\n" + + " public Object apply(Object arg1) {\n" + + " final Integer val = (Integer) arg1;\n" + + " return val.doubleValue();\n" + + " }\n" + + "}), new BiFunction() {\n" + + " @Override\n" + + " public Object apply(Object arg1, Object arg2) {\n" + + " final Double A = (Double) arg1;\n" + + " final Integer B = (Integer) arg2;\n" + + " return (((Double) nested_1.evaluate(COL4, (Double)NullSafe.apply(0,new Function() {\n" + + " @Override\n" + + " public Object apply(Object arg1) {\n" + + " final Integer val = (Integer) arg1;\n" + + " return val.doubleValue();\n" + + " }\n" + + "}), new BiFunction() {\n" + + " @Override\n" + + " public Object apply(Object arg1, Object arg2) {\n" + + " final Double Q = (Double) arg1;\n" + + " final Integer V = (Integer) arg2;\n" + + " return (Q + V);\n" + + " }\n" + + "})) + B);\n" + + " }\n" + + "})) + 5)")); + } @Test public void shouldThrowErrorOnEmptyLambdaInput() { @@ -1005,12 +1150,15 @@ public void shouldThrowOnTimeLiteral() { } private void givenUdf( - final String name, final UdfFactory factory, final KsqlScalarFunction function + final String name, + final UdfFactory factory, + final KsqlScalarFunction function, + final SqlType returnType ) { when(functionRegistry.isAggregate(FunctionName.of(name))).thenReturn(false); when(functionRegistry.getUdfFactory(FunctionName.of(name))).thenReturn(factory); when(factory.getFunction(anyList())).thenReturn(function); - when(function.getReturnType(anyList())).thenReturn(SqlTypes.STRING); + when(function.getReturnType(anyList())).thenReturn(returnType); final UdfMetadata metadata = mock(UdfMetadata.class); when(factory.getMetadata()).thenReturn(metadata); } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index c5c8ffed4ec6..61bc47f9c260 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -524,6 +524,78 @@ public void shouldFailToEvaluateLambdaWithMismatchedArgumentNumber() { "Was expecting 1 arguments but found 2, [X, Y]. Check your lambda statement.")); } + @Test + public void shouldHandleMultipleLambdasInSameFunctionCallWithDifferentVariableNames() { + // Given: + givenUdfWithNameAndReturnType("TRANSFORM", SqlTypes.INTEGER); + final Expression expression = new ArithmeticBinaryExpression( + Operator.ADD, + new FunctionCall( + FunctionName.of("TRANSFORM"), + ImmutableList.of( + ARRAYCOL, + new IntegerLiteral(0), + new LambdaFunctionCall( + ImmutableList.of("A", "B"), + new ArithmeticBinaryExpression( + Operator.ADD, + new LambdaVariable("A"), + new LambdaVariable("B")) + ), + new LambdaFunctionCall( + ImmutableList.of("K", "V"), + new ArithmeticBinaryExpression( + Operator.ADD, + new LambdaVariable("K"), + new LambdaVariable("V")) + ))), + new IntegerLiteral(5) + ); + + // When: + final SqlType result = expressionTypeManager.getExpressionSqlType(expression); + + assertThat(result, is(SqlTypes.INTEGER)); + } + + @Test + public void shouldHandleNestedLambdas() { + // Given: + givenUdfWithNameAndReturnType("TRANSFORM", SqlTypes.INTEGER); + final Expression expression = new ArithmeticBinaryExpression( + Operator.ADD, + new FunctionCall( + FunctionName.of("TRANSFORM"), + ImmutableList.of( + ARRAYCOL, + new IntegerLiteral(0), + new LambdaFunctionCall( + ImmutableList.of("A", "B"), + new ArithmeticBinaryExpression( + Operator.ADD, + new FunctionCall( + FunctionName.of("TRANSFORM"), + ImmutableList.of( + ARRAYCOL, + new IntegerLiteral(0), + new LambdaFunctionCall( + ImmutableList.of("Q", "V"), + new ArithmeticBinaryExpression( + Operator.ADD, + new LambdaVariable("Q"), + new LambdaVariable("V")) + ))), + new LambdaVariable("B")) + ))), + new IntegerLiteral(5) + ); + + // When: + final SqlType result = expressionTypeManager.getExpressionSqlType(expression); + + assertThat(result, is(SqlTypes.INTEGER)); + } + @Test public void shouldHandleStructFieldDereference() { // Given: diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/plan.json new file mode 100644 index 000000000000..fa89513f36c6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/plan.json @@ -0,0 +1,145 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, MAPPING MAP>) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `MAPPING` MAP>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n TRANSFORM(FILTER(TEST.MAPPING, (A, B) => ((LEN(A) > 2) AND (REDUCE(0, B, (C, D) => (C + D)) < 20))), (X, Y) => LPAD(X, REDUCE(2, Y, (S, K) => ABS((ABS(K) - S))), 'a'), (X, Y) => REDUCE(0, ARRAY_UNION(Y, TRANSFORM(Y, (Z) => (Z * 3))), (E, F) => (E + F))) OUTPUT\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `OUTPUT` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `MAPPING` MAP>" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TRANSFORM(FILTER(MAPPING, (A, B) => ((LEN(A) > 2) AND (REDUCE(0, B, (C, D) => (C + D)) < 20))), (X, Y) => LPAD(X, REDUCE(2, Y, (S, K) => ABS((ABS(K) - S))), 'a'), (X, Y) => REDUCE(0, ARRAY_UNION(Y, TRANSFORM(Y, (Z) => (Z * 3))), (E, F) => (E + F))) AS OUTPUT" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/spec.json new file mode 100644 index 000000000000..17c812dbebd3 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/spec.json @@ -0,0 +1,147 @@ +{ + "version" : "7.0.0", + "timestamp" : 1614300463493, + "path" : "query-validation-tests/complex-lambda.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `MAPPING` MAP>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` STRING KEY, `OUTPUT` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "complex lambda", + "inputs" : [ { + "topic" : "test_topic", + "key" : "one", + "value" : { + "MAPPING" : { + "a" : [ 2, 4, 5 ], + "bcd" : [ -5, 7 ] + } + } + }, { + "topic" : "test_topic", + "key" : "two", + "value" : { + "MAPPING" : { + "hello" : [ 200, 4, 5 ], + "hey" : [ 14, -3, -15, 3 ], + "wow" : [ 2, 3, 4 ] + } + } + }, { + "topic" : "test_topic", + "key" : "three", + "value" : { + "MAPPING" : { + "a" : null, + "bcdefg" : [ -15, 72 ] + } + } + }, { + "topic" : "test_topic", + "key" : "four", + "value" : { + "MAPPING" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "one", + "value" : { + "OUTPUT" : { + "abcd" : 8 + } + } + }, { + "topic" : "OUTPUT", + "key" : "two", + "value" : { + "OUTPUT" : { + "hey" : -4, + "w" : 36 + } + } + }, { + "topic" : "OUTPUT", + "key" : "three", + "value" : { + "OUTPUT" : { } + } + }, { + "topic" : "OUTPUT", + "key" : "four", + "value" : { + "OUTPUT" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (ID STRING KEY, MAPPING MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, TRANSFORM(FILTER(MAPPING, (a, b) => LEN(a) > 2 AND REDUCE(0, b, (c, d) => c+d) < 20), (X,Y) => LPAD(x, REDUCE(2, Y, (s, k) => ABS(ABS(k)-s)), 'a'), (X,Y) => REDUCE(0, ARRAY_UNION(Y, TRANSFORM(Y, z => z*3)), (e, f) => e+f)) AS OUTPUT FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `OUTPUT` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `MAPPING` MAP>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_complex_lambda/7.0.0_1614300463493/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/plan.json new file mode 100644 index 000000000000..c05be4c17561 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/plan.json @@ -0,0 +1,145 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE MAP>) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` MAP>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n TRANSFORM(TRANSFORM(TEST.VALUE, (X, Y) => X, (X, Y) => FILTER(Y, (Z) => (Z < 5))), (X, Y) => UCASE(X), (K, V) => ARRAY_MAX(V)) FILTERED_TRANSFORMED\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `FILTERED_TRANSFORMED` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` MAP>" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TRANSFORM(TRANSFORM(VALUE, (X, Y) => X, (X, Y) => FILTER(Y, (Z) => (Z < 5))), (X, Y) => UCASE(X), (K, V) => ARRAY_MAX(V)) AS FILTERED_TRANSFORMED" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/spec.json new file mode 100644 index 000000000000..f0030dda9df6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/spec.json @@ -0,0 +1,227 @@ +{ + "version" : "7.0.0", + "timestamp" : 1614300463439, + "path" : "query-validation-tests/complex-lambda.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` MAP>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `FILTERED_TRANSFORMED` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "transform a map with array values", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : { + "a" : [ 2, null, 5, 4 ], + "b" : [ -1, -2 ] + } + } + }, { + "topic" : "test_topic", + "key" : 1, + "value" : { + "value" : { + "c" : [ null, null, -1 ], + "t" : [ 3, 1 ] + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "value" : { + "d" : [ 4 ], + "q" : [ 0, 0 ] + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "FILTERED_TRANSFORMED" : { + "A" : 4, + "B" : -1 + } + } + }, { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "FILTERED_TRANSFORMED" : { + "C" : -1, + "T" : 3 + } + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "FILTERED_TRANSFORMED" : { + "D" : 4, + "Q" : 0 + } + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "record", + "name" : "KsqlDataSourceSchema_VALUE", + "fields" : [ { + "name" : "key", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "value", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", "int" ] + } ], + "default" : null + } ], + "connect.internal.type" : "MapEntry" + }, + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_VALUE" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE MAP>) WITH (kafka_topic='test_topic', value_format='AVRO');", "CREATE STREAM OUTPUT as SELECT ID, TRANSFORM(TRANSFORM(VALUE, (x,y) => x, (x,y) => FIlTER(y, z => z < 5)), (x,y) => UCASE(x) , (k,v) => ARRAY_MAX(v)) as FILTERED_TRANSFORMED from TEST emit changes;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `FILTERED_TRANSFORMED` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` MAP>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "record", + "name" : "KsqlDataSourceSchema_VALUE", + "fields" : [ { + "name" : "key", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "value", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", "int" ] + } ], + "default" : null + } ], + "connect.internal.type" : "MapEntry" + }, + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_VALUE" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "FILTERED_TRANSFORMED", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "record", + "name" : "KsqlDataSourceSchema_FILTERED_TRANSFORMED", + "fields" : [ { + "name" : "key", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "value", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.internal.type" : "MapEntry" + } + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/complex-lambda_-_transform_a_map_with_array_values/7.0.0_1614300463439/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/complex-lambda.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/complex-lambda.json new file mode 100644 index 000000000000..ab21cd7b7be1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/complex-lambda.json @@ -0,0 +1,43 @@ +{ + "comments": [ + "Tests covering the use of advanced lambda functions." + ], + "tests": [ + { + "name": "transform a map with array values", + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE MAP>) WITH (kafka_topic='test_topic', value_format='AVRO');", + "CREATE STREAM OUTPUT as SELECT ID, TRANSFORM(TRANSFORM(VALUE, (x,y) => x, (x,y) => FIlTER(y, z => z < 5)), (x,y) => UCASE(x) , (k,v) => ARRAY_MAX(v)) as FILTERED_TRANSFORMED from TEST emit changes;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0,"value": {"value": {"a": [2,null,5,4], "b": [-1,-2]}}}, + {"topic": "test_topic", "key": 1,"value": {"value": {"c": [null,null,-1], "t": [3, 1]}}}, + {"topic": "test_topic", "key": 2,"value": {"value": {"d": [4], "q": [0, 0]}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"FILTERED_TRANSFORMED":{"A": 4, "B": -1}}}, + {"topic": "OUTPUT", "key": 1, "value": {"FILTERED_TRANSFORMED":{"C": -1, "T": 3}}}, + {"topic": "OUTPUT", "key": 2, "value": {"FILTERED_TRANSFORMED":{"D": 4, "Q": 0}}} + ] + }, + { + "name": "complex lambda", + "statements": [ + "CREATE STREAM test (ID STRING KEY, MAPPING MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, TRANSFORM(FILTER(MAPPING, (a, b) => LEN(a) > 2 AND REDUCE(0, b, (c, d) => c+d) < 20), (X,Y) => LPAD(x, REDUCE(2, Y, (s, k) => ABS(ABS(k)-s)), 'a'), (X,Y) => REDUCE(0, ARRAY_UNION(Y, TRANSFORM(Y, z => z*3)), (e, f) => e+f)) AS OUTPUT FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "one", "value": {"MAPPING": {"a": [2,4,5], "bcd": [-5,7]}}}, + {"topic": "test_topic", "key": "two", "value": {"MAPPING": {"hello": [200,4,5], "hey": [14, -3, -15, 3], "wow": [2, 3, 4]}}}, + {"topic": "test_topic", "key": "three", "value": {"MAPPING": {"a": null, "bcdefg": [-15,72]}}}, + {"topic": "test_topic", "key": "four", "value": {"MAPPING": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "one", "value": {"OUTPUT":{"abcd": 8}}}, + {"topic": "OUTPUT", "key": "two", "value": {"OUTPUT":{"hey": -4, "w": 36}}}, + {"topic": "OUTPUT", "key": "three", "value": {"OUTPUT": {}}}, + {"topic": "OUTPUT", "key": "four", "value": {"OUTPUT":null}} + ] + } + ] +} \ No newline at end of file