Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5605] Add support for channel splitting to the gRPC read "source" and propagate "split" calls to the downstream receiver #10501

Merged
merged 3 commits into from Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,19 +17,24 @@
*/
package org.apache.beam.fn.harness;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.HandlesSplits.SplitResult;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest.DesiredSplit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
Expand All @@ -47,6 +52,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -131,6 +137,11 @@ public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
private final BeamFnDataClient beamFnDataClient;
private final Coder<WindowedValue<OutputT>> coder;

private final Object splittingLock = new Object();
// 0-based index of the current element being processed
private long index = -1;
// 0-based index of the first element to not process, aka the first element of the residual
private long stopIndex = Long.MAX_VALUE;
private InboundDataClient readFuture;

BeamFnDataReadRunner(
Expand Down Expand Up @@ -170,7 +181,109 @@ public void registerInputLocation() {
apiServiceDescriptor,
LogicalEndpoint.of(processBundleInstructionIdSupplier.get(), pTransformId),
coder,
consumer);
this::forwardElementToConsumer);
}

public void forwardElementToConsumer(WindowedValue<OutputT> element) throws Exception {
synchronized (splittingLock) {
if (index == stopIndex - 1) {
return;
}
index += 1;
}
consumer.accept(element);
}

public void split(
ProcessBundleSplitRequest request, ProcessBundleSplitResponse.Builder response) {
DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(pTransformId);
if (desiredSplit == null) {
return;
}

long totalBufferSize = desiredSplit.getEstimatedInputElements();

HandlesSplits splittingConsumer = null;
if (consumer instanceof HandlesSplits) {
splittingConsumer = ((HandlesSplits) consumer);
}

synchronized (splittingLock) {
// Since we hold the splittingLock, we guarantee that we will not pass the next element
// to the downstream consumer. We still have a race where the downstream consumer may
// have yet to see the element or has completed processing the element by the time
// we ask it to split (even after we have asked for its progress).

// If the split request we received was delayed and is less then the known number of elements
// then use "index + 1" as the total size. Similarly, if we have already split and the
// split request is bounded incorrectly, use the stop index as the upper bound.
if (totalBufferSize < index + 1) {
totalBufferSize = index + 1;
} else if (totalBufferSize > stopIndex) {
totalBufferSize = stopIndex;
}

// In the case where we have yet to process an element, set the current element progress to 1.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an else clause below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its logically the same where the else clause is the default based upon what we initialize.

double currentElementProgress = 1;

// If we have started processing at least one element, attempt to get the downstream
// progress defaulting to 0.5 if no progress was able to get fetched.
if (index >= 0) {
if (splittingConsumer != null) {
currentElementProgress = splittingConsumer.getProgress();
} else {
currentElementProgress = 0.5;
}
}

checkArgument(
desiredSplit.getAllowedSplitPointsList().isEmpty(),
"TODO: BEAM-3836, support split point restrictions.");

// Now figure out where to split.
//
// The units here (except for keepOfElementRemainder) are all in terms of number or
// (possibly fractional) elements.

// Compute the amount of "remaining" work that we know of.
double remainder = totalBufferSize - index - currentElementProgress;
// Compute the number of elements (including fractional elements) that we should "keep".
double keep = remainder * desiredSplit.getFractionOfRemainder();

// If the downstream operator says the progress is less than 1 then the element could be
// splittable.
if (currentElementProgress < 1) {
// See if the amount we need to keep falls within the current element's remainder and if
// so, attempt to split it.
double keepOfElementRemainder = keep / (1 - currentElementProgress);
if (keepOfElementRemainder < 1) {
SplitResult splitResult =
splittingConsumer != null ? splittingConsumer.trySplit(keepOfElementRemainder) : null;
if (splitResult != null) {
stopIndex = index + 1;
response
.addPrimaryRoots(splitResult.getPrimaryRoot())
.addResidualRoots(splitResult.getResidualRoot())
.addChannelSplitsBuilder()
.setLastPrimaryElement(index - 1)
.setFirstResidualElement(stopIndex);
return;
}
}
}

// Otherwise, split at the closest element boundary.
int newStopIndex =
Ints.checkedCast(index + Math.max(1, Math.round(currentElementProgress + keep)));
if (newStopIndex < stopIndex) {
stopIndex = newStopIndex;
response
.addChannelSplitsBuilder()
.setLastPrimaryElement(stopIndex - 1)
.setFirstResidualElement(stopIndex);
return;
}
}
}

public void blockTillReadFinishes() throws Exception {
Expand Down
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness;

import com.google.auto.value.AutoValue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;

public interface HandlesSplits {
SplitResult trySplit(double fractionOfRemainder);

double getProgress();

@AutoValue
abstract class SplitResult {
public static SplitResult of(
BeamFnApi.BundleApplication primaryRoot, BeamFnApi.DelayedBundleApplication residualRoot) {
return new AutoValue_HandlesSplits_SplitResult(primaryRoot, residualRoot);
}

public abstract BeamFnApi.BundleApplication getPrimaryRoot();

public abstract BeamFnApi.DelayedBundleApplication getResidualRoot();
}
}

This file was deleted.

This file was deleted.