diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/TemporalWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/TemporalWorkerRunFactory.java index 83f95970ea67..c52d3525d0ce 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/TemporalWorkerRunFactory.java @@ -43,6 +43,7 @@ import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalJobException; import io.airbyte.workers.temporal.TemporalJobType; +import io.temporal.failure.TemporalException; import java.nio.file.Path; public class TemporalWorkerRunFactory { @@ -122,7 +123,11 @@ private static TemporalJobType toTemporalJobType(ConfigType jobType) { private OutputAndStatus toOutputAndStatus(CheckedSupplier supplier) { try { return new OutputAndStatus<>(JobStatus.SUCCEEDED, supplier.get()); - } catch (TemporalJobException e) { + } catch (TemporalJobException | TemporalException e) { + // while from within the temporal activity we throw TemporalJobException, Temporal wraps any + // exception thrown by an activity in an ApplicationFailure exception, which gets wrapped in + // ActivityFailure exception which get wrapped in a WorkflowFailedException. We will need to unwrap + // these later, but for now we just catch the parent. return new OutputAndStatus<>(JobStatus.FAILED); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java index 0ff94dda29ea..cdefcf3b6c9b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java @@ -54,6 +54,7 @@ class WorkflowImpl implements CheckConnectionWorkflow { final ActivityOptions options = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofHours(1)) + .setRetryOptions(TemporalUtils.NO_RETRY) .build(); private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java index c9ec73ad8de2..fc49335e8e24 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java @@ -56,6 +56,7 @@ class WorkflowImpl implements DiscoverCatalogWorkflow { final ActivityOptions options = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofHours(2)) + .setRetryOptions(TemporalUtils.NO_RETRY) .build(); private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java index 9b8a8ea6b60f..90fbf4ec05f7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java @@ -51,6 +51,7 @@ class WorkflowImpl implements SpecWorkflow { final ActivityOptions options = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofHours(1)) + .setRetryOptions(TemporalUtils.NO_RETRY) .build(); private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java index d2ef36722857..9895886c88f3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java @@ -63,6 +63,7 @@ class WorkflowImpl implements SyncWorkflow { final ActivityOptions options = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(3)) + .setRetryOptions(TemporalUtils.NO_RETRY) .build(); private final SyncActivity activity = Workflow.newActivityStub(SyncActivity.class, options); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 571bc8cb3c5a..a7b6dffee736 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -27,6 +27,7 @@ import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import java.io.Serializable; @@ -43,6 +44,8 @@ public class TemporalUtils { public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE); + public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build(); + @FunctionalInterface public interface TemporalJobCreator { @@ -52,6 +55,7 @@ public interface TemporalJobCreator { public static WorkflowOptions getWorkflowOptions(TemporalJobType jobType) { return WorkflowOptions.newBuilder() + .setRetryOptions(NO_RETRY) .setTaskQueue(jobType.name()) .build(); }