[Kinesis] Fix kinesis sink can not retry to send messages #10420

Merged: 5 commits into apache:master on Apr 29, 2021

Conversation

RobertIndie (Member)

Motivation

Currently, when the Kinesis sink connector fails to send a message, it does not retry. In this case, if retainOrdering is enabled, subsequent messages cannot be sent, as in the following log:

17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain ordering with previous failed message prod_extapi.reply.message-Optional[26380226003034]

Modifications

  • Add retry logic to the Kinesis sink connector: when sending a message fails, the sink retries the send (a rough sketch of the approach follows).
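
As a rough illustration of the approach (not the code merged in this PR), the sketch below wires a retry into the KPL's ListenableFuture callback. The class name, the scheduler, and the doubling cap are assumptions of the sketch; only KinesisProducer.addUserRecord and the Guava callback types come from the real APIs:

```java
// Illustrative sketch only: not the code merged in this PR.
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RetryingKinesisSender {
    private static final long MAX_BACKOFF_MS = TimeUnit.MINUTES.toMillis(1);

    private final KinesisProducer producer;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public RetryingKinesisSender(KinesisProducer producer) {
        this.producer = producer;
    }

    public void send(String stream, String partitionKey, ByteBuffer data, long backoffMs) {
        // duplicate() gives each attempt an independent read position on the same bytes
        Futures.addCallback(
                producer.addUserRecord(stream, partitionKey, data.duplicate()),
                new FutureCallback<UserRecordResult>() {
                    @Override
                    public void onSuccess(UserRecordResult result) {
                        // In the real sink this is where the Pulsar record is acked.
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        // The KPL's own retries are exhausted: re-submit the same
                        // record after a delay, doubling the delay up to a cap.
                        long nextBackoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
                        scheduler.schedule(
                                () -> send(stream, partitionKey, data, nextBackoffMs),
                                backoffMs, TimeUnit.MILLISECONDS);
                    }
                },
                MoreExecutors.directExecutor());
    }
}
```

An initial call such as `sender.send("test-io", key, payload, 1000)` would match the 1000 ms first retry delay seen in the verification logs below.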

Verify

  • Disconnect the network.
  • Send a message to the input topic of the Kinesis sink connector. The connector outputs logs like:

18:12:58.448 [public/default/pulsar-kinesis-sink-0] DEBUG org.apache.pulsar.io.kinesis.KinesisSink - Published message to kinesis stream test-io with size 3
18:13:16.619 [kpl-daemon-0003] ERROR com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2021-04-28 18:13:16.618959] [0x00007b57][0x0000700009bf1000] [error] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 28 - Timeout was reached
18:13:16.619 [kpl-daemon-0003] ERROR com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2021-04-28 18:13:16.619002] [0x00007b57][0x0000700009bf1000] [error] [AWS Log: ERROR](AWSClient)HTTP response code: -1
Exception name:
Error message: Unable to connect to endpoint
0 response headers:
...

  • When the internal retries of KinesisProducer are exhausted, the failure reaches the onFailure method of ProducerSendCallback, which schedules another attempt with a growing backoff delay (see the sketch after these log lines):

18:13:28.742 [pool-2-thread-1] INFO org.apache.pulsar.io.kinesis.KinesisSink - [test-io] Retry to publish message for replicator of Optional[persistent://public/default/test-kinesis-0]-Optional[2684354588] after 1000 ms.
...
18:14:07.141 [pool-2-thread-1] INFO org.apache.pulsar.io.kinesis.KinesisSink - [test-io] Retry to publish message for replicator of Optional[persistent://public/default/test-kinesis-0]-Optional[2684354588] after 1914 ms.
...
18:14:39.385 [pool-2-thread-1] INFO org.apache.pulsar.io.kinesis.KinesisSink - [test-io] Retry to publish message for replicator of Optional[persistent://public/default/test-kinesis-0]-Optional[2684354588] after 3713 ms.
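
The delays above (1000, 1914, 3713 ms) are consistent with a backoff that roughly doubles with downward jitter. Below is a self-contained sketch of such a schedule; the class and the 10% jitter factor are illustrative assumptions, not the PR's exact math:

```java
// Hedged sketch: one way to produce a schedule like 1000 -> 1914 -> 3713 ms,
// i.e. roughly doubling with downward jitter. Not necessarily the PR's code.
import java.util.concurrent.ThreadLocalRandom;

final class RetryBackoff {
    private final long maxMs;
    private long nextMs;

    RetryBackoff(long initialMs, long maxMs) {
        this.nextMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Returns the current delay, then grows it ~2x minus up to 10% jitter. */
    long next() {
        long current = nextMs;
        long doubled = current * 2;
        long jitter = (long) (doubled * 0.1 * ThreadLocalRandom.current().nextDouble());
        nextMs = Math.min(maxMs, doubled - jitter);
        return current;
    }

    public static void main(String[] args) {
        RetryBackoff backoff = new RetryBackoff(1000, 60_000);
        for (int i = 0; i < 3; i++) {
            System.out.println(backoff.next()); // e.g. 1000, 1914, 3713
        }
    }
}
```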

  • Reconnect the network. The pending message is then sent successfully:

18:14:47.102 [kpl-callback-pool-0-thread-0] DEBUG org.apache.pulsar.io.kinesis.KinesisSink - Successfully published message for test-io-shardId-000000000000 with latency 108652

  • Subsequent messages are also sent successfully:

18:14:52.166 [public/default/pulsar-kinesis-sink-0] DEBUG org.apache.pulsar.io.kinesis.KinesisSink - Published message to kinesis stream test-io with size 3
18:14:52.805 [kpl-callback-pool-0-thread-0] DEBUG org.apache.pulsar.io.kinesis.KinesisSink - Successfully published message for test-io-shardId-000000000000 with latency 640

@eolivelli (Contributor)

@dlg99 PTAL

@dlg99 (Contributor) commented Apr 29, 2021

If I understood this approach correctly, and to the best of my understanding of how the sink works, it will end up accumulating data in memory until the connection with Kinesis is restored. If that takes too long, the PulsarSource will keep producing data for the sink and this will eventually OOM. The sink needs a way (through the context?) to tell PulsarSource to pause.

I suggest trying your test with "Send a million messages to the input topic of the kinesis sink connector" while monitoring the JVM's memory.

@codelipenghui (Contributor)

@dlg99 @RobertIndie We can leverage the max unacked message limitation to bound the connector's memory when retainOrdering=false; this requires users to run the connector on a Shared subscription. For retainOrdering=true, we only retry the current message until it succeeds, so this will not introduce OOM issues. (A config sketch follows.)
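
For reference, a sketch of the broker-side cap mentioned here. These are standard broker.conf settings (the values shown are illustrative, not a recommendation from this thread); the per-consumer limit applies to Shared subscriptions, which is why retainOrdering=false is needed for it to help:

```properties
# broker.conf: bound in-flight (unacknowledged) messages so a stalled sink
# cannot buffer unbounded data while it retries. Values are illustrative.
maxUnackedMessagesPerConsumer=50000
maxUnackedMessagesPerSubscription=200000
```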

@RobertIndie (Member, Author)

Here are some stress tests I ran:

  • Start the sink connector with retainOrdering enabled. (screenshot)
  • Disconnect the network so that the Kinesis producer fails to send messages.
  • Produce 1 million messages at a rate of 100,000 msg/s. (screenshot)
  • The sink connector skips all subsequent messages to retain ordering. (screenshot)
  • The memory of the sink connector did not grow significantly.
When retainOrdering is disabled, there is no retry; the changes in this PR do not affect that case.

@codelipenghui (Contributor)

@dlg99 Please help take a look.

@dlg99 (Contributor) left a comment:
LGTM. Thank you for handling this.

@codelipenghui merged commit 345cd33 into apache:master on Apr 29, 2021
@sijie (Member) commented Apr 29, 2021

@eolivelli @codelipenghui I marked this issue for 2.7.2

@eolivelli (Contributor)

Okay. Feel free to cherry-pick. Otherwise I will do it tomorrow.

codelipenghui pushed a commit that referenced this pull request Apr 30, 2021
(cherry picked from commit 345cd33)
@codelipenghui added the cherry-picked/branch-2.7 label on Apr 30, 2021
Labels: cherry-picked/branch-2.7, release/2.7.2