Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28870][Connector/Pulsar] Improve the Pulsar source performance when meeting small data rates. #15

Merged
merged 1 commit into from Jan 10, 2023

Conversation

syhily
Copy link
Contributor

@syhily syhily commented Jan 10, 2023

What is the purpose of the change

When using Pulsar Source to consume data, if the data rate is small, e.g. 2 msg/s, there will be long periods of time when no messages are consumed.

This is caused by the default PulsarSourceOptions.PULSAR_MAX_FETCH_TIME and PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS options. Pulsar Source will try to pull messages until any conditions exceed. Such as fetch until 100 messages or fetch 10 seconds.

We have to add a new fetch time option for Pulsar Source. We would consider there is no message on the current topic if this fetch time exceeds. This would make sure the source would stop fetching messages when the 100ms exceed. Avoid hanging on small message income rates.

Brief change log

  • Add new PulsarSourceOptions.PULSAR_SINGLE_FETCH_TIME option.
  • Change the polling timeout to PulsarSourceOptions.PULSAR_SINGLE_FETCH_TIME.

Verifying this change

This change is already covered by existing tests, such as PulsarPartitionSplitReaderTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two comments here:

  1. To keep the user experience consistent, is it possible to make singleFetchTime optional and only into use if configured?
  2. The option name may be maxFetchOneMessageTime or sth for comprehension.

@syhily syhily force-pushed the hotfix/improve-source-speed branch from 1879a3d to 746e903 Compare January 10, 2023 03:45
@syhily
Copy link
Contributor Author

syhily commented Jan 10, 2023

To keep the user experience consistent, is it possible to make singleFetchTime optional and only into use if configured?

Yep.

@syhily syhily force-pushed the hotfix/improve-source-speed branch 2 times, most recently from 3ada124 to c6fee88 Compare January 10, 2023 03:55
@tisonkun
Copy link
Member


Error:  src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java:[46] (imports) ImportOrder: Import org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME appears after other imports that it should precede
Error:  src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java:[65] (imports) ImportOrder: Import org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME appears after other imports that it should precede
Error:  src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java:[52] (imports) ImportOrder: Import org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME appears after other imports that it should precede
Error:  src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java:[43] (imports) ImportOrder: Import org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME appears after other imports that it should precede

checkstyle fails :/

@syhily syhily force-pushed the hotfix/improve-source-speed branch 2 times, most recently from e65d81e to ff5d49d Compare January 10, 2023 04:08
@syhily syhily force-pushed the hotfix/improve-source-speed branch from ff5d49d to ed117b9 Compare January 10, 2023 04:16
@tisonkun
Copy link
Member

Merging...

@tisonkun tisonkun merged commit 6991f38 into apache:main Jan 10, 2023
@syhily syhily deleted the hotfix/improve-source-speed branch January 10, 2023 09:09
cbornet pushed a commit to cbornet/flink-connector-pulsar that referenced this pull request Jun 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants