From 6fc4904f89dff61b6cf37d96af296d2e7a35c48c Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 1 Mar 2022 16:02:14 -0800 Subject: [PATCH] Remove the attemptId notion in the connectionManagerWorkflow --- .../scheduler/persistence/JobPersistence.java | 6 +- .../ConnectionManagerWorkflowImpl.java | 69 +++++++++++-------- .../JobCreationAndStatusUpdateActivity.java | 18 +++++ ...obCreationAndStatusUpdateActivityImpl.java | 18 +++++ ...obCreationAndStatusUpdateActivityTest.java | 52 ++++++++++++++ 5 files changed, 132 insertions(+), 31 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 2cfd994cb029..821b123d6787 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -75,12 +75,12 @@ public interface JobPersistence { // /** - * Create a new attempt for a job. Throws {@link IllegalStateException} if the job is already in a - * terminal state. + * Create a new attempt for a job and return its attempt number. Throws + * {@link IllegalStateException} if the job is already in a terminal state. * * @param jobId job for which an attempt will be created * @param logPath path where logs should be written for the attempt - * @return id of the attempt + * @return The attempt number of the created attempt (see {@link DefaultJobPersistence}) * @throws IOException exception due to interaction with persistence */ int createAttempt(long jobId, Path logPath) throws IOException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 89786888ddb3..28ff4def3e9a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -23,6 +23,7 @@ import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput; +import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput; @@ -108,7 +109,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } } - private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput connectionUpdaterInput) { + private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) { return Workflow.newCancellationScope(() -> { connectionId = connectionUpdaterInput.getConnectionId(); @@ -122,7 +123,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co // resetConnection flag to the next run so that that run can execute the actual reset workflowState.setResetConnection(connectionUpdaterInput.isResetConnection()); - Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); + final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); Workflow.await(timeToWait, () -> skipScheduling() || connectionUpdaterInput.isFromFailure()); @@ -189,7 +190,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co }); } - private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) { + private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) { workflowState.setSuccess(true); runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput( workflowInternalState.getJobId(), @@ -199,7 +200,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, resetNewConnectionInput(connectionUpdaterInput); } - private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) { + private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) { runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput( workflowInternalState.getJobId(), workflowInternalState.getAttemptId(), @@ -288,8 +289,8 @@ public WorkflowState getState() { @Override public JobInformation getJobInformation() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); return new JobInformation( jobId == null ? NON_RUNNING_JOB_ID : jobId, attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId); @@ -297,8 +298,8 @@ public JobInformation getJobInformation() { @Override public QuarantinedInformation getQuarantinedInformation() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); return new QuarantinedInformation( connectionId, jobId == null ? NON_RUNNING_JOB_ID : jobId, @@ -335,10 +336,10 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn * * We aimed to use this method for call of the temporal activity. */ - private OUTPUT runMandatoryActivityWithOutput(Function mapper, INPUT input) { + private OUTPUT runMandatoryActivityWithOutput(final Function mapper, final INPUT input) { try { return mapper.apply(input); - } catch (Exception e) { + } catch (final Exception e) { log.error("Failed to run an activity for the connection " + connectionId, e); workflowState.setQuarantined(true); workflowState.setRetryFailedActivity(false); @@ -353,7 +354,7 @@ private OUTPUT runMandatoryActivityWithOutput(Function void runMandatoryActivity(Consumer consumer, INPUT input) { + private void runMandatoryActivity(final Consumer consumer, final INPUT input) { runMandatoryActivityWithOutput((inputInternal) -> { consumer.accept(inputInternal); return null; @@ -369,7 +370,7 @@ private void runMandatoryActivity(Consumer consumer, INPUT input) * * Wait time is infinite If the workflow is manual or disabled since we never want to schedule this. */ - private Duration getTimeToWait(UUID connectionId) { + private Duration getTimeToWait(final UUID connectionId) { // Scheduling final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(connectionId); @@ -402,14 +403,26 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu /** * Create a new attempt for a given jobId */ - private Integer createAttemptId(long jobId) { - final AttemptCreationOutput attemptCreationOutput = + private Integer createAttemptId(final long jobId) { + final int currentVersion = 1; + final int attemptCreationVersion = Workflow.getVersion("rename_attempt_id_to_number", Workflow.DEFAULT_VERSION, currentVersion); + + // Retrieve the attempt number but name it attempt id + if (attemptCreationVersion < currentVersion) { + final AttemptCreationOutput attemptCreationOutput = + runMandatoryActivityWithOutput( + jobCreationAndStatusUpdateActivity::createNewAttempt, + new AttemptCreationInput( + jobId)); + return attemptCreationOutput.getAttemptId(); + } + + final AttemptNumberCreationOutput attemptNumberCreationOutput = runMandatoryActivityWithOutput( - jobCreationAndStatusUpdateActivity::createNewAttempt, + jobCreationAndStatusUpdateActivity::createNewAttemptNumber, new AttemptCreationInput( jobId)); - - return attemptCreationOutput.getAttemptId(); + return attemptNumberCreationOutput.getAttemptNumber(); } /** @@ -417,8 +430,8 @@ private Integer createAttemptId(long jobId) { * job and will generate a different output if the job is a sync or a reset. */ private GeneratedJobInput getJobInput() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); final SyncInput getSyncInputActivitySyncInput = new SyncInput( attemptId, jobId, @@ -456,8 +469,8 @@ private void reportJobStarting() { * since the latter is a long running workflow, in the future, using a different Node pool would * make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999 */ - private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { - int taskQueueChangeVersion = + private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) { + final int taskQueueChangeVersion = Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION); String taskQueue = TemporalJobType.SYNC.name(); @@ -487,8 +500,8 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { * * @return True if the job failed, false otherwise */ - private boolean getFailStatus(StandardSyncOutput standardSyncOutput) { - StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary(); + private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) { + final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary(); if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { workflowInternalState.getFailures().addAll(standardSyncOutput.getFailures()); @@ -513,12 +526,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() { /** * Set a job as cancel and continue to the next job if and continue as a reset if needed */ - private void reportCancelledAndContinueWith(boolean isReset, ConnectionUpdaterInput connectionUpdaterInput) { + private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) { workflowState.setContinueAsReset(isReset); - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); - Set failures = workflowInternalState.getFailures(); - Boolean partialSuccess = workflowInternalState.getPartialSuccess(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); + final Set failures = workflowInternalState.getFailures(); + final Boolean partialSuccess = workflowInternalState.getPartialSuccess(); runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled, new JobCancelledInput( jobId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java index aa45b53b0e8c..41700bf3150b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java @@ -72,6 +72,24 @@ class AttemptCreationOutput { @ActivityMethod AttemptCreationOutput createNewAttempt(AttemptCreationInput input) throws RetryableException; + @Data + @NoArgsConstructor + @AllArgsConstructor + class AttemptNumberCreationOutput { + + private int attemptNumber; + + } + + /** + * Create a new attempt for a given job ID + * + * @param input POJO containing the jobId + * @return A POJO containing the attemptNumber + */ + @ActivityMethod + AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException; + @Data @NoArgsConstructor @AllArgsConstructor diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index bc3ae71f9f1d..3a896b236b6b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -126,6 +126,24 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) } } + @Override + public AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException { + try { + final long jobId = input.getJobId(); + final Job createdJob = jobPersistence.getJob(jobId); + + final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob); + final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME); + final int persistedAttemptNumber = jobPersistence.createAttempt(jobId, logFilePath); + emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId); + + LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot()); + return new AttemptNumberCreationOutput(persistedAttemptNumber); + } catch (final IOException e) { + throw new RetryableException(e); + } + } + @Override public void jobSuccess(final JobSuccessInput input) { try { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 5f480cf4fb80..0f286452cb59 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -29,6 +29,7 @@ import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput; +import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput; @@ -84,6 +85,7 @@ public class JobCreationAndStatusUpdateActivityTest { private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final long JOB_ID = 123L; private static final int ATTEMPT_ID = 0; + private static final int ATTEMPT_NUMBER = 1; private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() .withStandardSyncSummary( new StandardSyncSummary() @@ -164,6 +166,56 @@ public void createAttemptThrowException() throws IOException { .hasCauseInstanceOf(IOException.class); } + @Test + @DisplayName("Test attempt creation") + public void createAttemptNumber() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + + final Job mJob = Mockito.mock(Job.class); + + Mockito.when(mJobPersistence.getJob(JOB_ID)) + .thenReturn(mJob); + + final WorkerRun mWorkerRun = Mockito.mock(WorkerRun.class); + + Mockito.when(mTemporalWorkerRunFactory.create(mJob)) + .thenReturn(mWorkerRun); + + final Path mPath = Mockito.mock(Path.class); + final Path path = Path.of("test"); + Mockito.when(mPath.resolve(Mockito.anyString())) + .thenReturn(path); + Mockito.when(mWorkerRun.getJobRoot()) + .thenReturn(mPath); + + Mockito.when(mJobPersistence.createAttempt(JOB_ID, path)) + .thenReturn(ATTEMPT_NUMBER); + + final LogClientSingleton mLogClientSingleton = Mockito.mock(LogClientSingleton.class); + try (final MockedStatic utilities = Mockito.mockStatic(LogClientSingleton.class)) { + utilities.when(() -> LogClientSingleton.getInstance()) + .thenReturn(mLogClientSingleton); + + final AttemptNumberCreationOutput output = jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput( + JOB_ID)); + + Mockito.verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath); + Assertions.assertThat(output.getAttemptNumber()).isEqualTo(ATTEMPT_NUMBER); + } + } + + @Test + @DisplayName("Test exception errors are properly wrapped") + public void createAttemptNumberThrowException() throws IOException { + Mockito.when(mJobPersistence.getJob(JOB_ID)) + .thenThrow(new IOException()); + + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput( + JOB_ID))) + .isInstanceOf(RetryableException.class) + .hasCauseInstanceOf(IOException.class); + } + } @Nested