Skip to content

Commit e9c640f

Browse files
feat: Add max message per batch option (#14)
1 parent d340c52 commit e9c640f

7 files changed

Lines changed: 72 additions & 15 deletions

File tree

src/main/java/com/google/cloud/pubsublite/spark/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
public class Constants {
2626
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
2727
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
28-
public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
28+
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
2929
public static StructType DEFAULT_SCHEMA =
3030
new StructType(
3131
new StructField[] {
@@ -46,6 +46,8 @@ public class Constants {
4646

4747
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
4848

49+
public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
50+
"pubsublite.flowcontrol.maxmessagesperbatch";
4951
public static String BYTES_OUTSTANDING_CONFIG_KEY =
5052
"pubsublite.flowcontrol.byteoutstandingperpartition";
5153
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =

src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public MicroBatchReader createMicroBatchReader(
9898
Ticker.systemTicker()),
9999
subscriptionPath,
100100
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
101+
pslDataSourceOptions.maxMessagesPerBatch(),
101102
topicPartitionCount);
102103
}
103104
}

src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings;
4343
import java.io.IOException;
4444
import java.io.Serializable;
45-
import java.util.Optional;
4645
import javax.annotation.Nullable;
4746
import org.apache.spark.sql.sources.v2.DataSourceOptions;
4847

@@ -55,18 +54,19 @@ public abstract class PslDataSourceOptions implements Serializable {
5554

5655
public abstract SubscriptionPath subscriptionPath();
5756

58-
@Nullable
5957
public abstract FlowControlSettings flowControlSettings();
6058

61-
public abstract long maxBatchOffsetRange();
59+
public abstract long maxMessagesPerBatch();
6260

6361
public static Builder builder() {
6462
return new AutoValue_PslDataSourceOptions.Builder()
6563
.setCredentialsKey(null)
66-
// TODO(jiangmichael): Revisit this later about if we need to expose this as a user
67-
// configurable option. Ideally we should expose bytes range/# msgs range not
68-
// offsets range since PSL doesn't guarantee offset = msg.
69-
.setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
64+
.setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH)
65+
.setFlowControlSettings(
66+
FlowControlSettings.builder()
67+
.setMessagesOutstanding(Constants.DEFAULT_MESSAGES_OUTSTANDING)
68+
.setBytesOutstanding(Constants.DEFAULT_BYTES_OUTSTANDING)
69+
.build());
7070
}
7171

7272
public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
@@ -75,10 +75,10 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions
7575
}
7676

7777
Builder builder = builder();
78-
Optional<String> value;
79-
if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
80-
builder.setCredentialsKey(value.get());
81-
}
78+
options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY).ifPresent(builder::setCredentialsKey);
79+
options
80+
.get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY)
81+
.ifPresent(mmpb -> builder.setMaxMessagesPerBatch(Long.parseLong(mmpb)));
8282
return builder
8383
.setSubscriptionPath(
8484
SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get()))
@@ -103,7 +103,7 @@ public abstract static class Builder {
103103

104104
public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);
105105

106-
public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
106+
public abstract Builder setMaxMessagesPerBatch(long maxMessagesPerBatch);
107107

108108
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
109109

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,13 @@ public boolean next() {
6464
msg = subscriber.messageIfAvailable();
6565
break;
6666
} catch (TimeoutException e) {
67-
log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
67+
log.atWarning().log(
68+
String.format(
69+
"Unable to get any messages in last %s. Partition: %d; Current message offset: %s; End message offset: %d.",
70+
SUBSCRIBER_PULL_TIMEOUT.toString(),
71+
endOffset.partition().value(),
72+
currentMsg == null ? "null" : currentMsg.offset().value(),
73+
endOffset.offset()));
6874
} catch (Throwable t) {
6975
throw new IllegalStateException("Failed to retrieve messages.", t);
7076
}

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class PslMicroBatchReader implements MicroBatchReader {
4242
private final SubscriptionPath subscriptionPath;
4343
private final FlowControlSettings flowControlSettings;
4444
private final long topicPartitionCount;
45+
private final long maxMessagesPerBatch;
4546
@Nullable private SparkSourceOffset startOffset = null;
4647
private SparkSourceOffset endOffset;
4748

@@ -52,6 +53,7 @@ public PslMicroBatchReader(
5253
PerTopicHeadOffsetReader headOffsetReader,
5354
SubscriptionPath subscriptionPath,
5455
FlowControlSettings flowControlSettings,
56+
long maxMessagesPerBatch,
5557
long topicPartitionCount) {
5658
this.cursorClient = cursorClient;
5759
this.committer = committer;
@@ -60,6 +62,7 @@ public PslMicroBatchReader(
6062
this.subscriptionPath = subscriptionPath;
6163
this.flowControlSettings = flowControlSettings;
6264
this.topicPartitionCount = topicPartitionCount;
65+
this.maxMessagesPerBatch = maxMessagesPerBatch;
6366
}
6467

6568
@Override
@@ -79,7 +82,11 @@ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
7982
"end offset is not instance of SparkSourceOffset.");
8083
endOffset = (SparkSourceOffset) end.get();
8184
} else {
82-
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
85+
SparkSourceOffset headOffset =
86+
PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
87+
endOffset =
88+
PslSparkUtils.getSparkEndOffset(
89+
headOffset, startOffset, maxMessagesPerBatch, topicPartitionCount);
8390
}
8491
}
8592

src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.pubsublite.SubscriptionPath;
2626
import com.google.cloud.pubsublite.internal.CursorClient;
2727
import com.google.common.collect.ListMultimap;
28+
import com.google.common.math.LongMath;
2829
import com.google.protobuf.ByteString;
2930
import com.google.protobuf.util.Timestamps;
3031
import java.util.ArrayList;
@@ -132,4 +133,24 @@ public static SparkSourceOffset getSparkStartOffset(
132133
"Failed to get information from PSL and construct startOffset", e);
133134
}
134135
}
136+
137+
// EndOffset = min(startOffset + batchOffsetRange, headOffset)
138+
public static SparkSourceOffset getSparkEndOffset(
139+
SparkSourceOffset headOffset,
140+
SparkSourceOffset startOffset,
141+
long maxMessagesPerBatch,
142+
long topicPartitionCount) {
143+
Map<Partition, SparkPartitionOffset> map = new HashMap<>();
144+
for (int i = 0; i < topicPartitionCount; i++) {
145+
Partition p = Partition.of(i);
146+
SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L);
147+
long head = headOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
148+
long start = startOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
149+
map.put(
150+
p,
151+
SparkPartitionOffset.create(
152+
p, Math.min(LongMath.saturatedAdd(start, maxMessagesPerBatch), head)));
153+
}
154+
return new SparkSourceOffset(map);
155+
}
135156
}

src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class PslMicroBatchReaderTest {
4040
private final PartitionSubscriberFactory partitionSubscriberFactory =
4141
mock(PartitionSubscriberFactory.class);
4242
private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class);
43+
private static final long MAX_MESSAGES_PER_BATCH = 20000;
4344
private final PslMicroBatchReader reader =
4445
new PslMicroBatchReader(
4546
cursorClient,
@@ -48,6 +49,7 @@ public class PslMicroBatchReaderTest {
4849
headOffsetReader,
4950
UnitTestExamples.exampleSubscriptionPath(),
5051
OPTIONS.flowControlSettings(),
52+
MAX_MESSAGES_PER_BATCH,
5153
2);
5254

5355
private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) {
@@ -119,4 +121,22 @@ public void testPlanInputPartitionNoMessage() {
119121
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset));
120122
assertThat(reader.planInputPartitions()).hasSize(1);
121123
}
124+
125+
@Test
126+
public void testMaxMessagesPerBatch() {
127+
when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath()))
128+
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
129+
when(headOffsetReader.getHeadOffset())
130+
.thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));
131+
reader.setOffsetRange(Optional.empty(), Optional.empty());
132+
assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap())
133+
.containsExactly(
134+
Partition.of(0L),
135+
// the maxMessagesPerBatch setting takes effect as 100L + maxMessagesPerBatch is less
136+
// than
137+
// 10000000L.
138+
SparkPartitionOffset.create(Partition.of(0L), 100L + MAX_MESSAGES_PER_BATCH - 1L),
139+
Partition.of(1L),
140+
SparkPartitionOffset.create(Partition.of(1L), -1L));
141+
}
122142
}

0 commit comments

Comments
 (0)