Skip to content

Commit

Permalink
馃悰 fix issue where jobs retried forever (#2502)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Mar 17, 2021
1 parent 6c8d110 commit 30ddcd7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalJobException;
import io.airbyte.workers.temporal.TemporalJobType;
import io.temporal.failure.TemporalException;
import java.nio.file.Path;

public class TemporalWorkerRunFactory {
Expand Down Expand Up @@ -122,7 +123,11 @@ private static TemporalJobType toTemporalJobType(ConfigType jobType) {
private OutputAndStatus<JobOutput> toOutputAndStatus(CheckedSupplier<JobOutput, TemporalJobException> supplier) {
try {
return new OutputAndStatus<>(JobStatus.SUCCEEDED, supplier.get());
} catch (TemporalJobException e) {
} catch (TemporalJobException | TemporalException e) {
// while from within the temporal activity we throw TemporalJobException, Temporal wraps any
// exception thrown by an activity in an ApplicationFailure exception, which gets wrapped in
// ActivityFailure exception which get wrapped in a WorkflowFailedException. We will need to unwrap
// these later, but for now we just catch the parent.
return new OutputAndStatus<>(JobStatus.FAILED);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class WorkflowImpl implements CheckConnectionWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class WorkflowImpl implements DiscoverCatalogWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(2))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class WorkflowImpl implements SpecWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class WorkflowImpl implements SyncWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(3))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final SyncActivity activity = Workflow.newActivityStub(SyncActivity.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.io.Serializable;
Expand All @@ -43,6 +44,8 @@ public class TemporalUtils {

public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE);

public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build();

@FunctionalInterface
public interface TemporalJobCreator<T extends Serializable> {

Expand All @@ -52,6 +55,7 @@ public interface TemporalJobCreator<T extends Serializable> {

public static WorkflowOptions getWorkflowOptions(TemporalJobType jobType) {
return WorkflowOptions.newBuilder()
.setRetryOptions(NO_RETRY)
.setTaskQueue(jobType.name())
.build();
}
Expand Down

0 comments on commit 30ddcd7

Please sign in to comment.