Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,13 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction
throw new RuntimeException(e);
}
});
PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry();
ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient();

try {
PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
PTransformFunctionRegistry finishFunctionRegistry =
bundleProcessor.getFinishFunctionRegistry();
ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient();

try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
try (Closeable closeTracker = stateTracker.activate()) {
// Already in reverse topological order so we don't need to do anything.
Expand Down Expand Up @@ -354,14 +355,16 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction
response.setRequiresFinalization(true);
}
}

// Mark the bundle processor as re-usable.
bundleProcessorCache.release(
request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
} catch (Exception e) {
bundleProcessorCache.release(
request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
// Make sure we clean-up from the active set of bundle processors.
bundleProcessorCache.discard(bundleProcessor);
throw e;
}
return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
}

public BeamFnApi.InstructionResponse.Builder progress(BeamFnApi.InstructionRequest request)
Expand Down Expand Up @@ -648,13 +651,12 @@ public BundleProcessor find(String instructionId) {
}

/**
* Add a {@link BundleProcessor} to cache. The {@link BundleProcessor} will be reset before
* being added to the cache and will be marked as inactive.
* Add a {@link BundleProcessor} to cache. The {@link BundleProcessor} will be marked as
* inactive and reset before being added to the cache.
*/
void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
activeBundleProcessors.remove(bundleProcessor.getInstructionId());
try {
bundleProcessor.setInstructionId(null);
bundleProcessor.reset();
cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
} catch (Exception e) {
Expand All @@ -665,6 +667,11 @@ void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
}
}

/** Discard an active {@link BundleProcessor} instead of being re-used. */
void discard(BundleProcessor bundleProcessor) {
activeBundleProcessors.remove(bundleProcessor.getInstructionId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do bundleProcessor.setInstructionId(null); before discarding it to avoid the same race condition as reset?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unnecessary since reset will not be invoked.

}

/** Shutdown all the cached {@link BundleProcessor}s, running the tearDown() functions. */
void shutdown() throws Exception {
cachedBundleProcessors.invalidateAll();
Expand Down Expand Up @@ -742,6 +749,7 @@ synchronized void setInstructionId(String instructionId) {
}

void reset() throws Exception {
this.instructionId = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this broke Java PreCommit (it looks like this was submitted while precommits weren't running). Running .gradlew :sdks:java:harness:spotbugsMain on master right now yields:

Inconsistent synchronization of org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessor.instructionId; locked 66% of time                                                                                               
                                                                                
                                                                                
Bug type IS2_INCONSISTENT_SYNC (click for details)                              
In class org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessor
Field org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessor.instructionId
Synchronized 66% of the time                                                    
Unsynchronized access at ProcessBundleHandler.java:[line 752]                   
Synchronized access at ProcessBundleHandler.java:[line 748]                     
Synchronized access at ProcessBundleHandler.java:[line 744] 

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #15085

getStartFunctionRegistry().reset();
getFinishFunctionRegistry().reset();
getSplitListener().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.argThat;
Expand Down Expand Up @@ -99,9 +101,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
Expand All @@ -119,8 +119,6 @@ public class ProcessBundleHandlerTest {
private static final String DATA_INPUT_URN = "beam:runner:source:v1";
private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";

@Rule public ExpectedException thrown = ExpectedException.none();

@Mock private BeamFnDataClient beamFnDataClient;
@Captor private ArgumentCaptor<ThrowingConsumer<Exception, WindowedValue<String>>> consumerCaptor;

Expand Down Expand Up @@ -644,6 +642,14 @@ public void testBundleProcessorIsFoundWhenActive() {
// After it is released, ensure the bundle processor is no longer found
cache.release("descriptorId", bundleProcessor);
assertNull(cache.find("known"));

// Once it is active, ensure the bundle processor is found
cache.get("descriptorId", "known", () -> bundleProcessor);
assertSame(bundleProcessor, cache.find("known"));

// After it is discarded, ensure the bundle processor is no longer found
cache.discard(bundleProcessor);
assertNull(cache.find("known"));
}

@Test
Expand Down Expand Up @@ -676,6 +682,7 @@ public void testBundleProcessorReset() throws Exception {
bundleFinalizationCallbacks);

bundleProcessor.reset();
assertNull(bundleProcessor.getInstructionId());
verify(startFunctionRegistry, times(1)).reset();
verify(finishFunctionRegistry, times(1)).reset();
verify(splitListener, times(1)).clear();
Expand Down Expand Up @@ -727,16 +734,19 @@ public void testCreatingPTransformExceptionsArePropagated() throws Exception {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("TestException");
throw new IllegalStateException("TestException");
}),
new BundleProcessorCache());
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
.build());
assertThrows(
"TestException",
IllegalStateException.class,
() ->
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder()
.setProcessBundleDescriptorId("1L"))
.build()));
}

