Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
*/
package org.apache.beam.runners.fnexecution.jobsubmission;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
Expand Down Expand Up @@ -89,17 +90,34 @@ public synchronized void start() {
@Override
public void onSuccess(@Nullable PipelineResult pipelineResult) {
if (pipelineResult != null) {
checkArgument(
pipelineResult.getState() == PipelineResult.State.DONE,
"Success on non-Done state: " + pipelineResult.getState());
setState(JobState.Enum.DONE);
switch (pipelineResult.getState()) {
case DONE:
setState(Enum.DONE);
break;
case RUNNING:
setState(Enum.RUNNING);
break;
case CANCELLED:
setState(Enum.CANCELLED);
break;
case FAILED:
setState(Enum.FAILED);
break;
default:
setState(JobState.Enum.UNSPECIFIED);
}
} else {
setState(JobState.Enum.UNSPECIFIED);
}
}

@Override
public void onFailure(Throwable throwable) {
public void onFailure(@Nonnull Throwable throwable) {
if (throwable instanceof CancellationException) {
// We have canceled execution, just update the job state
setState(JobState.Enum.CANCELLED);
return;
}
String message = String.format("Error during job invocation %s.", getId());
LOG.error(message, throwable);
sendMessage(
Expand Down Expand Up @@ -133,9 +151,12 @@ public synchronized void cancel() {
new FutureCallback<PipelineResult>() {
@Override
public void onSuccess(@Nullable PipelineResult pipelineResult) {
if (pipelineResult != null) {
// Do not cancel when we are already done.
if (pipelineResult != null
&& pipelineResult.getState() != PipelineResult.State.DONE) {
try {
pipelineResult.cancel();
setState(JobState.Enum.CANCELLED);
} catch (IOException exn) {
throw new RuntimeException(exn);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.jobsubmission;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/** Tests for {@link JobInvocation}. */
public class JobInvocationTest {

private static ExecutorService executorService;

private JobInvocation jobInvocation;
private ControllablePipelineRunner runner;

@BeforeClass
public static void init() {
executorService = Executors.newFixedThreadPool(1);
}

@AfterClass
public static void shutdown() {
executorService.shutdownNow();
executorService = null;
}

@Before
public void setup() {
JobInfo jobInfo =
JobInfo.create("jobid", "jobName", "retrievalToken", Struct.getDefaultInstance());
ListeningExecutorService listeningExecutorService =
MoreExecutors.listeningDecorator(executorService);
Pipeline pipeline = Pipeline.create();
runner = new ControllablePipelineRunner();
jobInvocation =
new JobInvocation(
jobInfo, listeningExecutorService, PipelineTranslation.toProto(pipeline), runner);
}

@Test(timeout = 10_000)
public void testStateAfterCompletion() throws Exception {
jobInvocation.start();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING));

TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DONE);
runner.setResult(pipelineResult);

awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE);
}

@Test(timeout = 10_000)
public void testStateAfterCompletionWithoutResult() throws Exception {
jobInvocation.start();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING));

// Let pipeline finish without a result.
runner.setResult(null);

awaitJobState(jobInvocation, JobApi.JobState.Enum.UNSPECIFIED);
}

@Test(timeout = 10_000)
public void testStateAfterCancellation() throws Exception {
jobInvocation.start();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING));

jobInvocation.cancel();
awaitJobState(jobInvocation, JobApi.JobState.Enum.CANCELLED);
}

@Test(timeout = 10_000)
public void testStateAfterCancellationWithPipelineResult() throws Exception {
jobInvocation.start();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING));

TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.FAILED);
runner.setResult(pipelineResult);
awaitJobState(jobInvocation, JobApi.JobState.Enum.FAILED);

jobInvocation.cancel();
pipelineResult.cancelLatch.await();
}

@Test(timeout = 10_000)
public void testNoCancellationWhenDone() throws Exception {
jobInvocation.start();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING));

TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DONE);
runner.setResult(pipelineResult);
awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE);

jobInvocation.cancel();
assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.DONE));
// Ensure that cancel has not been called
assertThat(pipelineResult.cancelLatch.getCount(), is(1L));
}

private static void awaitJobState(JobInvocation jobInvocation, JobApi.JobState.Enum jobState)
throws Exception {
while (jobInvocation.getState() != jobState) {
Thread.sleep(50);
}
}

private static class ControllablePipelineRunner implements PortablePipelineRunner {

private final CountDownLatch latch = new CountDownLatch(1);
private volatile PipelineResult result;

@Override
public PipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
latch.await();
return result;
}

void setResult(PipelineResult pipelineResult) {
result = pipelineResult;
latch.countDown();
}
}

private static class TestPipelineResult implements PipelineResult {

private final State state;
private final CountDownLatch cancelLatch = new CountDownLatch(1);

private TestPipelineResult(State state) {
this.state = state;
}

@Override
public State getState() {
return state;
}

@Override
public State cancel() {
cancelLatch.countDown();
return State.CANCELLED;
}

@Override
public State waitUntilFinish(Duration duration) {
return null;
}

@Override
public State waitUntilFinish() {
return null;
}

@Override
public MetricResults metrics() {
return null;
}
}
}