Skip to content

Commit

Permalink
fix: COLLECT_LIST can now be applied to tables (#3104)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jul 23, 2019
1 parent cd87cb8 commit c239785
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.Lists;
import io.confluent.ksql.function.udaf.TableUdaf;
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.List;
Expand Down Expand Up @@ -69,27 +68,27 @@ public List<T> undo(final T valueToUndo, final List<T> aggregateValue) {
}

@UdafFactory(description = "collect values of a Bigint field into a single Array")
public static Udaf<Long, List<Long>> createCollectListLong() {
public static TableUdaf<Long, List<Long>> createCollectListLong() {
return listCollector();
}

@UdafFactory(description = "collect values of an Integer field into a single Array")
public static Udaf<Integer, List<Integer>> createCollectListInt() {
public static TableUdaf<Integer, List<Integer>> createCollectListInt() {
return listCollector();
}

@UdafFactory(description = "collect values of a Double field into a single Array")
public static Udaf<Double, List<Double>> createCollectListDouble() {
public static TableUdaf<Double, List<Double>> createCollectListDouble() {
return listCollector();
}

@UdafFactory(description = "collect values of a String/Varchar field into a single Array")
public static Udaf<String, List<String>> createCollectListString() {
public static TableUdaf<String, List<String>> createCollectListString() {
return listCollector();
}

@UdafFactory(description = "collect values of a Boolean field into a single Array")
public static Udaf<Boolean, List<Boolean>> createCollectListBool() {
public static TableUdaf<Boolean, List<Boolean>> createCollectListBool() {
return listCollector();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import io.confluent.ksql.function.udaf.Udaf;

import io.confluent.ksql.function.udaf.TableUdaf;
import java.util.List;
import org.junit.Test;

public class CollectListUdafTest {

@Test
public void shouldCollectInts() {
final Udaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
final TableUdaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
final Integer[] values = new Integer[] {3, 4, 5, 3};
List<Integer> runningList = udaf.initialize();
for (final Integer i : values) {
Expand All @@ -39,7 +40,7 @@ public void shouldCollectInts() {

@Test
public void shouldMergeIntLists() {
final Udaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
final TableUdaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();

List<Integer> lhs = udaf.initialize();
final Integer[] lhsValues = new Integer[] {1, 2, null, 3};
Expand All @@ -61,7 +62,7 @@ public void shouldMergeIntLists() {

@Test
public void shouldRespectSizeLimit() {
final Udaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
final TableUdaf<Integer, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
List<Integer> runningList = udaf.initialize();
for (int i = 1; i < 2500; i++) {
runningList = udaf.aggregate(i, runningList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,117 @@
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true,false]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true,false,true]}, "timestamp": 0}
]
},
{
"name": "collect_list int table",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');",
"CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 0}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 100}, "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [0]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [100]}, "timestamp": 0}
]
},
{
"name": "collect_list long table",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 2147483648}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 100}, "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [2147483648]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [100]}, "timestamp": 0}
]
},
{
"name": "collect_list double table",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 5.4}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": 100.1}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 500.9}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": 300.8}, "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [5.4]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": [100.1]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [500.9]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": [300.8]}, "timestamp": 0}
]
},
{
"name": "collect_list string table",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE TABLE TEST (ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": "foo"}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "baz"}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "VALUE": "bar"}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "baz"}, "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": {"ID": 100, "VALUE": "foo"}, "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": ["foo"]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["baz"]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID": 0, "COLLECTED": ["bar"]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["baz"]}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": []}, "timestamp": 0},
{"topic": "S2", "key": 100, "value": {"ID": 100, "COLLECTED": ["foo"]}, "timestamp": 0}
]
},
{
"name": "collect_list bool map table",
"format": ["JSON"],
"statements": [
"CREATE TABLE TEST (ID bigint, NAME varchar, VALUE map<varchar, boolean>) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE S2 as SELECT id, collect_list(value['key1']) AS collected FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":true, "key2":false}}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":false, "key2":true}}, "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": {"id": 0, "name": "zero", "value": {"key1":true, "key2":true}}, "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[false]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[]}, "timestamp": 0},
{"topic": "S2", "key": 0, "value": {"ID":0,"COLLECTED":[true]}, "timestamp": 0}
]
}
]
}

0 comments on commit c239785

Please sign in to comment.