Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5202] register UDF/UDAF with ServiceLoader #6580

Merged
merged 1 commit into from
Oct 8, 2018

Conversation

mingmxu
Copy link

@mingmxu mingmxu commented Oct 5, 2018

add @AutoService(UdfUdafRegister.class) to register all UDF/UDAF in one call,

  1. avoid multiple registerUdf()/registerUdaf() methods in SqlTransform;
  2. enable UDF/UDAF load in BeamSqlCli;

R: @akedin @apilloud @amaliujia @xumingming

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
Build Status --- --- ---

List<UdafDefinition> newUdafs = new ArrayList<>();

for (UdfUdafRegister ins :
Lists.newArrayList(ServiceLoader.<UdfUdafRegister>load(UdfUdafRegister.class))) {
Copy link
Contributor

@amaliujia amaliujia Oct 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServiceLoader implements Iterable interface so you don't need to wrap it into a arraylist to for each.


/** Auto register for test. */
@AutoService(UdfUdafRegister.class)
public static class UdfUdafRegisterTest implements UdfUdafRegister {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why UdfUdafRegisterTest is a static class without a static variable/function? Why to make it static?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static nested classes have nothing to do with static members, it's about access to the outer class' non-static variables (obviously): https://docs.oracle.com/javase/tutorial/java/javaOO/nested.html

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static in this context means that it doesn't reference the the parent class. If it wasn't static it would only be accessible on an instantiation of BeamSqlDslUdfUdafTest, which means the service loader couldn't find it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Copy link
Member

@apilloud apilloud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about doing something like so we can support UDFs in our JDBC based SQL shell and this gets us much closer. I still need to make some internal refactors and hook this up to BeamCalciteSchemaFactory once it goes in.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this to meta/provider/UdfUdafProvider.java

@Override
public Map<String, Class<? extends BeamSqlUdf>> getEvalUdfs() {
return new HashMap<String, Class<? extends BeamSqlUdf>>(
ImmutableMap.of("cubic", CubicInteger.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These udfs are registered as long as the test jar is in the classpath. Can you make the name of these test functions a little more unique so we don't have collisions?

/** Register for UDF and UDAF. */
public interface UdfUdafRegister {
default Map<String, Class<? extends BeamSqlUdf>> getEvalUdfs() {
return new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use Collections.emptyMap() for these.

/** Load all UDF/UDAF from {@link UdfUdafRegister}. */
public void autoRegisterUdfUdaf() {
for (UdfUdafRegister ins :
Lists.newArrayList(ServiceLoader.<UdfUdafRegister>load(UdfUdafRegister.class))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Lists.newArrayList is unneeded.

@@ -151,6 +156,35 @@ public static SqlTransform query(String queryString) {
.build();
}

/** Load all UDF/UDAF from {@link UdfUdafRegister}. */
public SqlTransform autoRegisterUdfUdaf() {
List<UdfDefinition> newUdfs = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of copying everything, could you just set a bool, then call env.autoRegisterUdfUdaf() in registerFunctions?


/** Auto register for test. */
@AutoService(UdfUdafRegister.class)
public static class UdfUdafRegisterTest implements UdfUdafRegister {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static in this context means that it doesn't reference the the parent class. If it wasn't static it would only be accessible on an instantiation of BeamSqlDslUdfUdafTest, which means the service loader couldn't find it.

Copy link
Contributor

@akedin akedin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice feature! Few comments:


/** Auto register for test. */
@AutoService(UdfUdafRegister.class)
public static class UdfUdafRegisterTest implements UdfUdafRegister {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static nested classes have nothing to do with static members, it's about access to the outer class' non-static variables (obviously): https://docs.oracle.com/javase/tutorial/java/javaOO/nested.html


/** Register for UDF and UDAF. */
public interface UdfUdafRegister {
default Map<String, Class<? extends BeamSqlUdf>> getEvalUdfs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By reading this it's unclear what are eval udfs vs serialize udfs, add comments. And I would probably just call these methods getBeamSqlUdfs() and getSerializableFunctionUdfs() to avoid inventing another name for these things.

Another approach that I can think of (don't know if it's better or not) to make it a bit more extensible is to use TypeDescriptor-style type specification:

udfs = ImmutableMap.builder()
   .put(BeamSqlUdf.class, new HashMap<String, BeamSqlUdf>())
   .put(SerializableFunction.class, new HashMap<String, SerializableFunction<?, ?>>)
   .build();

public <T> Map<String, T> getUdf(TypeDescriptor<T> udfTypeDescriptor) {
   udfType = // get udf class from udfTypeDescriptor
   return udfs.get(udfType);
}

I imagine the usage will be like: register.getUdfs(new TypeDescriptor<BeamSqlUdf>> {}).

And I wonder wether it will be useful to have methods like register.getUdf(String name, TypeDescriptor udfType);

import org.apache.beam.sdk.transforms.SerializableFunction;

/** Register for UDF and UDAF. */
public interface UdfUdafRegister {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we call these either Registrar or Registry (pipeline options registrar, coder registry)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically the same as TableProvider but for Udf, so what about UdfProivder? Or should we rename TableProvider to TableRegistry?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UdfUdafProvider sounds good to me.

}

default Map<String, SerializableFunction<?, ?>> getSerializeUdfs() {
return new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should return an immutable empty map in this case (Collections.emptyMap())

@amaliujia
Copy link
Contributor

A further thought is, as a following step, we could allow user set jar path to point to user defined functions. Both SQL shell and SqlTransform can benefit from it.

@mingmxu mingmxu force-pushed the BEAM-5202 branch 2 times, most recently from d1b1fb9 to aa37bb1 Compare October 5, 2018 18:46
@mingmxu
Copy link
Author

mingmxu commented Oct 5, 2018

Thanks @ALL, PLNK

Updates:

  1. move to meta/provider/UdfUdafProvider;
  2. keep AutoService code in BeamSqlEnv only;

@amaliujia
Copy link
Contributor

LGTM

Copy link
Contributor

@akedin akedin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@apilloud apilloud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks! LGTM


@Override
public Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
return new HashMap<String, Class<? extends BeamSqlUdf>>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this the first time around. The new HashMap is unnecessary here and below.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me fix it and will merge after CI pass

move to `meta/provider/UdfUdafProvider` and remove duplicated AutoService code.

fixup

fix
@mingmxu mingmxu merged commit 8a88ffd into apache:master Oct 8, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants