Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Inline Java UDFs #2605

Open
wants to merge 36 commits into
base: master
from

Conversation

@mitch-seymour
Copy link
Contributor

commented Mar 26, 2019

Description

Add support for inline Java UDFs. This evolved from a discussion held in KLIP-2 (pending)

TODO:
  • Fix minor issues in standalone mode
  • Improve KSQL grammar updates (i.e. so keywords like begin or end don't clash with the inline UDF) I ended up using a more unique script delimiter: $$
  • More tests
  • More docs
  • Merge conflicts

Testing done

Unit tests, manual testing. In progress...

# tab 1
$ ./bin/ksql-server-start config/ksql-server.properties

# tab 2
$ ./bin/ksql

Now, in the CLI, try creating a couple of inline UDFs.

/** Greet function **/
ksql>  
    CREATE OR REPLACE FUNCTION GREET(name STRING)
    RETURNS STRING
    LANGUAGE JAVA
    WITH(author='Jane', description='Create a greeting', version='0.1.0')
    AS $$
        return "Hello " + NAME;
    $$;

 Message
------------------
 Function created
------------------

ksql> DESCRIBE FUNCTION GREET ;

Name        : GREET
Author      : 'Jane'
Version     : '0.1.0'
Type        : scalar
Jar         : internal
Variations  :

	Variation   : GREET(VARCHAR)
	Returns     : VARCHAR
	Description : 'Create a greeting'

ksql> DROP FUNCTION GREET ;

 Message
----------------------------
 Function GREET was dropped
---------------------------

ksql> DESCRIBE FUNCTION GREET ;
Can't find any functions with the name 'GREET'
/** More complicated zip function **/
ksql> 
    CREATE OR REPLACE FUNCTION ZIP(a VARCHAR, b VARCHAR)
    RETURNS MAP<VARCHAR, VARCHAR>
    LANGUAGE JAVA
    WITH(author='Bob', description='Create a map from two objects', version='0.1.0')
    AS $$
        import java.util.HashMap;
        import java.util.Map;
        Map<String,String> m =  new HashMap<String,String>();
        m.put(A, B);
        return m;
    $$ ;

Once an inline Java UDF is created, it can be used in a query just like any other UDF. Also, non-inline UDFs cannot be dropped :)

ksql> DROP FUNCTION LCASE ;
Cannot drop function: LCASE is not an inline function

Anyways, feel free to take this for a spin!

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@mitch-seymour mitch-seymour requested review from JimGalasyn and confluentinc/ksql as code owners Mar 26, 2019

@ConfluentCLABot

This comment has been minimized.

Copy link

commented Mar 26, 2019

@confluentinc It looks like @mitch-seymour just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@mitch-seymour mitch-seymour referenced this pull request Mar 26, 2019
0 of 2 tasks complete
@hjafarpour

This comment has been minimized.

Copy link
Member

commented Mar 26, 2019

Thanks @mitch-seymour, I had a quick pass over the PR and the main issue at the moment is that we need to update the grammar in a way that the UDF_SCRIPT can handle any java code including when we have KSQL keywords in the code. Consider the following example which is the same as you have above but I changed the variable name from m to end. This code won't work:

/** More complicated zip function **/
ksql> CREATE OR REPLACE FUNCTION zip(a VARCHAR, b VARCHAR)
RETURNS MAP<VARCHAR, VARCHAR>
LANGUAGE JAVA
AS BEGIN
    import java.util.HashMap; \
    import java.util.Map; \
    Map<String,String> end =  new HashMap<String,String>();\
    end.put(args[0], args[1]);\
    return end;\
END ;
@JimGalasyn
Copy link
Member

left a comment

Thanks for the submission! Just a couple of suggestions for the reference topic.

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented Mar 26, 2019

@hjafarpour that's a good point. I'll revisit the grammar updates and see if I can make it more flexible :)

@JimGalasyn thanks for the suggestions regarding the new docs. I committed those changes :)

@mitch-seymour mitch-seymour force-pushed the mitch-seymour:feature-inline-java-udfs branch from db59131 to 5b2502b Mar 29, 2019

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented Mar 29, 2019

@hjafarpour I spent some time on the grammar updates but had a hard time implementing a version where we could ignore the BEGIN or END KSQL keywords in the inline script body. I got it working in interactive mode, oddly enough, but an issue came up during headless mode where multiple queries are read at once. In this case, the parser didn't understand where a CREATE FUNCTION query stopped and another began.

Anyways, I captured some of that work in this commit if you're interested, but ended up using a script delimiter that is more unique: $$. Please see my updated examples above and the updated syntax reference.

Finally, I'm finding my current workflow for iterating through the new unit tests a little tedious :/ Is there a workflow that doesn't require me to do a multi-project build while iterating on these tests?

schema.getElementType().getProp(AvroData.CONNECT_INTERNAL_TYPE_NAME),
AvroData.MAP_ENTRY_TYPE_NAME)
) {
// TODO: figure out why this test is broken

This comment has been minimized.

Copy link
@mitch-seymour

mitch-seymour Mar 29, 2019

Author Contributor

I had to make this change to get the build working locally. Not sure what I'm doing wrong yet. But this code in the master branch was causing this error when I build locally:

ksql-engine/src/test/java/io/confluent/ksql/EndToEndEngineTestUtil.java:[1075,57] cannot find symbol
[ERROR]   symbol:   variable CONNECT_INTERNAL_TYPE_NAME
[ERROR]   location: class io.confluent.connect.avro.AvroData

This comment has been minimized.

Copy link
@mitch-seymour

mitch-seymour Apr 9, 2019

Author Contributor

This change was reverted. Still not sure what the issue was...

@mitch-seymour mitch-seymour changed the title [WIP] Add Inline Java UDFs Add Inline Java UDFs Apr 9, 2019

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented Apr 9, 2019

@agavra @JimGalasyn @hjafarpour this PR is finally ready for review :) All of the tests are passing, though Jenkins is flagging a redundantmodifier warning that seems like a minor issue. Anyways, sorry this PR is so big. I had no idea how much work was involved or how much time it would take, but I'm pretty happy with where's it at and am ready for feedback.

Notes

  • I ended up not creating a separate KLIP since this evolved out of KLIP-2, and it sounds like it may not have been necessary (even though I initially intended to do so, but have been somewhat strapped for time lately). I can submit a KLIP if it's necessary though.

  • I ended up using special script delimiters, $$, since I found it challenging to use BEGIN and END blocks while also ignoring nested code that included those characters as well. Ignoring nested BEGIN / END characters wouldn't have been too challenging, except for the fact that we also need to ignore nested ;, which is used to end statements in Java code and KSQL expressions. It presented an interesting challenge of trying to ignore two terminating characters END and ;, and the side-effect was KSQL didn't know where one query stopped and another began.

  • Since I ended up using $$ to resolve this issue (users are much less likely to introduce code that prematurely terminates the statement with $$ as the delimiter), I also favored Postgres' CREATE OR REPLACE syntax over CREATE OR ALTER, since Postgres also supports $$.

Anyways, hope this makes sense. I'm looking forward to getting your feedback and hopefully getting this integrated into the codebase if it proves to be useful.

Object value;
try {
ExtensionSecurityManager.INSTANCE.pushInUdf();
value = se.evaluate(args);

This comment has been minimized.

Copy link
@mitch-seymour

mitch-seymour Apr 9, 2019

Author Contributor

actually, this may not be the proper way to prevent a System.exit. Any ideas here?

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

Looking at the code, this seems correct, though if we have to ask these questions then IMO we're structuring the code incorrectly. The security logic should definitely be a shared code path between builtin and inline udfs.

Note: currently, the only supported language is **JAVA**.

If the OR REPLACE clause is present, the statement doesn't fail if the function
already exists. Instead, the existing function will be replaced. Note: if an

This comment has been minimized.

Copy link
@JimGalasyn

JimGalasyn Apr 9, 2019

Member
Suggested change
already exists. Instead, the existing function will be replaced. Note: if an
already exists. Instead, the existing function replaced. If an
The following example creates a simple function. Note: the arguments are
converted to uppercase when passed to the inline script.

Also, when defining an inline UDF in the KSQL CLI, you will need to escape

This comment has been minimized.

Copy link
@JimGalasyn

JimGalasyn Apr 9, 2019

Member
Suggested change
Also, when defining an inline UDF in the KSQL CLI, you will need to escape
Also, when defining an inline UDF in the KSQL CLI, you must escape

**Description**

Drop an inline UDF from the function registry. Note: you cannot drop UDFs that were not

This comment has been minimized.

Copy link
@JimGalasyn

JimGalasyn Apr 9, 2019

Member
Suggested change
Drop an inline UDF from the function registry. Note: you cannot drop UDFs that were not
Drop an inline UDF from the function registry. You can't drop UDFs that weren't
**Description**

Drop an inline UDF from the function registry. Note: you cannot drop UDFs that were not
created via the CREATE [OR REPLACE] FUNCTION query. Furthermore, if an active query

This comment has been minimized.

Copy link
@JimGalasyn

JimGalasyn Apr 9, 2019

Member
Suggested change
created via the CREATE [OR REPLACE] FUNCTION query. Furthermore, if an active query
created by using the CREATE [OR REPLACE] FUNCTION query. Also, if an active query
@JimGalasyn
Copy link
Member

left a comment

LGTM, with a few suggestions.

JimGalasyn and others added some commits Apr 9, 2019

Update docs/developer-guide/syntax-reference.rst
Co-Authored-By: mitch-seymour <mitchseymour@gmail.com>
Update docs/developer-guide/syntax-reference.rst
Co-Authored-By: mitch-seymour <mitchseymour@gmail.com>
Update docs/developer-guide/syntax-reference.rst
Co-Authored-By: mitch-seymour <mitchseymour@gmail.com>
Update docs/developer-guide/syntax-reference.rst
Co-Authored-By: mitch-seymour <mitchseymour@gmail.com>
@agavra

This comment has been minimized.

Copy link
Contributor

commented Apr 11, 2019

I will do a first pass on this tomorrow or Friday 😄

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented Apr 11, 2019

Sounds good @agavra, thanks for taking a look :)

