Skip to content

Commit

Permalink
feat: Change restriction to OffsetByteRange to allow functioning with…
Browse files Browse the repository at this point in the history
… runnerv2. (#674)

* feat: Change restriction to OffsetByteRange to allow functioning with runnerv2.

* fix: Rebase and fix test

* fix: Rebase
  • Loading branch information
dpcollins-google committed Jun 15, 2021
1 parent 558d505 commit 1749ca9
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private void onMessages(MessageResponse response) throws CheckedApiException {
checkState(
response.getMessagesCount() > 0,
String.format(
"Received an empty MessageResponse on stream with initial request %s.", initialRequest));
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest));
List<SequencedMessage> messages =
response.getMessagesList().stream()
.map(SequencedMessage::fromProto)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@AutoValue
@DefaultSchema(AutoValueSchema.class)
abstract class OffsetByteRange {
abstract OffsetRange getRange();

abstract long getByteCount();

static OffsetByteRange of(OffsetRange range, long byteCount) {
return new AutoValue_OffsetByteRange(range, byteCount);
}

static OffsetByteRange of(OffsetRange range) {
return of(range, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.joda.time.Duration;

Expand All @@ -43,23 +41,26 @@
* received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
* would return ProcessContinuation.resume().
*/
class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
implements HasProgress {
class OffsetByteRangeTracker extends TrackerWithProgress {
private final TopicBacklogReader backlogReader;
private final Duration minTrackingTime;
private final long minBytesReceived;
private final Stopwatch stopwatch;
private OffsetRange range;
private OffsetByteRange range;
private @Nullable Long lastClaimed;
private long byteCount = 0;

public OffsetByteRangeTracker(
OffsetRange range,
OffsetByteRange range,
TopicBacklogReader backlogReader,
Stopwatch stopwatch,
Duration minTrackingTime,
long minBytesReceived) {
checkArgument(range.getTo() == Long.MAX_VALUE);
checkArgument(
range.getRange().getTo() == Long.MAX_VALUE,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
checkArgument(
range.getByteCount() == 0L,
"May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
this.backlogReader = backlogReader;
this.minTrackingTime = minTrackingTime;
this.minBytesReceived = minBytesReceived;
Expand All @@ -86,32 +87,32 @@ public boolean tryClaim(OffsetByteProgress position) {
position.lastOffset().value(),
lastClaimed);
checkArgument(
toClaim >= range.getFrom(),
toClaim >= range.getRange().getFrom(),
"Trying to claim offset %s before start of the range %s",
toClaim,
range);
// split() has already been called, truncating this range. No more offsets may be claimed.
if (range.getTo() != Long.MAX_VALUE) {
boolean isRangeEmpty = range.getTo() == range.getFrom();
boolean isValidClosedRange = nextOffset() == range.getTo();
if (range.getRange().getTo() != Long.MAX_VALUE) {
boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
checkState(
isRangeEmpty || isValidClosedRange,
"Violated class precondition: offset range improperly split. Please report a beam bug.");
return false;
}
lastClaimed = toClaim;
byteCount += position.batchBytes();
range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
return true;
}

@Override
public OffsetRange currentRestriction() {
public OffsetByteRange currentRestriction() {
return range;
}

private long nextOffset() {
checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
}

/**
Expand All @@ -123,29 +124,33 @@ private boolean receivedEnough() {
if (duration.isLongerThan(minTrackingTime)) {
return true;
}
if (byteCount >= minBytesReceived) {
if (currentRestriction().getByteCount() >= minBytesReceived) {
return true;
}
return false;
}

@Override
public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
// Cannot split a bounded range. This should already be completely claimed.
if (range.getTo() != Long.MAX_VALUE) {
if (range.getRange().getTo() != Long.MAX_VALUE) {
return null;
}
if (!receivedEnough()) {
return null;
}
range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
range =
OffsetByteRange.of(
new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
range.getByteCount());
return SplitResult.of(
this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
}

@Override
@SuppressWarnings("unboxing.of.nullable")
public void checkDone() throws IllegalStateException {
if (range.getFrom() == range.getTo()) {
if (range.getRange().getFrom() == range.getRange().getTo()) {
return;
}
checkState(
Expand All @@ -154,18 +159,18 @@ public void checkDone() throws IllegalStateException {
range);
long lastClaimedNotNull = checkNotNull(lastClaimed);
checkState(
lastClaimedNotNull >= range.getTo() - 1,
lastClaimedNotNull >= range.getRange().getTo() - 1,
"Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
lastClaimedNotNull,
range,
lastClaimedNotNull + 1,
range.getTo());
range.getRange().getTo());
}

@Override
public Progress getProgress() {
ComputeMessageStatsResponse stats =
this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
return Progress.from(byteCount, stats.getMessageBytes());
return Progress.from(range.getByteCount(), stats.getMessageBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,14 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
private final SubscriptionPartitionProcessorFactory processorFactory;
private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
offsetReaderFactory;
private final SerializableBiFunction<
SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
private final SerializableBiFunction<SubscriptionPartition, OffsetByteRange, TrackerWithProgress>
trackerFactory;
private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;

PerSubscriptionPartitionSdf(
Duration maxSleepTime,
SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
SerializableBiFunction<
SubscriptionPartition,
OffsetRange,
RestrictionTracker<OffsetRange, OffsetByteProgress>>
SerializableBiFunction<SubscriptionPartition, OffsetByteRange, TrackerWithProgress>
trackerFactory,
SubscriptionPartitionProcessorFactory processorFactory,
SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
Expand All @@ -68,7 +64,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In

@ProcessElement
public ProcessContinuation processElement(
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
@Element SubscriptionPartition subscriptionPartition,
OutputReceiver<SequencedMessage> receiver)
throws Exception {
Expand Down Expand Up @@ -103,8 +99,18 @@ public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscrip
}

@NewTracker
public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
@Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
public TrackerWithProgress newTracker(
@Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) {
return trackerFactory.apply(subscriptionPartition, range);
}

@GetSize
public double getSize(
@Element SubscriptionPartition subscriptionPartition,
@Restriction OffsetByteRange restriction) {
if (restriction.getRange().getTo() != Long.MAX_VALUE) {
return restriction.getByteCount();
}
return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -71,7 +70,7 @@ private Subscriber newSubscriber(

private SubscriptionPartitionProcessor newPartitionProcessor(
SubscriptionPartition subscriptionPartition,
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver)
throws ApiException {
checkSubscription(subscriptionPartition);
Expand All @@ -81,13 +80,13 @@ private SubscriptionPartitionProcessor newPartitionProcessor(
consumer ->
newSubscriber(
subscriptionPartition.partition(),
Offset.of(tracker.currentRestriction().getFrom()),
Offset.of(tracker.currentRestriction().getRange().getFrom()),
consumer),
options.flowControlSettings());
}

private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
SubscriptionPartition subscriptionPartition, OffsetRange initial) {
private TrackerWithProgress newRestrictionTracker(
SubscriptionPartition subscriptionPartition, OffsetByteRange initial) {
checkSubscription(subscriptionPartition);
return new OffsetByteRangeTracker(
initial,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.Serializable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

Expand All @@ -27,6 +26,6 @@ interface SubscriptionPartitionProcessorFactory extends Serializable {

SubscriptionPartitionProcessor newProcessor(
SubscriptionPartition subscriptionPartition,
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
Expand All @@ -45,7 +44,7 @@

class SubscriptionPartitionProcessorImpl extends Listener
implements SubscriptionPartitionProcessor {
private final RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
private final OutputReceiver<SequencedMessage> receiver;
private final Subscriber subscriber;
private final SettableFuture<Void> completionFuture = SettableFuture.create();
Expand All @@ -54,7 +53,7 @@ class SubscriptionPartitionProcessorImpl extends Listener

@SuppressWarnings("methodref.receiver.bound.invalid")
SubscriptionPartitionProcessorImpl(
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver,
Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
FlowControlSettings flowControlSettings) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;

public abstract class TrackerWithProgress
extends RestrictionTracker<OffsetByteRange, OffsetByteProgress> implements HasProgress {}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public void setUp() {
when(ticker.read()).thenReturn(0L);
tracker =
new OffsetByteRangeTracker(
RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
OffsetByteRange.of(RANGE, 0),
reader,
Stopwatch.createUnstarted(ticker),
Duration.millis(500),
MIN_BYTES);
}

@Test
Expand All @@ -85,11 +89,15 @@ public void getProgressStatsFailure() {
public void claimSplitSuccess() {
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
assertEquals(10_001, splits.getPrimary().getTo());
assertEquals(10_001, splits.getResidual().getFrom());
assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
OffsetByteRange primary = splits.getPrimary();
assertEquals(RANGE.getFrom(), primary.getRange().getFrom());
assertEquals(10_001, primary.getRange().getTo());
assertEquals(MIN_BYTES * 2, primary.getByteCount());
OffsetByteRange residual = splits.getResidual();
assertEquals(10_001, residual.getRange().getFrom());
assertEquals(Long.MAX_VALUE, residual.getRange().getTo());
assertEquals(0, residual.getByteCount());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
Expand All @@ -99,10 +107,10 @@ public void claimSplitSuccess() {
@SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
public void splitWithoutClaimEmpty() {
when(ticker.read()).thenReturn(100000000000000L);
SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
assertEquals(RANGE, splits.getResidual());
SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom());
assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo());
assertEquals(RANGE, splits.getResidual().getRange());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
Expand Down
Loading

0 comments on commit 1749ca9

Please sign in to comment.