diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index 75bc395f27f0..58d07acdb5d9 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -259,6 +259,52 @@ MAP(key VARCHAR := value, ...) Construct a map from specific key-value tuples. +### `MAP_KEYS` + +```sql +MAP_KEYS(a_map) +``` + +Returns an array that contains all of the keys from the specified map. + +Returns NULL if the input map is NULL. + +Example: +```sql +map_keys( map('apple' := 10, 'banana' := 20) ) => ['apple', 'banana'] +``` + +### `MAP_VALUES` + +```sql +MAP_VALUES(a_map) +``` + +Returns an array that contains all of the values from the specified map. + +Returns NULL if the input map is NULL. + +Example: +```sql +map_values( map('apple' := 10, 'banana' := 20) ) => [10, 20] +``` + +### `MAP_UNION` + +```sql +MAP_UNION(map1, map2) +``` + +Returns a new map containing the union of all entries from both input maps. If a key is present in both input maps, the corresponding value from _map2_ is returned. + +Returns NULL if all of the input maps are NULL. + +Example: +```sql +map_union( map('apple' := 10, 'banana' := 20), map('cherry' := 99) ) => ['apple': 10, 'banana': 20, 'cherry': 99] + +map_union( map('apple' := 10, 'banana' := 20), map('apple' := 50) ) => ['apple': 50, 'banana': 20] +``` ### `SLICE` diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapKeys.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapKeys.java new file mode 100644 index 000000000000..535c30911016 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapKeys.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import com.google.common.collect.Lists; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import java.util.List; +import java.util.Map; + +@UdfDescription( + name = "map_keys", + description = "Returns an array of all the keys from the specified map, " + + "or NULL if the input map is NULL.") +public class MapKeys { + + @Udf + public List mapKeys(final Map input) { + if (input == null) { + return null; + } + return Lists.newArrayList(input.keySet()); + } + +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapUnion.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapUnion.java new file mode 100644 index 000000000000..c9c622bd1feb --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapUnion.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@UdfDescription( + name = "map_union", + description = "Returns a new map containing the union of all entries from both input maps. " + + "If a key is present in both input maps then the value from map2 is the one which " + + "appears in the result. Returns NULL if all of the input maps are NULL.") +public class MapUnion { + + @Udf + public Map union( + @UdfParameter(description = "first map to union") final Map map1, + @UdfParameter(description = "second map to union") final Map map2) { + + final List> nonNullInputs = + Stream.of(map1, map2) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (nonNullInputs.size() == 0) { + return null; + } + + final Map output = new HashMap<>(); + nonNullInputs.stream() + .forEach(output::putAll); + return output; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapValues.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapValues.java new file mode 100644 index 000000000000..b3fc82064dd3 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapValues.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import com.google.common.collect.Lists; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import java.util.List; +import java.util.Map; + +@UdfDescription( + name = "map_values", + description = "Returns an array of all the values from the specified map, " + + "or NULL if the input map is NULL.") +public class MapValues { + + @Udf + public List mapValues(final Map input) { + if (input == null) { + return null; + } + return Lists.newArrayList(input.values()); + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapKeysTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapKeysTest.java new file mode 100644 index 000000000000..36728e73a6bc --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapKeysTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.Maps; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class MapKeysTest { + + private MapKeys udf; + + @Before + public void setUp() { + udf = new MapKeys(); + } + + @Test + public void shouldGetKeys() { + final Map input = new HashMap<>(); + input.put("foo", "spam"); + input.put("bar", "baloney"); + assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar")); + } + + @Test + public void shouldHandleComplexValueTypes() { + final Map>> input = Maps.newHashMap(); + + final Map> entry1 = Maps.newHashMap(); + entry1.put("apple", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); + entry1.put("banana", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); + input.put("foo", entry1); + + final Map> entry2 = Maps.newHashMap(); + entry2.put("cherry", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); + entry2.put("date", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); + input.put("bar", entry2); + + assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar")); + } + + @Test + public void shouldReturnNullForNullInput() { + List result = udf.mapKeys((Map) null); + assertThat(result, is(nullValue())); + } + + @Test + public void shouldReturnNullsFromMapWithNulls() { + final Map input = Maps.newHashMap(); + input.put("foo", 1); + input.put(null, null); + input.put("bar", null); + List result = udf.mapKeys(input); + assertThat(result, containsInAnyOrder(null, "foo", "bar")); + } + + @Test + public void shouldReturnEmptyListFromEmptyMap() { + final Map input = Maps.newHashMap(); + assertThat(udf.mapKeys(input), empty()); + } + +} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapUnionTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapUnionTest.java new file mode 100644 index 000000000000..2f23a09e880a --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapUnionTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.Maps; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class MapUnionTest { + + private MapUnion udf; + + @Before + public void setUp() { + udf = new MapUnion(); + } + + @Test + public void shouldUnionNonEmptyMaps() { + final Map input1 = Maps.newHashMap(); + input1.put("foo", "spam"); + input1.put("bar", "baloney"); + + final Map input2 = Maps.newHashMap(); + input2.put("one", "apple"); + input2.put("two", "banana"); + input2.put("three", "cherry"); + + final Map result = udf.union(input1, input2); + assertThat(result.size(), is(5)); + assertThat(result.get("foo"), is("spam")); + assertThat(result.get("two"), is("banana")); + } + + @Test + public void shouldReturnNullForNullInput() { + Map result = udf.union((Map) null, (Map) null); + assertThat(result, is(nullValue())); + } + + @Test + public void shouldUnionWithNullMap() { + final Map input1 = Maps.newHashMap(); + input1.put("foo", 1); + input1.put("bar", 2); + + final Map result = udf.union(input1, null); + assertThat(result.size(), is(2)); + assertThat(result.get("foo"), is(1)); + assertThat(result.keySet(), containsInAnyOrder("foo", "bar")); + } + + @Test + public void shouldHandleComplexValueTypes() { + final Map> input1 = Maps.newHashMap(); + input1.put("apple", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); + input1.put("banana", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); + + final Map> input2 = Maps.newHashMap(); + input2.put("foo", Arrays.asList(Double.valueOf(123.456))); + + final Map> result = udf.union(input1, input2); + assertThat(result.size(), is(3)); + assertThat(result.get("banana"), contains(Double.valueOf(43.21), Double.valueOf(87.65))); + assertThat(result.keySet(), containsInAnyOrder("foo", "banana", "apple")); + } + + @Test + public void shouldRetainLatestValueForDuplicateKey() { + final Map input1 = Maps.newHashMap(); + input1.put("foo", "spam"); + input1.put("bar", "baloney"); + + final Map input2 = Maps.newHashMap(); + input2.put("foo", "apple"); + input2.put("two", "banana"); + input2.put("three", "cherry"); + + final Map result = udf.union(input1, input2); + assertThat(result.size(), is(4)); + assertThat(result.get("foo"), is("apple")); + } + + @Test + public void shouldUnionMapWithNulls() { + final Map input1 = Maps.newHashMap(); + input1.put("one", "apple"); + input1.put("two", "banana"); + input1.put("three", "cherry"); + + final Map input2 = Maps.newHashMap(); + input2.put("foo", "bar"); + input2.put(null, null); + input2.put("baz", null); + + final Map result = udf.union(input1, input2); + assertThat(result.size(), is(6)); + assertThat(result.get("two"), is("banana")); + assertThat(result.get("foo"), is("bar")); + assertThat(result.get("baz"), is(nullValue())); + assertThat(result.keySet(), containsInAnyOrder("one", "two", "three", null, "foo", "baz")); + } + + @Test + public void shouldReturnEmptyMapFromEmptyMaps() { + final Map input1 = Maps.newHashMap(); + final Map input2 = Maps.newHashMap(); + assertThat(udf.union(input1, input2), equalTo(Collections.EMPTY_MAP)); + } + +} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapValuesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapValuesTest.java new file mode 100644 index 000000000000..ec35e0427b00 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapValuesTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the + * License. + */ + +package io.confluent.ksql.function.udf.map; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.Maps; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class MapValuesTest { + + private MapValues udf; + + @Before + public void setUp() { + udf = new MapValues(); + } + + @Test + public void shouldGetKeys() { + final Map input = new HashMap<>(); + input.put("foo", "spam"); + input.put("bar", "baloney"); + assertThat(udf.mapValues(input), containsInAnyOrder("spam", "baloney")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHandleComplexValueTypes() { + final Map>> input = Maps.newHashMap(); + + final Map> entry1 = Maps.newHashMap(); + entry1.put("apple", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); + entry1.put("banana", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); + input.put("foo", entry1); + + final Map> entry2 = Maps.newHashMap(); + entry2.put("cherry", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); + entry2.put("date", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); + input.put("bar", entry2); + + List>> values = udf.mapValues(input); + assertThat(values, containsInAnyOrder(entry1, entry2)); + } + + @Test + public void shouldReturnNullForNullInput() { + List result = udf.mapValues((Map) null); + assertThat(result, is(nullValue())); + } + + @Test + public void shouldReturnNullsFromMapWithNulls() { + final Map input = Maps.newHashMap(); + input.put("foo", 1); + input.put(null, null); + input.put("bar", null); + List result = udf.mapValues(input); + assertThat(result, containsInAnyOrder(1, null, null)); + } + + @Test + public void shouldReturnEmptyListFromEmptyMap() { + final Map input = Maps.newHashMap(); + assertThat(udf.mapValues(input), empty()); + } + +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/plan.json new file mode 100644 index 000000000000..c0942e26001e --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, A_MAP MAP) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `A_MAP` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_SORT(MAP_KEYS(INPUT.A_MAP)) KEYS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `KEYS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `A_MAP` MAP" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_SORT(MAP_KEYS(A_MAP)) AS KEYS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/spec.json new file mode 100644 index 000000000000..2f478869bca1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/spec.json @@ -0,0 +1,106 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591170785049, + "path" : "query-validation-tests\\map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "map_keys", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "A_MAP" : { + "foo" : 10, + "bar" : 20, + "baz" : 30 + } + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "A_MAP" : { + "foo" : 10, + "bar" : null + } + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "A_MAP" : { } + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "A_MAP" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "KEYS" : [ "bar", "baz", "foo" ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "KEYS" : [ "bar", "foo" ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "KEYS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "KEYS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id STRING KEY, a_map MAP) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_keys(a_map)) as keys FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys/6.0.0_1591170785049/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/plan.json new file mode 100644 index 000000000000..f6183ac97de2 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, A_MAP MAP>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `A_MAP` MAP>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_SORT(MAP_KEYS(INPUT.A_MAP)) KEYS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `KEYS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `A_MAP` MAP>" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_SORT(MAP_KEYS(A_MAP)) AS KEYS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/spec.json new file mode 100644 index 000000000000..8af57be8cd73 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/spec.json @@ -0,0 +1,106 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591170785106, + "path" : "query-validation-tests\\map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT>> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "map_keys with non-primitive values", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "A_MAP" : { + "foo" : [ 1, 2, 3 ], + "bar" : [ 10, 20, 30 ], + "baz" : [ 100, 200, 300 ] + } + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "A_MAP" : { + "foo" : [ 1, 2, 3 ], + "bar" : null + } + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "A_MAP" : { } + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "A_MAP" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "KEYS" : [ "bar", "baz", "foo" ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "KEYS" : [ "bar", "foo" ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "KEYS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "KEYS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id STRING KEY, a_map MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_keys(a_map)) as keys FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_keys_with_non-primitive_values/6.0.0_1591170785106/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/plan.json new file mode 100644 index 000000000000..01b7c3abbea3 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, MAP_1 MAP, MAP_2 MAP) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `MAP_1` MAP, `MAP_2` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n MAP_UNION(INPUT.MAP_1, INPUT.MAP_2) COMBINED\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `COMBINED` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `MAP_1` MAP, `MAP_2` MAP" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "MAP_UNION(MAP_1, MAP_2) AS COMBINED" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/spec.json new file mode 100644 index 000000000000..3411ec6a29c3 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/spec.json @@ -0,0 +1,146 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591170785216, + "path" : "query-validation-tests\\map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT, MAP_2 MAP> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "map_union", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "MAP_1" : { + "foo" : 10, + "bar" : 20, + "baz" : 30 + }, + "MAP_2" : { + "foo" : 99, + "apple" : -1 + } + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "MAP_1" : { + "foo" : 10, + "bar" : 20 + }, + "MAP_2" : { + "foo" : null + } + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "MAP_1" : { + "foo" : 10, + "bar" : 20 + }, + "MAP_2" : { } + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "MAP_1" : { }, + "MAP_2" : { } + } + }, { + "topic" : "test_topic", + "key" : "r5", + "value" : { + "MAP_1" : null, + "MAP_2" : { + "foo" : null + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "COMBINED" : { + "foo" : 99, + "bar" : 20, + "baz" : 30, + "apple" : -1 + } + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "COMBINED" : { + "foo" : null, + "bar" : 20 + } + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "COMBINED" : { + "foo" : 10, + "bar" : 20 + } + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "COMBINED" : { } + } + }, { + "topic" : "OUTPUT", + "key" : "r5", + "value" : { + "COMBINED" : { + "foo" : null + } + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id STRING KEY, map_1 MAP, map_2 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, map_union(map_1, map_2) as combined FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_union/6.0.0_1591170785216/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/plan.json new file mode 100644 index 000000000000..55fedd10bbeb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, A_MAP MAP) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `A_MAP` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_SORT(MAP_VALUES(INPUT.A_MAP)) VALS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `VALS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `A_MAP` MAP" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ARRAY_SORT(MAP_VALUES(A_MAP)) AS VALS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/spec.json new file mode 100644 index 000000000000..d3ca7020ab87 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/spec.json @@ -0,0 +1,106 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591170785149, + "path" : "query-validation-tests\\map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "map_values", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "A_MAP" : { + "foo" : 10, + "bar" : 20, + "baz" : 30 + } + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "A_MAP" : { + "foo" : 10, + "bar" : null + } + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "A_MAP" : { } + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "A_MAP" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "VALS" : [ 10, 20, 30 ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "VALS" : [ 10, null ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "VALS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "VALS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id STRING KEY, a_map MAP) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_values(a_map)) as vals FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values/6.0.0_1591170785149/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/plan.json new file mode 100644 index 000000000000..d8ff94dda4ad --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, A_MAP MAP>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `A_MAP` MAP>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n MAP_VALUES(INPUT.A_MAP) VALS\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `VALS` ARRAY>", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `A_MAP` MAP>" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "MAP_VALUES(A_MAP) AS VALS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/spec.json new file mode 100644 index 000000000000..e49401b2df2e --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/spec.json @@ -0,0 +1,119 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591170785183, + "path" : "query-validation-tests\\map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT>> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT>> NOT NULL" + }, + "testCase" : { + "name" : "map_values with non-primitive values", + "inputs" : [ { + "topic" : "test_topic", + "key" : "r1", + "value" : { + "A_MAP" : { + "foo" : [ 1, 2, 3 ] + } + } + }, { + "topic" : "test_topic", + "key" : "r2", + "value" : { + "A_MAP" : { + "foo" : null, + "bar" : null + } + } + }, { + "topic" : "test_topic", + "key" : "r3", + "value" : { + "A_MAP" : { } + } + }, { + "topic" : "test_topic", + "key" : "r4", + "value" : { + "A_MAP" : null + } + }, { + "topic" : "test_topic", + "key" : "r5", + "value" : { + "A_MAP" : { + "foo" : [ null ], + "bar" : [ null ] + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "r1", + "value" : { + "VALS" : [ [ 1, 2, 3 ] ] + } + }, { + "topic" : "OUTPUT", + "key" : "r2", + "value" : { + "VALS" : [ null, null ] + } + }, { + "topic" : "OUTPUT", + "key" : "r3", + "value" : { + "VALS" : [ ] + } + }, { + "topic" : "OUTPUT", + "key" : "r4", + "value" : { + "VALS" : null + } + }, { + "topic" : "OUTPUT", + "key" : "r5", + "value" : { + "VALS" : [ [ null ], [ null ] ] + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id STRING KEY, a_map MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, map_values(a_map) as vals FROM INPUT;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_map_values_with_non-primitive_values/6.0.0_1591170785183/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json index 187bb41e62fe..9353f581e1e8 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json @@ -28,6 +28,110 @@ "outputs": [ {"topic": "OUTPUT", "value": {"KSQL_COL_0": "London"}} ] + }, + { + "name": "map_keys", + "statements": [ + "CREATE STREAM INPUT (id STRING KEY, a_map MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_keys(a_map)) as keys FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"A_MAP": {"foo": 10, "bar": 20, "baz": 30} } }, + {"topic": "test_topic", "key": "r2", "value": {"A_MAP": {"foo": 10, "bar": null} } }, + {"topic": "test_topic", "key": "r3", "value": {"A_MAP": { } } }, + {"topic": "test_topic", "key": "r4", "value": {"A_MAP": null } } + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"KEYS": ["bar", "baz", "foo"]} }, + {"topic": "OUTPUT", "key": "r2", "value": {"KEYS": ["bar", "foo"]} }, + {"topic": "OUTPUT", "key": "r3", "value": {"KEYS": [] } }, + {"topic": "OUTPUT", "key": "r4", "value": {"KEYS": null } } + ] + }, + { + "name": "map_keys with non-primitive values", + "statements": [ + "CREATE STREAM INPUT (id STRING KEY, a_map MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_keys(a_map)) as keys FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"A_MAP": {"foo": [1, 2, 3], "bar": [10, 20, 30], "baz": [100, 200, 300] } } }, + {"topic": "test_topic", "key": "r2", "value": {"A_MAP": {"foo": [1, 2, 3], "bar": null} } }, + {"topic": "test_topic", "key": "r3", "value": {"A_MAP": { } } }, + {"topic": "test_topic", "key": "r4", "value": {"A_MAP": null } } + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"KEYS": ["bar", "baz", "foo"]} }, + {"topic": "OUTPUT", "key": "r2", "value": {"KEYS": ["bar", "foo"]} }, + {"topic": "OUTPUT", "key": "r3", "value": {"KEYS": [] } }, + {"topic": "OUTPUT", "key": "r4", "value": {"KEYS": null } } + ] + }, + { + "name": "map_values", + "statements": [ + "CREATE STREAM INPUT (id STRING KEY, a_map MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, array_sort(map_values(a_map)) as vals FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"A_MAP": {"foo": 10, "bar": 20, "baz": 30} } }, + {"topic": "test_topic", "key": "r2", "value": {"A_MAP": {"foo": 10, "bar": null} } }, + {"topic": "test_topic", "key": "r3", "value": {"A_MAP": { } } }, + {"topic": "test_topic", "key": "r4", "value": {"A_MAP": null } } + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"VALS": [10, 20, 30]} }, + {"topic": "OUTPUT", "key": "r2", "value": {"VALS": [10, null]} }, + {"topic": "OUTPUT", "key": "r3", "value": {"VALS": [] } }, + {"topic": "OUTPUT", "key": "r4", "value": {"VALS": null } } + ] + }, + { + "name": "map_values with non-primitive values", + "statements": [ + "CREATE STREAM INPUT (id STRING KEY, a_map MAP>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, map_values(a_map) as vals FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"A_MAP": {"foo": [1, 2, 3] } } }, + {"topic": "test_topic", "key": "r2", "value": {"A_MAP": {"foo": null, "bar": null} } }, + {"topic": "test_topic", "key": "r3", "value": {"A_MAP": { } } }, + {"topic": "test_topic", "key": "r4", "value": {"A_MAP": null } }, + {"topic": "test_topic", "key": "r5", "value": {"A_MAP": {"foo": [null], "bar": [null] } } } + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"VALS": [[1, 2, 3]]} }, + {"topic": "OUTPUT", "key": "r2", "value": {"VALS": [null, null]} }, + {"topic": "OUTPUT", "key": "r3", "value": {"VALS": [] } }, + {"topic": "OUTPUT", "key": "r4", "value": {"VALS": null } }, + {"topic": "OUTPUT", "key": "r5", "value": {"VALS": [[null], [null]]} } + ] + }, + { + "name": "map_union", + "statements": [ + "CREATE STREAM INPUT (id STRING KEY, map_1 MAP, map_2 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, map_union(map_1, map_2) as combined FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "r1", "value": {"MAP_1": {"foo": 10, "bar": 20, "baz": 30}, + "MAP_2": {"foo": 99, "apple": -1} } }, + {"topic": "test_topic", "key": "r2", "value": {"MAP_1": {"foo": 10, "bar": 20}, + "MAP_2": {"foo": null} } }, + {"topic": "test_topic", "key": "r3", "value": {"MAP_1": {"foo": 10, "bar": 20}, + "MAP_2": { } } }, + {"topic": "test_topic", "key": "r4", "value": {"MAP_1": { }, + "MAP_2": { } } }, + {"topic": "test_topic", "key": "r5", "value": {"MAP_1": null, + "MAP_2": {"foo": null} } } + ], + "outputs": [ + {"topic": "OUTPUT", "key": "r1", "value": {"COMBINED": {"foo": 99, "bar": 20, "baz": 30, "apple": -1} } }, + {"topic": "OUTPUT", "key": "r2", "value": {"COMBINED": {"foo": null, "bar":20} } }, + {"topic": "OUTPUT", "key": "r3", "value": {"COMBINED": {"foo": 10, "bar":20} } }, + {"topic": "OUTPUT", "key": "r4", "value": {"COMBINED": { } } }, + {"topic": "OUTPUT", "key": "r5", "value": {"COMBINED": {"foo": null } } } + ] } ] } \ No newline at end of file