diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
index 2cfd994cb029..821b123d6787 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
@@ -75,12 +75,12 @@ public interface JobPersistence {
//
/**
- * Create a new attempt for a job. Throws {@link IllegalStateException} if the job is already in a
- * terminal state.
+ * Create a new attempt for a job and return its attempt number. Throws
+ * {@link IllegalStateException} if the job is already in a terminal state.
*
* @param jobId job for which an attempt will be created
* @param logPath path where logs should be written for the attempt
- * @return id of the attempt
+ * @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
* @throws IOException exception due to interaction with persistence
*/
int createAttempt(long jobId, Path logPath) throws IOException;
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
index 89786888ddb3..28ff4def3e9a 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
@@ -23,6 +23,7 @@
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
+import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
@@ -108,7 +109,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
}
}
- private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput connectionUpdaterInput) {
+ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) {
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();
@@ -122,7 +123,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
// resetConnection flag to the next run so that that run can execute the actual reset
workflowState.setResetConnection(connectionUpdaterInput.isResetConnection());
- Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());
+ final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());
Workflow.await(timeToWait,
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());
@@ -189,7 +190,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
});
}
- private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
+ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
@@ -199,7 +200,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput,
resetNewConnectionInput(connectionUpdaterInput);
}
- private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
+ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
@@ -288,8 +289,8 @@ public WorkflowState getState() {
@Override
public JobInformation getJobInformation() {
- Long jobId = workflowInternalState.getJobId();
- Integer attemptId = workflowInternalState.getAttemptId();
+ final Long jobId = workflowInternalState.getJobId();
+ final Integer attemptId = workflowInternalState.getAttemptId();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
@@ -297,8 +298,8 @@ public JobInformation getJobInformation() {
@Override
public QuarantinedInformation getQuarantinedInformation() {
- Long jobId = workflowInternalState.getJobId();
- Integer attemptId = workflowInternalState.getAttemptId();
+ final Long jobId = workflowInternalState.getJobId();
+ final Integer attemptId = workflowInternalState.getAttemptId();
return new QuarantinedInformation(
connectionId,
jobId == null ? NON_RUNNING_JOB_ID : jobId,
@@ -335,10 +336,10 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn
*
* We aimed to use this method for call of the temporal activity.
*/
- private OUTPUT runMandatoryActivityWithOutput(Function mapper, INPUT input) {
+ private OUTPUT runMandatoryActivityWithOutput(final Function mapper, final INPUT input) {
try {
return mapper.apply(input);
- } catch (Exception e) {
+ } catch (final Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setQuarantined(true);
workflowState.setRetryFailedActivity(false);
@@ -353,7 +354,7 @@ private OUTPUT runMandatoryActivityWithOutput(Function void runMandatoryActivity(Consumer consumer, INPUT input) {
+ private void runMandatoryActivity(final Consumer consumer, final INPUT input) {
runMandatoryActivityWithOutput((inputInternal) -> {
consumer.accept(inputInternal);
return null;
@@ -369,7 +370,7 @@ private void runMandatoryActivity(Consumer consumer, INPUT input)
*
* Wait time is infinite If the workflow is manual or disabled since we never want to schedule this.
*/
- private Duration getTimeToWait(UUID connectionId) {
+ private Duration getTimeToWait(final UUID connectionId) {
// Scheduling
final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(connectionId);
@@ -402,14 +403,26 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
/**
* Create a new attempt for a given jobId
*/
- private Integer createAttemptId(long jobId) {
- final AttemptCreationOutput attemptCreationOutput =
+ private Integer createAttemptId(final long jobId) {
+ final int currentVersion = 1;
+ final int attemptCreationVersion = Workflow.getVersion("rename_attempt_id_to_number", Workflow.DEFAULT_VERSION, currentVersion);
+
+ // Retrieve the attempt number but name it attempt id
+ if (attemptCreationVersion < currentVersion) {
+ final AttemptCreationOutput attemptCreationOutput =
+ runMandatoryActivityWithOutput(
+ jobCreationAndStatusUpdateActivity::createNewAttempt,
+ new AttemptCreationInput(
+ jobId));
+ return attemptCreationOutput.getAttemptId();
+ }
+
+ final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
- jobCreationAndStatusUpdateActivity::createNewAttempt,
+ jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
new AttemptCreationInput(
jobId));
-
- return attemptCreationOutput.getAttemptId();
+ return attemptNumberCreationOutput.getAttemptNumber();
}
/**
@@ -417,8 +430,8 @@ private Integer createAttemptId(long jobId) {
* job and will generate a different output if the job is a sync or a reset.
*/
private GeneratedJobInput getJobInput() {
- Long jobId = workflowInternalState.getJobId();
- Integer attemptId = workflowInternalState.getAttemptId();
+ final Long jobId = workflowInternalState.getJobId();
+ final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
jobId,
@@ -456,8 +469,8 @@ private void reportJobStarting() {
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999
*/
- private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
- int taskQueueChangeVersion =
+ private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) {
+ final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);
String taskQueue = TemporalJobType.SYNC.name();
@@ -487,8 +500,8 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
*
* @return True if the job failed, false otherwise
*/
- private boolean getFailStatus(StandardSyncOutput standardSyncOutput) {
- StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();
+ private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
+ final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();
if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) {
workflowInternalState.getFailures().addAll(standardSyncOutput.getFailures());
@@ -513,12 +526,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
/**
* Set a job as cancel and continue to the next job if and continue as a reset if needed
*/
- private void reportCancelledAndContinueWith(boolean isReset, ConnectionUpdaterInput connectionUpdaterInput) {
+ private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
workflowState.setContinueAsReset(isReset);
- Long jobId = workflowInternalState.getJobId();
- Integer attemptId = workflowInternalState.getAttemptId();
- Set failures = workflowInternalState.getFailures();
- Boolean partialSuccess = workflowInternalState.getPartialSuccess();
+ final Long jobId = workflowInternalState.getJobId();
+ final Integer attemptId = workflowInternalState.getAttemptId();
+ final Set failures = workflowInternalState.getFailures();
+ final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java
index aa45b53b0e8c..41700bf3150b 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java
@@ -72,6 +72,24 @@ class AttemptCreationOutput {
@ActivityMethod
AttemptCreationOutput createNewAttempt(AttemptCreationInput input) throws RetryableException;
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ class AttemptNumberCreationOutput {
+
+ private int attemptNumber;
+
+ }
+
+ /**
+ * Create a new attempt for a given job ID
+ *
+ * @param input POJO containing the jobId
+ * @return A POJO containing the attemptNumber
+ */
+ @ActivityMethod
+ AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException;
+
@Data
@NoArgsConstructor
@AllArgsConstructor
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
index bc3ae71f9f1d..3a896b236b6b 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
@@ -126,6 +126,24 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
}
}
+ @Override
+ public AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException {
+ try {
+ final long jobId = input.getJobId();
+ final Job createdJob = jobPersistence.getJob(jobId);
+
+ final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
+ final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
+ final int persistedAttemptNumber = jobPersistence.createAttempt(jobId, logFilePath);
+ emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId);
+
+ LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot());
+ return new AttemptNumberCreationOutput(persistedAttemptNumber);
+ } catch (final IOException e) {
+ throw new RetryableException(e);
+ }
+ }
+
@Override
public void jobSuccess(final JobSuccessInput input) {
try {
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
index 5f480cf4fb80..0f286452cb59 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
@@ -29,6 +29,7 @@
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
+import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
@@ -84,6 +85,7 @@ public class JobCreationAndStatusUpdateActivityTest {
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 123L;
private static final int ATTEMPT_ID = 0;
+ private static final int ATTEMPT_NUMBER = 1;
private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
@@ -164,6 +166,56 @@ public void createAttemptThrowException() throws IOException {
.hasCauseInstanceOf(IOException.class);
}
+ @Test
+ @DisplayName("Test attempt creation")
+ public void createAttemptNumber() throws IOException {
+ Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
+
+ final Job mJob = Mockito.mock(Job.class);
+
+ Mockito.when(mJobPersistence.getJob(JOB_ID))
+ .thenReturn(mJob);
+
+ final WorkerRun mWorkerRun = Mockito.mock(WorkerRun.class);
+
+ Mockito.when(mTemporalWorkerRunFactory.create(mJob))
+ .thenReturn(mWorkerRun);
+
+ final Path mPath = Mockito.mock(Path.class);
+ final Path path = Path.of("test");
+ Mockito.when(mPath.resolve(Mockito.anyString()))
+ .thenReturn(path);
+ Mockito.when(mWorkerRun.getJobRoot())
+ .thenReturn(mPath);
+
+ Mockito.when(mJobPersistence.createAttempt(JOB_ID, path))
+ .thenReturn(ATTEMPT_NUMBER);
+
+ final LogClientSingleton mLogClientSingleton = Mockito.mock(LogClientSingleton.class);
+ try (final MockedStatic utilities = Mockito.mockStatic(LogClientSingleton.class)) {
+ utilities.when(() -> LogClientSingleton.getInstance())
+ .thenReturn(mLogClientSingleton);
+
+ final AttemptNumberCreationOutput output = jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput(
+ JOB_ID));
+
+ Mockito.verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath);
+ Assertions.assertThat(output.getAttemptNumber()).isEqualTo(ATTEMPT_NUMBER);
+ }
+ }
+
+ @Test
+ @DisplayName("Test exception errors are properly wrapped")
+ public void createAttemptNumberThrowException() throws IOException {
+ Mockito.when(mJobPersistence.getJob(JOB_ID))
+ .thenThrow(new IOException());
+
+ Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput(
+ JOB_ID)))
+ .isInstanceOf(RetryableException.class)
+ .hasCauseInstanceOf(IOException.class);
+ }
+
}
@Nested