Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,14 @@ public static <K, V> WriteRecords<K, V> writeRecords() {

///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\

/**
* Default number of keys to redistribute Kafka inputs into.
*
* <p>Applied by {@link Read#withRedistribute()} and {@link
* ReadSourceDescriptors#withRedistribute()} when the configured number of keys is still zero,
* for example because {@link Read#withRedistributeNumKeys(int redistributeNumKeys)} was not
* called beforehand.
*/
private static final int DEFAULT_REDISTRIBUTE_NUM_KEYS = 32768;

/**
* A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
* usage and configuration.
Expand Down Expand Up @@ -1099,7 +1107,11 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
* @return an updated {@link Read} transform.
*/
public Read<K, V> withRedistribute() {
// NOTE(review): this listing comes from a diff view, so the next statement appears to be the
// pre-change body displayed next to its replacement below; confirm before compiling as-is.
return toBuilder().setRedistributed(true).build();
Builder<K, V> builder = toBuilder().setRedistributed(true);
// A key count of zero is treated as "nothing configured yet", so the default is applied.
// NOTE(review): an explicit withRedistributeNumKeys(0) made before this call cannot be told
// apart from "unset" here and would also be replaced by the default; confirm that this
// call-order sensitivity is intended.
if (getRedistributeNumKeys() == 0) {
builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
}
return builder.build();
}

/**
Expand All @@ -1121,10 +1133,11 @@ public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
* Redistributes Kafka messages into a distinct number of keys for processing in subsequent
* steps.
*
* <p>Specifying an explicit number of keys is generally recommended over redistributing into an
* unbounded key space.
* <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
*
* <p>Must be used with {@link KafkaIO#withRedistribute()}.
* <p>Use zero to disable bucketing into a distinct number of keys.
*
* <p>Must be used with {@link Read#withRedistribute()}.
*
* @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
* @return an updated {@link Read} transform.
Expand Down Expand Up @@ -2667,13 +2680,30 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {

/**
* Enables redistribute.
*
* <p>If the number of redistribute keys is still zero when this is called, it is set to {@code
* DEFAULT_REDISTRIBUTE_NUM_KEYS}; a non-zero value that was already configured is kept.
*
* @return an updated {@link ReadSourceDescriptors} transform.
*/
public ReadSourceDescriptors<K, V> withRedistribute() {
// NOTE(review): this listing comes from a diff view, so the next statement appears to be the
// pre-change body displayed next to its replacement below; confirm before compiling as-is.
return toBuilder().setRedistribute(true).build();
Builder<K, V> builder = toBuilder().setRedistribute(true);
// Zero is treated as "nothing configured yet", so the default key count is applied.
// NOTE(review): an explicit withRedistributeNumKeys(0) made before this call would also be
// replaced by the default; confirm that this call-order sensitivity is intended.
if (getRedistributeNumKeys() == 0) {
builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
}
return builder.build();
}

/**
 * Turns on the allow-duplicates flag of this transform.
 *
 * @return an updated {@link ReadSourceDescriptors} transform.
 */
public ReadSourceDescriptors<K, V> withAllowDuplicates() {
  Builder<K, V> updated = toBuilder().setAllowDuplicates(true);
  return updated.build();
}

/**
 * Sets the distinct number of keys that Kafka messages are redistributed into for processing in
 * subsequent steps.
 *
 * <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
 *
 * <p>Use zero to disable bucketing into a distinct number of keys.
 *
 * <p>Must be used with {@link ReadSourceDescriptors#withRedistribute()}.
 *
 * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
 * @return an updated {@link ReadSourceDescriptors} transform.
 */
public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
  Builder<K, V> updated = toBuilder().setRedistributeNumKeys(redistributeNumKeys);
  return updated.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -792,6 +793,53 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
p.run();
}

@Test
public void testDefaultRedistributeNumKeys() {
  int numElements = 1000;
  // Mirrors DEFAULT_REDISTRIBUTE_NUM_KEYS in KafkaIO.
  int expectedDefaultNumKeys = 32768;
  int userNumKeys = 10;

  // Scenario 1: redistribute is off, so the key count is left untouched.
  KafkaIO.Read<Integer, Long> plainRead =
      mkKafkaReadTransform(
          numElements,
          numElements,
          new ValueAsTimestampFn(),
          /* redistribute= */ false,
          /* allowDuplicates= */ false,
          /* numKeys= */ null,
          /* offsetDeduplication= */ null,
          /* topics= */ null);
  assertFalse(plainRead.isRedistributed());
  assertEquals(0, plainRead.getRedistributeNumKeys());

  // Scenario 2: redistribute is on without a user-provided key count, so the default applies.
  KafkaIO.Read<Integer, Long> defaultedRead =
      mkKafkaReadTransform(
          numElements,
          numElements,
          new ValueAsTimestampFn(),
          /* redistribute= */ true,
          /* allowDuplicates= */ false,
          /* numKeys= */ null,
          /* offsetDeduplication= */ null,
          /* topics= */ null);
  assertTrue(defaultedRead.isRedistributed());
  assertEquals(expectedDefaultNumKeys, defaultedRead.getRedistributeNumKeys());

  // Scenario 3: redistribute is on with a user-specified number of keys, which is kept.
  KafkaIO.Read<Integer, Long> explicitRead =
      mkKafkaReadTransform(
          numElements,
          numElements,
          new ValueAsTimestampFn(),
          /* redistribute= */ true,
          /* allowDuplicates= */ false,
          /* numKeys= */ userNumKeys,
          /* offsetDeduplication= */ null,
          /* topics= */ null);
  assertTrue(explicitRead.isRedistributed());
  assertEquals(userNumKeys, explicitRead.getRedistributeNumKeys());
}

@Test
public void testDisableRedistributeKafkaOffsetLegacy() {
thrown.expect(Exception.class);
Expand Down
Loading