Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner #26274

Merged
merged 1 commit into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
Expand All @@ -46,7 +44,6 @@
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.util.DoFnUtils;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
Expand Down Expand Up @@ -484,77 +481,6 @@ static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
windowedValue.getPane()));
}

/**
 * {@link FutureCollector} that folds every registered output future into one aggregate future.
 *
 * <p>A new instance starts out sealed. {@link #prepare()} unseals it, {@link #add} and {@link
 * #addAll} are accepted only while it is unsealed, and {@link #finish()} / {@link #discard()}
 * seal it again and reset the aggregate to an empty, already-completed future.
 */
static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
  // true while the collector refuses new elements; toggled by prepare(), finish() and discard().
  private final AtomicBoolean collectorSealed = new AtomicBoolean(true);
  // Aggregate of everything added since the last reset; after construction it is only touched
  // inside synchronized (this) sections.
  private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;

  FutureCollectorImpl() {
    outputFuture = emptyOutput();
  }

  /** Builds an already-completed future that holds a new, empty, mutable list. */
  private static <T> CompletionStage<Collection<WindowedValue<T>>> emptyOutput() {
    return CompletableFuture.completedFuture(new ArrayList<>());
  }

  /**
   * Chains a single pending output onto the aggregate future.
   *
   * @param element future of the value to append once it completes
   * @throws IllegalStateException if the collector is currently sealed
   */
  @Override
  public void add(CompletionStage<WindowedValue<OutT>> element) {
    final boolean sealed = collectorSealed.get();
    checkState(
        !sealed,
        "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");

    // The monitor guards against outputs being triggered by watermark/finish bundle processing
    // while the aggregate is being extended.
    synchronized (this) {
      outputFuture =
          outputFuture.thenCombine(
              element,
              (outputs, output) -> {
                outputs.add(output);
                return outputs;
              });
    }
  }

  /**
   * Merges a pending batch of outputs into the aggregate future via {@link
   * FutureUtils#combineFutures}.
   *
   * @param elements future of the values to merge
   * @throws IllegalStateException if the collector is currently sealed
   */
  @Override
  public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
    final boolean sealed = collectorSealed.get();
    checkState(
        !sealed,
        "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");

    synchronized (this) {
      outputFuture = FutureUtils.combineFutures(outputFuture, elements);
    }
  }

  /** Seals the collector and throws away whatever has been accumulated so far. */
  @Override
  public void discard() {
    // A failed CAS only means the collector was sealed already, which is fine.
    collectorSealed.compareAndSet(false, true);

    synchronized (this) {
      outputFuture = emptyOutput();
    }
  }

  /**
   * Seals the collector and hands back the aggregate built since the last reset.
   *
   * @return the accumulated future; an empty completed future if nothing was added
   */
  @Override
  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
    // The CAS result is deliberately ignored: finish() without a preceding prepare() is a no-op
    // that yields an empty collection.
    collectorSealed.compareAndSet(false, true);

    synchronized (this) {
      final CompletionStage<Collection<WindowedValue<OutT>>> accumulated = outputFuture;
      outputFuture = emptyOutput();
      return accumulated;
    }
  }

  /**
   * Unseals the collector so that elements may be added.
   *
   * @throws IllegalStateException if the collector was not sealed when this was called
   */
  @Override
  public void prepare() {
    checkState(
        collectorSealed.compareAndSet(true, false),
        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
  }
}

/**
* Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
* emits values to the main output only, which is a single {@link
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * {@link FutureCollector} implementation that chains every added output future onto a single
 * aggregate {@link CompletionStage}.
 *
 * <p>Lifecycle: a new collector is sealed. {@link #prepare()} unseals it; {@link #add} and {@link
 * #addAll} are only legal while it is unsealed; {@link #finish()} and {@link #discard()} seal it
 * again and reset the aggregate to an empty, already-completed future.
 *
 * <p>The sealed check in {@code add}/{@code addAll} and the seal-and-reset in {@code
 * finish}/{@code discard} all run while holding this object's monitor. An element is therefore
 * either part of the aggregate that {@code finish()} returns (or {@code discard()} drops), or it
 * is rejected with an {@link IllegalStateException}; it cannot pass the check and then land in the
 * freshly reset aggregate.
 */
class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
  // true while the collector refuses new elements.
  private final AtomicBoolean collectorSealed;
  // Aggregate of everything added since the last reset; guarded by synchronized (this).
  private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;

  FutureCollectorImpl() {
    outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
    collectorSealed = new AtomicBoolean(true);
  }

  /**
   * Chains a single pending output onto the aggregate future.
   *
   * @param element future of the value to append once it completes
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void add(CompletionStage<WindowedValue<OutT>> element) {
    // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs.
    // The sealed check is made under the same monitor so that it cannot be invalidated by a
    // concurrent finish()/discard() before the element is chained.
    synchronized (this) {
      checkState(
          !collectorSealed.get(),
          "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");

      outputFuture =
          outputFuture.thenCombine(
              element,
              (collection, event) -> {
                collection.add(event);
                return collection;
              });
    }
  }

  /**
   * Merges a pending batch of outputs into the aggregate future via {@link
   * FutureUtils#combineFutures}.
   *
   * @param elements future of the values to merge
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
    // Check and update atomically with respect to finish()/discard(); see add().
    synchronized (this) {
      checkState(
          !collectorSealed.get(),
          "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");

      outputFuture = FutureUtils.combineFutures(outputFuture, elements);
    }
  }

  /** Seals the collector and drops everything accumulated so far. */
  @Override
  public void discard() {
    synchronized (this) {
      // A failed CAS only means the collector was already sealed.
      collectorSealed.compareAndSet(false, true);
      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
    }
  }

  /**
   * Seals the collector and returns the aggregate built since the last reset.
   *
   * @return the accumulated future; an empty completed future if nothing was added
   */
  @Override
  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
    synchronized (this) {
      /*
       * We can ignore the results here because it's okay to call finish without invoking prepare. It will be a no-op
       * and an empty collection will be returned.
       */
      collectorSealed.compareAndSet(false, true);

      final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture;
      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
      return sealedOutputFuture;
    }
  }

  /**
   * Unseals the collector so that elements may be added.
   *
   * @throws IllegalStateException if the collector was not sealed when this was called
   */
  @Override
  public void prepare() {
    boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
    checkState(
        isCollectorSealed,
        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testKeyedOutputFutures() {
options.setNumThreadsForProcessElement(4);

final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>();
final FutureCollector<Void> futureCollector = new FutureCollectorImpl<>();
futureCollector.prepare();

final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.junit.Before;
import org.junit.Test;

/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */
/** Unit tests for {@linkplain FutureCollectorImpl}. */
public final class FutureCollectorImplTest {
private static final List<String> RESULTS = ImmutableList.of("hello", "world");
private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>();
private FutureCollector<String> futureCollector = new FutureCollectorImpl<>();

@Before
public void setup() {
futureCollector = new DoFnOp.FutureCollectorImpl<>();
futureCollector = new FutureCollectorImpl<>();
}

@Test(expected = IllegalStateException.class)
Expand Down