Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#30870]: support consumer polling timeout in KafkaIO expansion service #30915

Merged
merged 12 commits into from
Apr 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,16 @@ static <K, V> void setupExternalBuilder(
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
builder.setDynamicRead(false);

if (config.consumerPollingTimeout != null) {
if (config.consumerPollingTimeout <= 0) {
throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
}
builder.setConsumerPollingTimeout(
Duration.standardSeconds(config.consumerPollingTimeout));
} else {
builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
Expand Down Expand Up @@ -893,6 +903,7 @@ public static class Configuration {
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
private Long consumerPollingTimeout;
private String timestampPolicy;

public void setConsumerConfig(Map<String, String> consumerConfig) {
Expand Down Expand Up @@ -934,6 +945,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
/**
 * Sets the timestamp policy value carried by this configuration.
 *
 * @param timestampPolicy value stored as-is in the {@code timestampPolicy} field; no validation
 *     is performed by this setter
 */
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}

/**
 * Sets the Kafka consumer polling timeout carried by this configuration.
 *
 * <p>The value is stored without validation. {@code setupExternalBuilder} interprets it as a
 * number of seconds, rejects values {@code <= 0} with an {@link IllegalArgumentException}, and
 * falls back to a 2 second timeout when it is {@code null}.
 *
 * @param consumerPollingTimeout polling timeout in seconds, or {@code null} to use the default
 */
public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
this.consumerPollingTimeout = consumerPollingTimeout;
}
}
}

Expand Down Expand Up @@ -1341,8 +1356,9 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
* lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
* enough (or any) records. The default is 2 seconds.
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
Expand Down Expand Up @@ -2386,8 +2402,9 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
* lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
* enough (or any) records. The default is 2 seconds.
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ public void testConstructKafkaRead() throws Exception {
Field.of("value_deserializer", FieldType.STRING),
Field.of("start_read_time", FieldType.INT64),
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
Field.of("timestamp_policy", FieldType.STRING)))
Field.of("timestamp_policy", FieldType.STRING),
Field.of("consumer_polling_timeout", FieldType.INT64)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
.withFieldValue("value_deserializer", valueDeserializer)
.withFieldValue("start_read_time", startReadTime)
.withFieldValue("commit_offset_in_finalize", false)
.withFieldValue("timestamp_policy", "ProcessingTime")
.withFieldValue("consumer_polling_timeout", 5L)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
Expand Down Expand Up @@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();

assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class KafkaIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put(
"getValueDeserializerProvider", "value_deserializer_provider");
READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix is not complete. Previously, the error was

java.lang.AssertionError: Method getConsumerPollingTimeout will not be tracked when upgrading the 'KafkaIO.Read' transform. Please update 'KafkaIOTranslation.KafkaIOReadWithMetadataTranslator' to track the new method and update this test.

After adding this line, the error became:

java.lang.AssertionError: Field name consumer_polling_timeout was not found in the read transform schema defined in KafkaIOReadWithMetadataTranslator.

}

// A mapping from Write transform builder methods to the corresponding schema fields in
Expand Down
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/io/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
('consumer_polling_timeout', typing.Optional[int])])


def default_io_expansion_service(append_args=None):
Expand Down Expand Up @@ -134,6 +135,7 @@ def __init__(
max_read_time=None,
commit_offset_in_finalize=False,
timestamp_policy=processing_time_policy,
consumer_polling_timeout=None,
with_metadata=False,
expansion_service=None,
):
Expand All @@ -159,6 +161,10 @@ def __init__(
:param commit_offset_in_finalize: Whether to commit offsets when finalizing.
:param timestamp_policy: The built-in timestamp policy which is used for
extracting timestamp from KafkaRecord.
:param consumer_polling_timeout: Timeout, in seconds, for Kafka consumer
polling requests. A lower timeout optimizes for latency. Increase the
timeout if the consumer is not fetching enough (or any) records. Default
is 2 seconds.
:param with_metadata: whether the returned PCollection should contain
Kafka related metadata or not. If False (default), elements of the
returned PCollection will be of type 'bytes' if True, elements of the
Expand Down Expand Up @@ -186,7 +192,8 @@ def __init__(
max_read_time=max_read_time,
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
timestamp_policy=timestamp_policy,
consumer_polling_timeout=consumer_polling_timeout)),
expansion_service or default_io_expansion_service())


Expand Down