Skip to content

Commit

Permalink
feat: new UDFs for working with Maps (#5536)
Browse files Browse the repository at this point in the history
* initial version 3 new map udfs

* make a couple of fields final

* review feedback

* Update docs/developer-guide/ksqldb-reference/scalar-functions.md

Jim's doc feedback

Co-authored-by: Jim Galasyn <jim.galasyn@confluent.io>

* Update docs/developer-guide/ksqldb-reference/scalar-functions.md

Jim's doc feedback

Co-authored-by: Jim Galasyn <jim.galasyn@confluent.io>

* Update docs/developer-guide/ksqldb-reference/scalar-functions.md

Jim's doc feedback

Co-authored-by: Jim Galasyn <jim.galasyn@confluent.io>

Co-authored-by: Jim Galasyn <jim.galasyn@confluent.io>
  • Loading branch information
blueedgenick and JimGalasyn committed Jun 3, 2020
1 parent 00f5083 commit bc9ad2e
Show file tree
Hide file tree
Showing 23 changed files with 1,867 additions and 0 deletions.
46 changes: 46 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,52 @@ MAP(key VARCHAR := value, ...)

Construct a map from specific key-value tuples.

### `MAP_KEYS`

```sql
MAP_KEYS(a_map)
```

Returns an array that contains all of the keys from the specified map.

Returns NULL if the input map is NULL.

Example:
```sql
map_keys( map('apple' := 10, 'banana' := 20) ) => ['apple', 'banana']
```

### `MAP_VALUES`

```sql
MAP_VALUES(a_map)
```

Returns an array that contains all of the values from the specified map.

Returns NULL if the input map is NULL.

Example:
```sql
map_values( map('apple' := 10, 'banana' := 20) ) => [10, 20]
```

### `MAP_UNION`

```sql
MAP_UNION(map1, map2)
```

Returns a new map containing the union of all entries from both input maps. If a key is present in both input maps, the corresponding value from _map2_ is returned.

Returns NULL if all of the input maps are NULL.

Example:
```sql
map_union( map('apple' := 10, 'banana' := 20), map('cherry' := 99) ) => ['apple': 10, 'banana': 20, 'cherry': 99]

map_union( map('apple' := 10, 'banana' := 20), map('apple' := 50) ) => ['apple': 50, 'banana': 20]
```

### `SLICE`

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.map;

import com.google.common.collect.Lists;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.util.List;
import java.util.Map;

@UdfDescription(
name = "map_keys",
description = "Returns an array of all the keys from the specified map, "
+ "or NULL if the input map is NULL.")
public class MapKeys {

@Udf
public <T> List<String> mapKeys(final Map<String, T> input) {
if (input == null) {
return null;
}
return Lists.newArrayList(input.keySet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.map;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@UdfDescription(
name = "map_union",
description = "Returns a new map containing the union of all entries from both input maps. "
+ "If a key is present in both input maps then the value from map2 is the one which "
+ "appears in the result. Returns NULL if all of the input maps are NULL.")
public class MapUnion {

@Udf
public <T> Map<String, T> union(
@UdfParameter(description = "first map to union") final Map<String, T> map1,
@UdfParameter(description = "second map to union") final Map<String, T> map2) {

final List<Map<String, T>> nonNullInputs =
Stream.of(map1, map2)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (nonNullInputs.size() == 0) {
return null;
}

final Map<String, T> output = new HashMap<>();
nonNullInputs.stream()
.forEach(output::putAll);
return output;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.map;

import com.google.common.collect.Lists;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.util.List;
import java.util.Map;

@UdfDescription(
name = "map_values",
description = "Returns an array of all the values from the specified map, "
+ "or NULL if the input map is NULL.")
public class MapValues {

@Udf
public <T> List<T> mapValues(final Map<String, T> input) {
if (input == null) {
return null;
}
return Lists.newArrayList(input.values());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.map;

import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.Maps;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;

public class MapKeysTest {

private MapKeys udf;

@Before
public void setUp() {
udf = new MapKeys();
}

@Test
public void shouldGetKeys() {
final Map<String, String> input = new HashMap<>();
input.put("foo", "spam");
input.put("bar", "baloney");
assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar"));
}

@Test
public void shouldHandleComplexValueTypes() {
final Map<String, Map<String, List<Double>>> input = Maps.newHashMap();

final Map<String, List<Double>> entry1 = Maps.newHashMap();
entry1.put("apple", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78)));
entry1.put("banana", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65)));
input.put("foo", entry1);

final Map<String, List<Double>> entry2 = Maps.newHashMap();
entry2.put("cherry", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78)));
entry2.put("date", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65)));
input.put("bar", entry2);

assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar"));
}

@Test
public void shouldReturnNullForNullInput() {
List<String> result = udf.mapKeys((Map<String, Long>) null);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullsFromMapWithNulls() {
final Map<String, Integer> input = Maps.newHashMap();
input.put("foo", 1);
input.put(null, null);
input.put("bar", null);
List<String> result = udf.mapKeys(input);
assertThat(result, containsInAnyOrder(null, "foo", "bar"));
}

@Test
public void shouldReturnEmptyListFromEmptyMap() {
final Map<String, BigDecimal> input = Maps.newHashMap();
assertThat(udf.mapKeys(input), empty());
}

}

0 comments on commit bc9ad2e

Please sign in to comment.