Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#30870]: support consumer polling timeout in KafkaIO expansion service #30915

Merged
merged 12 commits into from
Apr 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,16 @@ static <K, V> void setupExternalBuilder(
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
builder.setDynamicRead(false);

if (config.consumerPollingTimeoutSeconds != null) {
if (config.consumerPollingTimeoutSeconds <= 0) {
throw new IllegalArgumentException("consumerPollingTimeoutSeconds should be > 0.");
}
builder.setConsumerPollingTimeout(
Duration.standardSeconds(config.consumerPollingTimeoutSeconds));
} else {
builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
Expand Down Expand Up @@ -893,6 +903,7 @@ public static class Configuration {
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
private Long consumerPollingTimeoutSeconds;
private String timestampPolicy;

public void setConsumerConfig(Map<String, String> consumerConfig) {
Expand Down Expand Up @@ -934,6 +945,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}

public void setConsumerPollingTimeoutSeconds(Long consumerPollingTimeoutSeconds) {
this.consumerPollingTimeoutSeconds = consumerPollingTimeoutSeconds;
}
}
}

Expand Down Expand Up @@ -1342,7 +1357,7 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* The default is 2 seconds.
xianhualiu marked this conversation as resolved.
Show resolved Hide resolved
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
Expand Down Expand Up @@ -2387,7 +2402,7 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* The default is 2 seconds.
xianhualiu marked this conversation as resolved.
Show resolved Hide resolved
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ public void testConstructKafkaRead() throws Exception {
Field.of("value_deserializer", FieldType.STRING),
Field.of("start_read_time", FieldType.INT64),
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
Field.of("timestamp_policy", FieldType.STRING)))
Field.of("timestamp_policy", FieldType.STRING),
Field.of("consumer_polling_timeout_seconds", FieldType.INT64)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
.withFieldValue("value_deserializer", valueDeserializer)
.withFieldValue("start_read_time", startReadTime)
.withFieldValue("commit_offset_in_finalize", false)
.withFieldValue("timestamp_policy", "ProcessingTime")
.withFieldValue("consumer_polling_timeout_seconds", 5L)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
Expand Down Expand Up @@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();

assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
Expand Down
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/io/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
('consumer_polling_timeout_seconds', typing.Optional[int])])


def default_io_expansion_service(append_args=None):
Expand Down Expand Up @@ -134,6 +135,7 @@ def __init__(
max_read_time=None,
commit_offset_in_finalize=False,
timestamp_policy=processing_time_policy,
consumer_polling_timeout_seconds=None,
with_metadata=False,
expansion_service=None,
):
Expand All @@ -159,6 +161,8 @@ def __init__(
:param commit_offset_in_finalize: Whether to commit offsets when finalizing.
:param timestamp_policy: The built-in timestamp policy which is used for
extracting timestamp from KafkaRecord.
:param consumer_polling_timeout_seconds: Kafka client polling request
timeout time in seconds. Default is 2 seconds.
xianhualiu marked this conversation as resolved.
Show resolved Hide resolved
:param with_metadata: whether the returned PCollection should contain
Kafka related metadata or not. If False (default), elements of the
returned PCollection will be of type 'bytes' if True, elements of the
Expand Down Expand Up @@ -186,7 +190,9 @@ def __init__(
max_read_time=max_read_time,
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
timestamp_policy=timestamp_policy,
consumer_polling_timeout_seconds=consumer_polling_timeout_seconds
)),
expansion_service or default_io_expansion_service())


Expand Down