@Test
Expand Down Expand Up @@ -813,7 +823,7 @@ public void testBundleFinalizationIsPropagated() throws Exception {
}

@Test
public void testPTransformStartExceptionsArePropagated() throws Exception {
public void testPTransformStartExceptionsArePropagated() {
BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
BeamFnApi.ProcessBundleDescriptor.newBuilder()
.putTransforms(
Expand Down Expand Up @@ -854,23 +864,24 @@ public void testPTransformStartExceptionsArePropagated() throws Exception {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("TestException");
startFunctionRegistry.register(
pTransformId, ProcessBundleHandlerTest::throwException);
return null;
}),
new BundleProcessorCache());
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
.build());

assertThrows(
"TestException",
IllegalStateException.class,
() ->
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder()
.setProcessBundleDescriptorId("1L"))
.build()));
// BundleProcessor is not re-added back to the BundleProcessorCache in case of an exception
// during bundle processing
assertThat(
handler.bundleProcessorCache.getCachedBundleProcessors(), equalTo(Collections.EMPTY_MAP));
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
}

@Test
Expand Down Expand Up @@ -915,23 +926,25 @@ public void testPTransformFinishExceptionsArePropagated() throws Exception {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("TestException");
finishFunctionRegistry.register(
pTransformId, ProcessBundleHandlerTest::throwException);
return null;
}),
new BundleProcessorCache());
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
.build());
assertThrows(
"TestException",
IllegalStateException.class,
() ->
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder()
.setProcessBundleDescriptorId("1L"))
.build()));

// BundleProcessor is not re-added back to the BundleProcessorCache in case of an exception
// during bundle processing
assertThat(
handler.bundleProcessorCache.getCachedBundleProcessors(), equalTo(Collections.EMPTY_MAP));
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
}

@Test
Expand Down Expand Up @@ -1089,19 +1102,22 @@ public Object createRunnerForPTransform(
}

private void doStateCalls(BeamFnStateClient beamFnStateClient) {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("State API calls are unsupported");
beamFnStateClient.handle(
StateRequest.newBuilder().setInstructionId("SUCCESS"),
new CompletableFuture<>());
}
}),
new BundleProcessorCache());
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
.build());
assertThrows(
"State API calls are unsupported",
IllegalStateException.class,
() ->
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder()
.setProcessBundleDescriptorId("1L"))
.build()));
}

@Test
Expand Down Expand Up @@ -1155,20 +1171,23 @@ public Object createRunnerForPTransform(
}

private void doTimerRegistrations(BeamFnTimerClient beamFnTimerClient) {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Timers are unsupported");
beamFnTimerClient.register(
LogicalEndpoint.timer("1L", "2L", "Timer"),
Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
(timer) -> {});
}
}),
new BundleProcessorCache());
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
.build());
assertThrows(
"Timers are unsupported",
IllegalStateException.class,
() ->
handler.processBundle(
BeamFnApi.InstructionRequest.newBuilder()
.setProcessBundle(
BeamFnApi.ProcessBundleRequest.newBuilder()
.setProcessBundleDescriptorId("1L"))
.build()));
}

private static void throwException() {
Expand Down