Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6018: Make KafkaFuture.Future an interface #4033

Closed
wants to merge 2 commits into from

Conversation

steven-aerts
Copy link
Contributor

Changing KafkaFuture.Future and KafkaFuture.BiConsumer into an interface makes
them a functional interface. This makes them Java 8 lambda compatible.

@ijuma
Copy link
Contributor

ijuma commented Oct 31, 2017

cc @cmccabe

@steven-aerts
Copy link
Contributor Author

Yes it does.

As discussed in KAFKA-6018 the interface is marked as @InterfaceStability.Evolving and documented as:

This will eventually become a thin shim on top of Java 8's CompletableFuture.

If you want I can create a KIP for this. But is this change big enough for this?

@ijuma
Copy link
Contributor

ijuma commented Oct 31, 2017

Yeah, a KIP is definitely required for any change with compatibility impact. And we can't do it until Kafka 2.0. We will definitely drop support for Java 7 by then and if we are going to break compatibility, we should take that into account as well.

@steven-aerts
Copy link
Contributor Author

So you propose me to create a KIP, or wait with the KIP for Kafka 2.0?

Btw, Java 8 is not required for this, so from that perspective it does not need to be lined up with the deprecation of Java 7.

@ijuma
Copy link
Contributor

ijuma commented Oct 31, 2017

You can propose the KIP now, that's independent of the release where the code change is included. I understand that Java 8 is not required for this. My point is that if we have to break compatibility and we can rely on Java 8 classes, then the solution may be different than if we can't rely on Java 8 classes.

For example,KafkaFuture may extend CompletionStage or CompletableFuture. Or we could remove KafkaFuture altogether. Just things to think about.

@cmccabe
Copy link
Contributor

cmccabe commented Oct 31, 2017

How about just creating a new interface, which KafkaFuture#Function can implement? Then there is no compatibility impact.

@ijuma
Copy link
Contributor

ijuma commented Oct 31, 2017

@cmccabe I think you'd have to have methods that take the new interface as the parameter instead of the existing abstract class.

@cmccabe
Copy link
Contributor

cmccabe commented Oct 31, 2017

You could add a separate function overload which took the interface, rather than the concrete class. Then we could deprecate the variant which took the concrete class.

@ijuma
Copy link
Contributor

ijuma commented Oct 31, 2017

Yeah, that's a possibility and the kind of thing that should be in the KIP.

@steven-aerts
Copy link
Contributor Author

Just updated the pull request, so the change becomes a backwards compatible change.

This is now all tracked under KIP-218

@steven-aerts steven-aerts force-pushed the KAFKA-6018 branch 2 times, most recently from ea745e6 to 693ac25 Compare December 2, 2017 08:46
/**
* A function which takes objects of type A and returns objects of type B.
*
* Replaced by {@link FunctionInterface}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need an @deprecated javadoc tag here as well as the annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed it, update coming.

public abstract <R> KafkaFuture<R> thenApply(FunctionInterface<T, R> function);

/**
* @see KafkaFuture#thenApply(FunctionInterface)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need an @deprecated javadoc tag here as well as the annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
addWaiter(new Applicant<>(function, future));
return future;
}

/**
* @See KafkaFutureImpl#thenApply(FunctionInterface)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need an @deprecated javadoc tag here as well as the annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
return thenApply((FunctionInterface<T, R>) function);
}

@Override
protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this to be useful to me in my work on KIP-183 I also need addWaiter() to be made public on KafkaFuture.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will take it in this commit.

Copy link
Contributor

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass review.

Copy link
Contributor Author

@steven-aerts steven-aerts left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review Tom. Adapting my changes in the coming minutes.

/**
* A function which takes objects of type A and returns objects of type B.
*
* Replaced by {@link FunctionInterface}.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed it, update coming.

public abstract <R> KafkaFuture<R> thenApply(FunctionInterface<T, R> function);

/**
* @see KafkaFuture#thenApply(FunctionInterface)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
addWaiter(new Applicant<>(function, future));
return future;
}

/**
* @See KafkaFutureImpl#thenApply(FunctionInterface)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
return thenApply((FunctionInterface<T, R>) function);
}

@Override
protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will take it in this commit.

@cmccabe
Copy link
Contributor

cmccabe commented Dec 6, 2017

Looks good overall (and good call, @tombentley , on the missing deprecated annotations)

@xvrl
Copy link
Member

xvrl commented Dec 8, 2017

@steven-aerts someone just pointed this out to me. I was about to suggest we make the addWaiter method public. While we're at it, can we also incorporate the documentation and exception handling fixes I added here #4308

@steven-aerts
Copy link
Contributor Author

@Kvrl For me it is perfect to see #4308 as a part of this KIP proposal.

@kurtostfeld
Copy link
Contributor

kurtostfeld commented Dec 13, 2017

The Kafka streams project in this repo already has both of these functional interfaces with identical signatures

./streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
./streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java

Ideally, the project would just use the JDK8 interfaces and not redefine it's own implementations. AFAIK, the next major version of Kafka is dropping JDK7 support anyway.

https://docs.oracle.com/javase/8/docs/api/java/util/function/BiConsumer.html
https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html

If JDK7 compatibility is still required for future versions of Kafka, then you can't use those. Even then there's no good reason to have redundant identical versions of BiConsumer/Function in the Kafka code base.

The Kafka Streams API has several function types that have identical JDK8 versions. Ideally, the Kafka project would just use the JDK8 versions for all of these:

org.apache.kafka.streams.kstream.ForeachAction -> java.util.function.BiConsumer
org.apache.kafka.streams.kstream.Initializer -> java.util.function.Supplier
org.apache.kafka.streams.kstream.KeyValueMapper -> java.util.function.BiFunction
org.apache.kafka.streams.kstream.Predicate -> java.util.function.BiPredicate
org.apache.kafka.streams.kstream.Reducer -> java.util.function.BinaryOperator
org.apache.kafka.streams.kstream.ValueJoiner -> java.util.function.BiFunction
org.apache.kafka.streams.kstream.ValueMapper -> java.util.function.Function

* Returns a new KafkaFuture with the same result or exception as this future, that executes the given action
* when this future completes.
*
* When this future is id done, the given action is invoked with the result (or null if none) and the exception
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extraneous id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

* exceptionally with the supplied action's exception.
* Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future
* completes exceptionally with this future's exception.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness I would add

The action may be invoked by the thread that calls {@code whenComplete} or it may be invoked
 by the thread that completes the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and also add it to thenApply while we're at it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, will update the PR.

Copy link
Contributor Author

@steven-aerts steven-aerts left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kurtostfeld I see your point that the streams library has interfaces with a
signature equal to the FunctionInterface and the BiConsumer.

I still propose to keep them separate and stick to the current proposal of the
KIP for the following reasons:

  • The names used in the kafka streams for those interfaces are very use
    case specific, e.g. #whenComplete(ForeachAction) sounds rather strange to me.
  • To prevent a circular dependency those interfaces have to be moved to a different jar.
  • The interfaces we define now are inner class interfaces, so it is clear that they are linked to KafkaFuture.

I totally agree that once JDK7 is dropped we have to migrate to standard java classes.
I think this is rather easy to do this in a backwards compatible way, by letting the kafka
specific classes implement the JDK8 classes. So KafkaFuture.BiConsumer will then
implement the java.util.function.BiConsumer and KafkaFuture could then implement
CompletableFuture. But that is for me all work to be done under KIP-118.

* Returns a new KafkaFuture with the same result or exception as this future, that executes the given action
* when this future completes.
*
* When this future is id done, the given action is invoked with the result (or null if none) and the exception
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

* exceptionally with the supplied action's exception.
* Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future
* completes exceptionally with this future's exception.
*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, will update the PR.

@steven-aerts
Copy link
Contributor Author

Just updated the commit to take in the latest review comments of xavier fixed one of my own in the code of colin.

@steven-aerts steven-aerts mentioned this pull request Dec 14, 2017
3 tasks
@kurtostfeld
Copy link
Contributor

Thank you for the reply. Two quick follow on comments:

  • There is no circular dependency. The streams project depends on clients, not the other way around. Code that needs to be accessible to both could go in clients without making any dependency or project structure changes.
  • If JDK8 classes can be used, you wouldn't want to keep custom versions of BiConsumer/Function/etc, or have custom versions that extend the JDK version, you would just want to use the JDK version and remove analogs from the Kafka code base. I'd imagine you would also want to use JDK8 CompletableFuture rather than have a custom KafkaFuture.

@xvrl
Copy link
Member

xvrl commented Dec 18, 2017

@kurtostfeld even if the interfaces have the same signatures, I think the specific naming in streams helps clarify the intent. And for someone using Java 8 lambdas, the name doesn't really matter, so renaming interfaces in this context seems a bit counterproductive and does not provide much value other than deduplicating the BiConsumer signature. Until we drop support for JDK7, I think it's simpler to keep things as is and revisit the APIs at that point.

@steven-aerts
Copy link
Contributor Author

The KIP for this PR was accepted.

@xvrl can you merge this PR? Or do you still see some work which needs to be done?

@xvrl
Copy link
Member

xvrl commented Dec 19, 2017 via email

@cmccabe
Copy link
Contributor

cmccabe commented Jan 3, 2018

Interestingly, when using Java's CompletableFuture, this code:

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class CompleteableFoo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> original = new CompletableFuture<>();
        CompletableFuture<Void> next = original.
            whenComplete((result,error)->{throw new RuntimeException("exception 2");});
        original.completeExceptionally(new RuntimeException("exception 1"));
        next.get();
    }
}

Prints this:

cmccabe@aurora:~/test> java CompleteableFoo
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception 1
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at CompleteableFoo.main(CompleteableFoo.java:10)
Caused by: java.lang.RuntimeException: exception 1
        at CompleteableFoo.main(CompleteableFoo.java:9)

In other words, the exception thrown by the BiConsumer is ignored, if there is already an exception being processed.

So I think your exception handling here is reasonable (because it matches up with what people will expect from CF)

@cmccabe
Copy link
Contributor

cmccabe commented Jan 3, 2018

+1 (non-binding)

@xvrl
Copy link
Member

xvrl commented Jan 8, 2018

@ijuma are we waiting on anything from you to +1 this? I would like to see this merged for the next release.

@ewencp ewencp closed this Jan 12, 2018
@ewencp ewencp reopened this Jan 12, 2018
@xvrl
Copy link
Member

xvrl commented Jan 17, 2018

@steven-aerts, it looks like @ijuma mentioned a few minor things he'd like to get addressed:

  • Remove the deprecation for Function
  • Updating the KIP document to add the new signature of interfaces and methods being added (for posterity)
  • Possibly rename FunctionInterface to BaseFunction

See the KIP discussion thread for more details. Once those are addressed we can get this merged.
Do you thing you'll have time to get to it soon so this can make it into 1.1?

Replace the abstract class KafkaFuture.Function with an interface, so it becomes a
java 8 @FuncionalInterface and can be used with a lambda.
@steven-aerts
Copy link
Contributor Author

@ijuma @xvrl,

  • Removed @Deprecation and replaced it with a statement that those constructs might be deprecated/removed in a future version.
  • Renamed FunctionInterface to BaseFunction

Time to update the KIP so it is ready for the release notes.

@ewencp
Copy link
Contributor

ewencp commented Jan 30, 2018

LGTM, merging to trunk for 1.1.0. Thanks for the contribution @steven-aerts!

@ewencp ewencp closed this in ae42cc8 Jan 30, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants