Skip to content

Commit

Permalink
Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunn…
Browse files Browse the repository at this point in the history
…er (#26274)
  • Loading branch information
ryucc committed Apr 14, 2023
1 parent f1fba08 commit 78b5ffb
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
Expand All @@ -46,7 +44,6 @@
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.util.DoFnUtils;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
Expand Down Expand Up @@ -484,77 +481,6 @@ static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
windowedValue.getPane()));
}

/**
 * {@link FutureCollector} that chains every added output onto one pending {@link CompletionStage}
 * holding the collection of emitted values.
 *
 * <p>A new instance starts out sealed. {@link #prepare()} unseals it, {@link #add} and {@link
 * #addAll} require it to be unsealed, and {@link #finish()} / {@link #discard()} seal it again
 * and reset the pending output to an empty collection.
 */
static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
  /** {@code true} while the collector refuses new elements. */
  private final AtomicBoolean collectorSealed;

  /** Pending output; read and reassigned only while holding this object's monitor. */
  private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;

  FutureCollectorImpl() {
    collectorSealed = new AtomicBoolean(true);
    outputFuture = emptyOutput();
  }

  /** Returns an already-completed stage wrapping a new, empty, mutable list. */
  private CompletionStage<Collection<WindowedValue<OutT>>> emptyOutput() {
    return CompletableFuture.completedFuture(new ArrayList<>());
  }

  /**
   * Appends a single asynchronously produced element to the pending output.
   *
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void add(CompletionStage<WindowedValue<OutT>> element) {
    final boolean open = !collectorSealed.get();
    checkState(
        open,
        "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");

    // Outputs may also be triggered by watermark / finish-bundle handling, hence the monitor.
    synchronized (this) {
      final CompletionStage<Collection<WindowedValue<OutT>>> previous = outputFuture;
      outputFuture =
          previous.thenCombine(
              element,
              (accumulated, value) -> {
                accumulated.add(value);
                return accumulated;
              });
    }
  }

  /**
   * Merges an asynchronously produced batch of elements into the pending output.
   *
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
    final boolean open = !collectorSealed.get();
    checkState(
        open,
        "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");

    synchronized (this) {
      final CompletionStage<Collection<WindowedValue<OutT>>> previous = outputFuture;
      outputFuture = FutureUtils.combineFutures(previous, elements);
    }
  }

  /** Seals the collector and drops whatever output was pending. */
  @Override
  public void discard() {
    // The CAS result is irrelevant: the collector ends up sealed either way.
    collectorSealed.compareAndSet(false, true);

    synchronized (this) {
      outputFuture = emptyOutput();
    }
  }

  /**
   * Seals the collector and hands back the pending output, leaving an empty one in its place.
   *
   * <p>Calling this without a preceding {@link #prepare()} is allowed; the seal step is then a
   * no-op and the returned stage carries an empty collection.
   */
  @Override
  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
    collectorSealed.compareAndSet(false, true);

    synchronized (this) {
      final CompletionStage<Collection<WindowedValue<OutT>>> pending = outputFuture;
      outputFuture = emptyOutput();
      return pending;
    }
  }

  /**
   * Unseals the collector so that elements can be added.
   *
   * @throws IllegalStateException if the collector was not sealed when this was called
   */
  @Override
  public void prepare() {
    checkState(
        collectorSealed.compareAndSet(true, false),
        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
  }
}

/**
* Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
* emits values to the main output only, which is a single {@link
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * {@link FutureCollector} that folds every added output into one pending {@link CompletionStage}
 * carrying the collection of emitted {@link WindowedValue}s.
 *
 * <p>Lifecycle: the collector is created sealed; {@link #prepare()} unseals it; {@link
 * #add(CompletionStage)} and {@link #addAll(CompletionStage)} are only legal while it is
 * unsealed; {@link #finish()} and {@link #discard()} seal it again and reset the pending output
 * to an empty collection.
 *
 * <p>Thread-safety: the sealed check performed by {@code add}/{@code addAll} and the
 * seal-and-reset performed by {@code finish}/{@code discard} all run under this object's monitor.
 * A producer therefore either appends to the output that {@code finish()} is about to return, or
 * observes the seal and fails; it can no longer pass the check, lose the race to a complete
 * {@code finish()}, and then append to the freshly reset output of the next cycle.
 */
class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
  /** {@code true} while the collector refuses new elements. */
  private final AtomicBoolean collectorSealed;

  /** Pending output; read and reassigned only while holding this object's monitor. */
  private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;

  FutureCollectorImpl() {
    outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
    collectorSealed = new AtomicBoolean(true);
  }

  /**
   * Appends a single asynchronously produced element to the pending output.
   *
   * @param element stage that yields the element to collect
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void add(CompletionStage<WindowedValue<OutT>> element) {
    // The monitor guards against watermark/finish-bundle handling triggering outputs concurrently.
    // The sealed check is made inside it so that it cannot go stale before the append happens.
    synchronized (this) {
      checkState(
          !collectorSealed.get(),
          "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");

      outputFuture =
          outputFuture.thenCombine(
              element,
              (collection, event) -> {
                collection.add(event);
                return collection;
              });
    }
  }

  /**
   * Merges an asynchronously produced batch of elements into the pending output.
   *
   * @param elements stage that yields the elements to collect
   * @throws IllegalStateException if the collector is sealed
   */
  @Override
  public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
    synchronized (this) {
      checkState(
          !collectorSealed.get(),
          "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");

      outputFuture = FutureUtils.combineFutures(outputFuture, elements);
    }
  }

  /** Seals the collector and drops whatever output was pending. */
  @Override
  public void discard() {
    synchronized (this) {
      // The CAS result is irrelevant: the collector ends up sealed either way.
      collectorSealed.compareAndSet(false, true);
      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
    }
  }

  /**
   * Seals the collector and returns the pending output, leaving an empty one in its place.
   *
   * @return the stage holding everything added since the last reset
   */
  @Override
  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
    synchronized (this) {
      /*
       * We can ignore the result here because it's okay to call finish without invoking prepare.
       * It will be a no-op and an empty collection will be returned.
       */
      collectorSealed.compareAndSet(false, true);

      final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture;
      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
      return sealedOutputFuture;
    }
  }

  /**
   * Unseals the collector so that elements can be added.
   *
   * @throws IllegalStateException if the collector was not sealed when this was called
   */
  @Override
  public void prepare() {
    boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
    checkState(
        isCollectorSealed,
        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testKeyedOutputFutures() {
options.setNumThreadsForProcessElement(4);

final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>();
final FutureCollector<Void> futureCollector = new FutureCollectorImpl<>();
futureCollector.prepare();

final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.junit.Before;
import org.junit.Test;

/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */
/** Unit tests for {@linkplain FutureCollectorImpl}. */
public final class FutureCollectorImplTest {
private static final List<String> RESULTS = ImmutableList.of("hello", "world");
private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>();
private FutureCollector<String> futureCollector = new FutureCollectorImpl<>();

@Before
public void setup() {
futureCollector = new DoFnOp.FutureCollectorImpl<>();
futureCollector = new FutureCollectorImpl<>();
}

@Test(expected = IllegalStateException.class)
Expand Down

0 comments on commit 78b5ffb

Please sign in to comment.