Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract BundleManager to an Interface in SamzaRunner #26268

Merged
merged 5 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,317 +17,18 @@
*/
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
* proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
* least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
* bundles have completed.
*
* <p>A bundle is considered complete only when the outputs corresponding to each element in the
* bundle have been resolved and the watermark associated with the bundle(if any) is propagated
* downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
* In case of synchronous ParDo, outputs of the element is resolved immediately after the
* processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
* all the future emitted by the processElement is resolved.
*
* <p>This class is not thread safe and the current implementation relies on the assumption that
* messages are dispatched to BundleManager in a single threaded mode.
*
* @param <OutT> output type of the {@link DoFnOp}
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BundleManager<OutT> {
private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;

private final long maxBundleSize;
private final long maxBundleTimeMs;
private final BundleProgressListener<OutT> bundleProgressListener;
private final FutureCollector<OutT> futureCollector;
private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
private final String bundleCheckTimerId;

// Number elements belonging to the current active bundle
private transient AtomicLong currentBundleElementCount;
// Number of bundles that are in progress but not yet finished
private transient AtomicLong pendingBundleCount;
// Denotes the start time of the current active bundle
private transient AtomicLong bundleStartTime;
// Denotes if there is an active in progress bundle. Note at a given time, we can have multiple
// bundle in progress.
// This flag denotes if there is a bundle that is current and hasn't been closed.
private transient AtomicBoolean isBundleStarted;
// Holder for watermark which gets propagated when the bundle is finished.
private transient Instant bundleWatermarkHold;
// A future that is completed once all futures belonging to the current active bundle are
// completed. The value is null if there are no futures in the current active bundle.
private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
private transient CompletionStage<Void> watermarkFuture;

public BundleManager(
BundleProgressListener<OutT> bundleProgressListener,
FutureCollector<OutT> futureCollector,
long maxBundleSize,
long maxBundleTimeMs,
Scheduler<KeyedTimerData<Void>> bundleTimerScheduler,
String bundleCheckTimerId) {
this.maxBundleSize = maxBundleSize;
this.maxBundleTimeMs = maxBundleTimeMs;
this.bundleProgressListener = bundleProgressListener;
this.bundleTimerScheduler = bundleTimerScheduler;
this.bundleCheckTimerId = bundleCheckTimerId;
this.futureCollector = futureCollector;

if (maxBundleSize > 1) {
scheduleNextBundleCheck();
}

// instance variable initialization for bundle tracking
this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
this.currentBundleElementCount = new AtomicLong(0L);
this.isBundleStarted = new AtomicBoolean(false);
this.pendingBundleCount = new AtomicLong(0L);
this.watermarkFuture = CompletableFuture.completedFuture(null);
}

/*
* Schedule in processing time to check whether the current bundle should be closed. Note that
* we only approximately achieve max bundle time by checking as frequent as half of the max bundle
* time set by users. This would violate the max bundle time by up to half of it but should
* acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle).
*/
private void scheduleNextBundleCheck() {
final Instant nextBundleCheckTime =
Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
final TimerInternals.TimerData timerData =
TimerInternals.TimerData.of(
this.bundleCheckTimerId,
StateNamespaces.global(),
nextBundleCheckTime,
nextBundleCheckTime,
TimeDomain.PROCESSING_TIME);
bundleTimerScheduler.schedule(
new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
}

void tryStartBundle() {
futureCollector.prepare();

if (isBundleStarted.compareAndSet(false, true)) {
LOG.debug("Starting a new bundle.");
// make sure the previous bundle is sealed and futures are cleared
Preconditions.checkArgument(
currentActiveBundleDoneFutureReference.get() == null,
"Current active bundle done future should be null before starting a new bundle.");
bundleStartTime.set(System.currentTimeMillis());
pendingBundleCount.incrementAndGet();
bundleProgressListener.onBundleStarted();
}

currentBundleElementCount.incrementAndGet();
}

void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
// propagate watermark immediately if no bundle is in progress and all the previous bundles have
// completed.
if (!isBundleStarted() && pendingBundleCount.get() == 0) {
LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
bundleProgressListener.onWatermark(watermark, emitter);
return;
}

// hold back the watermark since there is either a bundle in progress or previously closed
// bundles are unfinished.
this.bundleWatermarkHold = watermark;

// for batch mode, the max watermark should force the bundle to close
if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
/*
* Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark
* downstream. If a bundle is in progress tryFinishBundle() fill force the bundle to close and emit watermark.
* If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures.
*/
if (isBundleStarted()) {
LOG.info(
"Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
tryFinishBundle(emitter);
watermarkFuture.toCompletableFuture().join();
} else {
LOG.info(
"Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
watermarkFuture.toCompletableFuture().join();
bundleProgressListener.onWatermark(watermark, emitter);
}
}
}

void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
// this is internal timer in processing time to check whether a bundle should be closed
if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
tryFinishBundle(emitter);
scheduleNextBundleCheck();
}
}

/**
* Signal the bundle manager to handle failure. We discard the output collected as part of
* processing the current element and reset the bundle count.
*
* @param t failure cause
*/
void signalFailure(Throwable t) {
LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
futureCollector.discard();
// reset the bundle start flag only if the bundle has started
isBundleStarted.compareAndSet(true, false);
public interface BundleManager<OutT> {
void tryStartBundle();

// bundle start may not necessarily mean we have actually started the bundle since some of the
// invariant check conditions within bundle start could throw exceptions. so rely on bundle
// start time
if (bundleStartTime.get() != Long.MAX_VALUE) {
currentBundleElementCount.set(0L);
bundleStartTime.set(Long.MAX_VALUE);
pendingBundleCount.decrementAndGet();
currentActiveBundleDoneFutureReference.set(null);
}
}

void tryFinishBundle(OpEmitter<OutT> emitter) {

// we need to seal the output for each element within a bundle irrespective of the whether we
// decide to finish the
// bundle or not
CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = futureCollector.finish();

if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
LOG.debug("Finishing the current bundle.");

// reset the bundle count
// seal the bundle and emit the result future (collection of results)
// chain the finish bundle invocation on the finish bundle
currentBundleElementCount.set(0L);
bundleStartTime.set(Long.MAX_VALUE);
Instant watermarkHold = bundleWatermarkHold;
bundleWatermarkHold = null;

CompletionStage<Void> currentActiveBundleDoneFuture =
currentActiveBundleDoneFutureReference.get();
outputFuture =
outputFuture.thenCombine(
currentActiveBundleDoneFuture != null
? currentActiveBundleDoneFuture
: CompletableFuture.completedFuture(null),
(res, ignored) -> {
bundleProgressListener.onBundleFinished(emitter);
return res;
});

BiConsumer<Collection<WindowedValue<OutT>>, Void> watermarkPropagationFn;
if (watermarkHold == null) {
watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet();
} else {
watermarkPropagationFn =
(ignored, res) -> {
LOG.debug("Propagating watermark: {} to downstream.", watermarkHold);
bundleProgressListener.onWatermark(watermarkHold, emitter);
pendingBundleCount.decrementAndGet();
};
}
void processWatermark(Instant watermark, OpEmitter<OutT> emitter);

// We chain the current watermark emission with previous watermark and the output futures
// since bundles can finish out of order but we still want the watermark to be emitted in
// order.
watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn);
currentActiveBundleDoneFutureReference.set(null);
} else if (isBundleStarted.get()) {
final CompletableFuture<Collection<WindowedValue<OutT>>> finalOutputFuture =
outputFuture.toCompletableFuture();
currentActiveBundleDoneFutureReference.updateAndGet(
maybePrevFuture -> {
CompletableFuture<Void> prevFuture =
maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter);

return CompletableFuture.allOf(prevFuture, finalOutputFuture);
});
}
void signalFailure(Throwable t);

// emit the future to the propagate it to rest of the DAG
emitter.emitFuture(outputFuture);
}

@VisibleForTesting
long getCurrentBundleElementCount() {
return currentBundleElementCount.longValue();
}

@VisibleForTesting
@Nullable
CompletionStage<Void> getCurrentBundleDoneFuture() {
return currentActiveBundleDoneFutureReference.get();
}

@VisibleForTesting
void setCurrentBundleDoneFuture(CompletableFuture<Void> currentBundleResultFuture) {
this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
}

@VisibleForTesting
long getPendingBundleCount() {
return pendingBundleCount.longValue();
}

@VisibleForTesting
void setPendingBundleCount(long value) {
pendingBundleCount.set(value);
}

@VisibleForTesting
boolean isBundleStarted() {
return isBundleStarted.get();
}

@VisibleForTesting
void setBundleWatermarkHold(Instant watermark) {
this.bundleWatermarkHold = watermark;
}

/**
* We close the current bundle in progress if one of the following criteria is met 1. The bundle
* count &ge; maxBundleSize 2. Time elapsed since the bundle started is &ge; maxBundleTimeMs 3.
* Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs
*
* @return true - if one of the criteria above is satisfied; false - otherwise
*/
private boolean shouldFinishBundle() {
return isBundleStarted.get()
&& (currentBundleElementCount.get() >= maxBundleSize
|| System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
|| BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
}
void tryFinishBundle(OpEmitter<OutT> emitter);

/**
* A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
Expand All @@ -339,7 +40,7 @@ private boolean shouldFinishBundle() {
*
* @param <OutT>
*/
public interface BundleProgressListener<OutT> {
interface BundleProgressListener<OutT> {
void onBundleStarted();

void onBundleFinished(OpEmitter<OutT> emitter);
Expand Down
Loading