Skip to content

Commit

Permalink
execute temporal workflows asynchronously (#3427)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 19, 2021
1 parent d2eb5c0 commit 01a7c6a
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
package io.airbyte.workers.temporal;

import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.workflow.Functions;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.ImmutablePair;

public class TemporalUtils {

Expand Down Expand Up @@ -83,4 +88,31 @@ public static JobRunConfig createJobRunConfig(String jobId, int attemptId) {
.withAttemptId((long) attemptId);
}

/**
* Allows running a given temporal workflow stub asynchronously. This method only works for
* workflows that take one argument. Because of the iface that Temporal supplies, in order to handle
* other method signatures, if we need to support them, we will need to add another helper with that
* number of args. For a reference on how Temporal recommends to do this see their docs:
* https://docs.temporal.io/docs/java/workflows#asynchronous-start
*
* @param workflowStub - workflow stub to be executed
* @param function - function on the workflow stub to be executed
* @param arg1 - argument to be supplied to the workflow function
* @param outputType - class of the output type of the workflow function
* @param <STUB> - type of the workflow stub
* @param <A1> - type of the argument of the workflow stub
* @param <R> - type of the return of the workflow stub
* @return pair of the workflow execution (contains metadata on the asynchronously running job) and
* future that can be used to await the result of the workflow stub's function
*/
public static <STUB, A1, R> ImmutablePair<WorkflowExecution, CompletableFuture<R>> asyncExecute(STUB workflowStub,
Functions.Func1<A1, R> function,
A1 arg1,
Class<R> outputType) {
final WorkflowStub untyped = WorkflowStub.fromTyped(workflowStub);
final WorkflowExecution workflowExecution = WorkflowClient.start(function, arg1);
final CompletableFuture<R> resultAsync = untyped.getResultAsync(outputType);
return ImmutablePair.of(workflowExecution, resultAsync);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.workers.temporal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import io.airbyte.commons.concurrency.VoidCallable;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TemporalUtilsTest {

private static final String TASK_QUEUE = "default";

@Test
void testAsyncExecute() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);

final VoidCallable callable = mock(VoidCallable.class);

// force it to wait until we can verify that it is running.
doAnswer((a) -> {
countDownLatch.await(1, TimeUnit.MINUTES);
return null;
}).when(callable).call();

final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();
final WorkflowServiceStubs temporalService = testEnv.getWorkflowService();
final Worker worker = testEnv.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(TestWorkflow.WorkflowImpl.class);
final WorkflowClient client = testEnv.getWorkflowClient();
worker.registerActivitiesImplementations(new TestWorkflow.Activity1Impl(callable));
testEnv.start();

final TestWorkflow workflowStub = client.newWorkflowStub(TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
final ImmutablePair<WorkflowExecution, CompletableFuture<String>> pair = TemporalUtils.asyncExecute(
workflowStub,
workflowStub::run,
"whatever",
String.class);

final WorkflowExecution workflowExecution = pair.getLeft();
final String workflowId = workflowExecution.getWorkflowId();
final String runId = workflowExecution.getRunId();

final WorkflowExecutionInfo workflowExecutionInfo = temporalService.blockingStub().listOpenWorkflowExecutions(null).getExecutionsList().get(0);
assertEquals(workflowId, workflowExecutionInfo.getExecution().getWorkflowId());
assertEquals(runId, workflowExecutionInfo.getExecution().getRunId());

// allow the workflow to complete.
countDownLatch.countDown();

final String result = pair.getRight().get(1, TimeUnit.MINUTES);
assertEquals("completed", result);
}

@WorkflowInterface
public interface TestWorkflow {

@WorkflowMethod
String run(String arg);

class WorkflowImpl implements TestWorkflow {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowImpl.class);

private final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(3))
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();

private final Activity1 activity1 = Workflow.newActivityStub(Activity1.class, options);
private final Activity1 activity2 = Workflow.newActivityStub(Activity1.class, options);

@Override
public String run(String arg) {
LOGGER.info("workflow before activity 1");
activity1.activity();
LOGGER.info("workflow before activity 2");
activity2.activity();
LOGGER.info("workflow after all activities");

return "completed";
}

}

@ActivityInterface
interface Activity1 {

@ActivityMethod
void activity();

}

class Activity1Impl implements Activity1 {

private static final Logger LOGGER = LoggerFactory.getLogger(io.airbyte.workers.temporal.TestWorkflow.Activity1Impl.class);
private static final String ACTIVITY1 = "activity1";

private final VoidCallable callable;

public Activity1Impl(VoidCallable callable) {
this.callable = callable;
}

public void activity() {
LOGGER.info("before: {}", ACTIVITY1);
try {
callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
LOGGER.info("before: {}", ACTIVITY1);
}

}

}

}

0 comments on commit 01a7c6a

Please sign in to comment.