Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Functions] Process async results in the same Java runnable thread #10618

Merged
merged 2 commits into from
May 18, 2021

Conversation

sijie
Copy link
Member

@sijie sijie commented May 17, 2021

Motivation

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

Modification

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.

*Motivation*

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

*Modification*

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.
handleResult(currentRecord, result);
} else {
// process the asynchronous results
processAsyncResults();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null result indicates the current record is placed into the queue for async processing.

The processAsyncResults() method takes the first element from the queue and checks/proceeds with the result.

  1. It seems a newly added record triggers the check of whether the oldest added record is done. This behavior is undesirable since records are independent.
  2. I'm wondering there'll always be one element left in the queue regardless whether it's done or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nlu90 Nice catch! I have addressed your comment in commit 326ae15

srcRecord, t);
stats.incrUserExceptions(t);
srcRecord.fail();
private void processAsyncResults() throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this empty method

Copy link
Member

@nlu90 nlu90 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except the above minor comment

@codelipenghui codelipenghui merged commit 1cbd5b4 into apache:master May 18, 2021
@sijie sijie deleted the fix_kinesis branch May 18, 2021 18:00
codelipenghui pushed a commit that referenced this pull request May 21, 2021
…10618)

* [Functions] Process async results in the same Java runnable thread

*Motivation*

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

*Modification*

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.

* Process the async results in the different thread

(cherry picked from commit 1cbd5b4)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label May 21, 2021
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
…pache#10618)

* [Functions] Process async results in the same Java runnable thread

*Motivation*

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

*Modification*

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.

* Process the async results in the different thread
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…pache#10618)

* [Functions] Process async results in the same Java runnable thread

*Motivation*

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

*Modification*

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.

* Process the async results in the different thread
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants