Skip to content

Commit

Permalink
feat: add functions as UDF input types (#6955)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Feb 9, 2021
1 parent c0cccaa commit 1b39ab5
Show file tree
Hide file tree
Showing 17 changed files with 683 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.types;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;

public final class LambdaType extends ObjectType {

private final ImmutableList<ParamType> inputTypes;
private final ParamType returnType;

private LambdaType(
final List<ParamType> inputTypes,
final ParamType returnType
) {
this.inputTypes = ImmutableList.copyOf(
Objects.requireNonNull(inputTypes, "inputTypes"));
this.returnType = Objects.requireNonNull(returnType, "returnType");
}

public static LambdaType of(
final List<ParamType> inputTypes,
final ParamType returnType
) {
return new LambdaType(inputTypes, returnType);
}

public List<ParamType> inputTypes() {
return inputTypes;
}

public ParamType returnType() {
return returnType;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LambdaType lambdaType = (LambdaType) o;
return Objects.equals(inputTypes, lambdaType.inputTypes)
&& Objects.equals(returnType, lambdaType.returnType);
}

@Override
public int hashCode() {
return Objects.hash(inputTypes, returnType);
}

@Override
public String toString() {
return "LAMBDA<" + inputTypes + ", " + returnType + ">";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public final class AppInfo {
final Properties props = new Properties();
try (InputStream resourceAsStream = AppInfo.class.getResourceAsStream(
"/git.properties")) {
if (resourceAsStream != null) {
props.load(resourceAsStream);
}
props.load(resourceAsStream);
}
commitId = props.getProperty("git.commit.id", commitId).trim();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* @param <C> A context type to be passed through to the plugin.
*/
public final class ExpressionTreeRewriter<C> {

public static final class Context<C> {
private final C context;
private final ExpressionVisitor<Expression, C> rewriter;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.execution.codegen.helpers.TriFunction;
import io.confluent.ksql.execution.function.UdfUtil;
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.name.FunctionName;
Expand All @@ -34,6 +35,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;

class UdafTypes {
Expand All @@ -53,6 +56,9 @@ class UdafTypes {
.add(List.class)
.add(Map.class)
.add(Timestamp.class)
.add(Function.class)
.add(BiFunction.class)
.add(TriFunction.class)
.build();

private final Type inputType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Transform an array with a function
*/
@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static.
@UdfDescription(
name = "array_transform",
category = FunctionCategory.ARRAY,
description = "Apply a function to each element in an array. "
+ "The transformed array is returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class ArrayTransform {

@Udf
public <T, R> List<R> arrayTransform(
@UdfParameter(description = "The array") final List<T> array,
@UdfParameter(description = "The lambda function") final Function<T, R> function
) {
if (array == null) {
return null;
}
return array.stream().map(item -> {
if (item == null) {
return null;
}
return function.apply(item);
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.map;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Transform a map's key and values using two lambda functions
*/
@UdfDescription(
name = "map_transform",
category = FunctionCategory.MAP,
description = "Apply one function to each key and "
+ "one function to each value of a map. "
+ "The two arguments for each function are in order: key, value. "
+ "The first function provided will be applied to each key and the "
+ "second one applied to each value. "
+ "The transformed map is returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class MapTransform {

@Udf
public <K,V,R,T> Map<R,T> mapTransform(
@UdfParameter(description = "The map") final Map<K, V> map,
@UdfParameter(description = "The key lambda function") final BiFunction<K, V, R> biFunction1,
@UdfParameter(description = "The value lambda function") final BiFunction<K, V, T> biFunction2
) {
if (map == null) {
return null;
}

return map.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> biFunction1.apply(entry.getKey(), entry.getValue()),
entry -> biFunction2.apply(entry.getKey(), entry.getValue())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.map;

import io.confluent.ksql.execution.codegen.helpers.TriFunction;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Map;
import java.util.Map.Entry;

/**
* Reduce a map using an initial state and function
*/
@UdfDescription(
name = "map_reduce",
category = FunctionCategory.MAP,
description = "Reduce the input map down to a single value "
+ "using an initial state and a function. "
+ "The initial state (s) is passed into the scope of the function. "
+ "Each invocation returns a new value for s, "
+ "which the next invocation will receive. "
+ "The final value for s is returned."
+ "The three arguments for the function are in order: key, value, state.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class ReduceMap {

@Udf
public <K,V,S> S reduceMap(
@UdfParameter(description = "The map") final Map<K, V> map,
@UdfParameter(description = "The initial state") final S initialState,
@UdfParameter(description = "The reduce function") final TriFunction<K, V, S, S> triFunction
) {
if (map == null) {
return null;
}

S state = initialState;
for (Entry<K, V> entry : map.entrySet()) {
state = triFunction.apply(entry.getKey(), entry.getValue(), state);
}
return state;
}
}
Loading

0 comments on commit 1b39ab5

Please sign in to comment.