@agavra
Copy link
Contributor

left a comment

Solid work @mitch-seymour! This is my first pass, so I tried to hold back nits (though some snuck through anyway) and didn't look too deeply at unit tests. Overall the approach seems sound to me, most of the comments are behavioral or code-structure related.

Also at this point if you ever want to get on a call with us to discuss things further, let me know and I'll help set that up - both for this PR and to move along #2553.

Create a new inline UDF with the specified arguments and properties.
Currently, the only supported language is Java.

If the OR REPLACE clause is present, the statement doesn't fail if the function

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

I think I side with @hjafarpour on this comment: #2553 (comment) - we should fail a REPLACE if there are any active queries using the UDFs. That way we don't have to worry about the semantics of different queries using the different versions of the same UDF simultaneously.

This would mean that you would first need to drop/terminate the query. Unfortunately, tracking this is non-trivial 😨 - so if it gets too unwieldily perhaps it can wait for another PR (though I do think we should not enable the feature until that functionality is there)

@@ -137,6 +138,25 @@ void addFunction(final KsqlFunction function) {
curr.update(function, order);
}

void addOrReplaceFunction(final KsqlFunction function) {
synchronized (this) {

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

I know that addFunction is only called in a single threaded manner, but partially thread-safe classes make me very nervous 😨 . I'd rather we made everything that modifies state synchronized (including the getFunction and dropFunction functionality) to make sure that we don't run into any issues. Plus this shouldn't be a performance bottleneck so I'm not too worried (it's run once-per-query).

also nit: if you synchronized (this) you may as well just synchronize the whole method

while (it.hasNext()) {
final KsqlFunction ksqlFunction = it.next().getValue();
if (ksqlFunction.getFunctionName().equals(functionName)) {
it.remove();

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

this isn't sufficient as we need to also remove it from the underlying trie so that you won't be able to retrieve a dropped function and that there isn't "ghost nodes" in the trie. Please add a unit test for this as well.

@@ -66,6 +89,10 @@ public boolean isInternal() {
return internal;
}

public boolean isInline() {

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

please add this to equals and hashcode

return arguments;
}

private static String[] elementsToParamNames(final List<TableElement> elements) {

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

nit: prefer List<String> to String[] (ditto for any other usage of a non-primitive array)


@Immutable
public class CreateFunction
extends Statement implements ExecutableDdlStatement {

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

I'm not certain this should classify as DDL, though maybe @hjafarpour can clarify here - I would prefer creating dedicated logic for these commands. This also means custom validators for the KsqlResource (see RequestValidator).

.put(DropFunction.class, createHandler(
StatementExecutor::handleExecutableDdl,
DropFunction.class,
"DROP FUNCTION"))

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

I don't think it makes sense to support DROP FUNCTION in headless mode - all the queries are given up front!

void addOrReplaceFunction(final KsqlFunction function) {
synchronized (this) {
if (allFunctions.containsKey(function.getArguments())) {
allFunctions.remove(function.getArguments());

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

same comment as below, we need to make sure to remove the old function from the underlying Trie

Object value;
try {
ExtensionSecurityManager.INSTANCE.pushInUdf();
value = se.evaluate(args);

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

Looking at the code, this seems correct, though if we have to ask these questions then IMO we're structuring the code incorrectly. The security logic should definitely be a shared code path between builtin and inline udfs.

import org.apache.kafka.connect.data.Schema;

@Immutable
public class CreateFunction

This comment has been minimized.

Copy link
@agavra

agavra Apr 12, 2019

Contributor

Can you please add corresponding methods in SqlFormatter to format this and DropFunction?

@agavra agavra requested a review from confluentinc/ksql Apr 12, 2019

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented Apr 15, 2019

@agavra thanks so much for the feedback! I'll start working my way through the suggestions in the next few days (I'm a bit slammed this week unfortunately). also, I'd love to jump on a call at some point. Maybe sometime next week (or anytime after the 22nd)? thanks again, I know reviewing 1500 lines of code isn't super fun lol.

@agavra

This comment has been minimized.

Copy link
Contributor

commented May 9, 2019

@mitch-seymour I had totally lost track of this pr - I apologize for that! I see that you're going to be at Kafka Summit London. Do you want to meet up there in person and we can resolve any open issues?

@mitch-seymour

This comment has been minimized.

Copy link
Contributor Author

commented May 10, 2019

@agavra no worries, I've been busy preparing for Kafka Summit so haven't been able to work on this further. but sure, let's meet in person at Kafka Summit. I took a wild guess at your work email address and it didn't bounce back, so we can coordinate a time to meet up via email if that works for you :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.