Skip to content
Permalink
Browse files
[FLINK-27699][python][connector/pulsar] Support StopCursor.at_publish…
…_time

This closes #19771.
  • Loading branch information
deadwind4 authored and dianfu committed May 21, 2022
1 parent e7d004d commit 18a967f8ad7b22c2942e227fb84f08f552660b5a
Showing 2 changed files with 19 additions and 7 deletions.
@@ -172,10 +172,21 @@ def latest() -> 'StopCursor':

@staticmethod
def at_event_time(timestamp: int) -> 'StopCursor':
warnings.warn(
"at_event_time is deprecated. Use at_publish_time instead.", DeprecationWarning)
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.atEventTime(timestamp))

@staticmethod
def at_publish_time(timestamp: int) -> 'StopCursor':
"""
Stop when message publishTime is greater than the specified timestamp.
"""
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.atPublishTime(timestamp))


class PulsarSource(Source):
"""
@@ -188,7 +199,7 @@ class PulsarSource(Source):
>>> source = PulsarSource() \\
... .builder() \\
... .set_topics(TOPIC1, TOPIC2) \\
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_service_url(get_service_url()) \\
... .set_admin_url(get_admin_url()) \\
... .set_subscription_name("test") \\
@@ -255,7 +266,7 @@ class PulsarSourceBuilder(object):
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_deserialization_schema(
... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
... .set_bounded_stop_cursor(StopCursor.at_event_time(int(time.time() * 1000)))
... .set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000)))
... .build()
"""

@@ -310,7 +321,7 @@ def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
setTopics or setTopicPattern in this builder.
set_topics or set_topic_pattern in this builder.
"""
warnings.warn("set_topics_pattern is deprecated. Use set_topic_pattern instead.",
DeprecationWarning, stacklevel=2)
@@ -320,7 +331,7 @@ def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
setTopics or setTopicPattern in this builder.
set_topics or set_topic_pattern in this builder.
"""
self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
return self
@@ -179,7 +179,7 @@ def test_pulsar_source(self):
.set_topics('ada') \
.set_start_cursor(StartCursor.earliest()) \
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_bounded_stop_cursor(StopCursor.at_event_time(22)) \
.set_bounded_stop_cursor(StopCursor.at_publish_time(22)) \
.set_subscription_name('ff') \
.set_subscription_type(SubscriptionType.Exclusive) \
.set_deserialization_schema(
@@ -254,12 +254,13 @@ def test_source_deprecated_method(self):
pulsar_source = PulsarSource.builder() \
.set_service_url('pulsar://localhost:6650') \
.set_admin_url('http://localhost:8080') \
.set_topics_pattern('ada.*') \
.set_topic_pattern('ada.*') \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
.set_unbounded_stop_cursor(StopCursor.at_publish_time(4444)) \
.set_subscription_name('ff') \
.set_config(test_option, True) \
.set_config_with_dict({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.build()
configuration = get_field_value(pulsar_source.get_java_function(), "sourceConfiguration")
self.assertEqual(

0 comments on commit 18a967f

Please sign in to comment.