Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-23: Pulsar Java Client Interceptors. #2471

Merged
merged 85 commits into from
Sep 5, 2018
Merged

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Aug 29, 2018

Motivation

Support user to add interceptors to producer and consumer.

Modifications

Add Consumer interceptors.

Message<T> beforeConsume(Message<T> message);
void onAcknowledge(MessageId messageId, Throwable cause);
void onAcknowledgeCumulative(MessageId messageId, Throwable cause);

Add Producer interceptors.

Message<T> beforeSend(Message<T> message);
void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable cause);

Result

Users can using interceptors in multiple scenarios, such as for applications to add
custom logging or processing.

Master Issue: #2476

codelipenghui added 16 commits August 24, 2018 07:43
Add interceptor(ProducerInterceptor<T>... interceptors) method to ProducerBuilder.java and ProducerBuilderImpl.java
Add ProducerInterceptors to ProducerBase.java
Implement before consume in consumer interceptors.
Add UT for before consumer in consumer interceptors.
Add interceptor(ProducerInterceptor<T>... interceptors) method to ProducerBuilder.java and ProducerBuilderImpl.java
Add ProducerInterceptors to ProducerBase.java
Implement before consume in consumer interceptors.
Add UT for before consumer in consumer interceptors.
@sijie sijie requested review from merlimat and jiazhai August 29, 2018 17:21
@sijie sijie added area/client type/feature The PR added a new feature or issue requested a new feature labels Aug 29, 2018
@sijie sijie added this to the 2.2.0-incubating milestone Aug 29, 2018
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui the change LGTM! a few nits around using mockito

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change LGTM, just left few comments.

One thing we should consider is to add ReaderInterceptor as well. Even though the implementation is completely based on consumer, from an API perspective they are completely different.

Internally the ReaderInterceptor would be just a wrapper of ConsumerInterceptor.

Another idea, which might be out of the scope of this PR is to have Client level interceptor, both to be automatically attached to any producer/consumer created by the client instance and to also intercept events like consumer/producer created.

* @param messageId message to ack, null if acknowledge fail.
* @param cause the exception on acknowledge.
*/
void onAcknowledge(MessageId messageId, Throwable cause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception on acknowledge is an artifact of existing APIs. The current implementation is not throwing exceptions anymore (based on the fact that messages that are failed to be acknowledged will be replayed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In doAcknowledge method, if check state failed.

if (getState() != State.Ready && getState() != State.Connecting) {
            stats.incrementNumAcksFailed();
            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
            if (AckType.Individual.equals(ackType)) {
                onAcknowledge(messageId, exception);
            } else if (AckType.Cumulative.equals(ackType)) {
                onAcknowledgeCumulative(messageId, exception);
            }
            return FutureUtil.failedFuture(exception);
        }

So i thought parameter of exception should be keep.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should actually only be called when the consumer has been closed.

@@ -105,7 +105,6 @@ void add(MessageImpl<?> msg, SendCallback callback) {
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
msg.getDataBuffer(), batchedMessageMetadataAndPayload);
messages.add(msg);
msgBuilder.recycle();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for why the message builder cannot be recycled anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerIntercetpor want to get the message properties. The message properties is lazy init by call getProperties(). In getProperties method:

 public synchronized Map<String, String> getProperties() {
        if (this.properties == null) {
            if (msgMetadataBuilder.getPropertiesCount() > 0) {
                this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
                        .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
            } else {
                this.properties = Collections.emptyMap();
            }
        }
        return this.properties;
    }

So i change call msgBuilder.recycle() after call interceptor.

public void sendComplete(Exception e) {
                if (e != null) {
                    stats.incrementSendFailed();
                    onSendAcknowledgement(interceptorMessage, null, e);
                    future.completeExceptionally(e);
                } else {
                    onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
                    future.complete(interceptorMessage.getMessageId());
                    stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
                }
                interceptorMessage.getDataBuffer().release();
                interceptorMessage.getMessageBuilder().recycle();
                while (nextCallback != null) {
                    SendCallback sendCallback = nextCallback;
                    MessageImpl<?> msg = nextMsg;
                    msg.getDataBuffer().retain();
                    if (e != null) {
                        stats.incrementSendFailed();
                        onSendAcknowledgement((Message<T>) msg, null, e);
                        sendCallback.getFuture().completeExceptionally(e);
                    } else {
                        onSendAcknowledgement((Message<T>) msg, msg.getMessageId(), null);
                        sendCallback.getFuture().complete(msg.getMessageId());
                        stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
                    }
                    msg.getDataBuffer().release();
                    msg.getMessageBuilder().recycle();
                    nextMsg = nextCallback.getNextMessage();
                    nextCallback = nextCallback.getNextSendCallback();
                }
            }


sendAsync(message, new SendCallback() {
MessageImpl<T> interceptorMessage = (MessageImpl<T>) beforeSend(message);
interceptorMessage.getDataBuffer().retain();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment on why we are retaining the buffer here and when it will be released?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@sijie sijie changed the title Pulsar Java Client Interceptors. PIP-23: Pulsar Java Client Interceptors. Aug 29, 2018
@sijie
Copy link
Member

sijie commented Aug 29, 2018

@merlimat : created #2476 for tracking the tasks for interceptors. lets focus on producer/consumer interceptor for this PR.

One thing we should consider is to add ReaderInterceptor as well. Even though the implementation is completely based on consumer, from an API perspective they are completely different.

Created #2477

Another idea, which might be out of the scope of this PR is to have Client level interceptor, both to be automatically attached to any producer/consumer created by the client instance and to also intercept events like consumer/producer created.

Created #2478

@merlimat
Copy link
Contributor

@merlimat : created #2476 for tracking the tasks for interceptors. lets focus on producer/consumer interceptor for this PR.

👍 Sounds good

codelipenghui added 6 commits August 30, 2018 14:41
Implement before consume in consumer interceptors.
Add UT for before consumer in consumer interceptors.
…consumers/producers, it might be good to also pass a reference to the Consumer and Producer.
…ally to ensure release. Change foreach to for++."

This reverts commit d605cad
… try/finally to ensure release. Change foreach to for++.""

This reverts commit aaffaf7
@sijie sijie changed the base branch from master to branch-2.1 August 30, 2018 07:31
@sijie sijie changed the base branch from branch-2.1 to master August 30, 2018 07:31
@codelipenghui
Copy link
Contributor Author

@merlimat I'm already address all your comments.

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@sijie
Copy link
Member

sijie commented Aug 31, 2018

retest this please

@merlimat
Copy link
Contributor

merlimat commented Sep 3, 2018

codelipenghui added 5 commits September 4, 2018 09:37
# Conflicts:
#	pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
# Conflicts:
#	pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@sijie
Copy link
Member

sijie commented Sep 4, 2018

run java8 tests

@sijie
Copy link
Member

sijie commented Sep 4, 2018

@codelipenghui I fixed ReplicatorTest and V1_ReplicatorTest. so the CI should be passing now. once the CI is passed, will merge it.

@sijie
Copy link
Member

sijie commented Sep 5, 2018

run java8 tests

@sijie sijie merged commit 7a92111 into apache:master Sep 5, 2018
@sijie
Copy link
Member

sijie commented Sep 5, 2018

@codelipenghui I just merged the interceptors change. Great work!

@codelipenghui codelipenghui deleted the PIP-23 branch September 6, 2018 01:10
@codelipenghui
Copy link
Contributor Author

@sijie Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants