Skip to content

Commit

Permalink
Remove the attemptId notion in the connectionManagerWorkflow
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Mar 2, 2022
1 parent b610393 commit 6fc4904
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public interface JobPersistence {
//

/**
* Create a new attempt for a job. Throws {@link IllegalStateException} if the job is already in a
* terminal state.
* Create a new attempt for a job and return its attempt number. Throws
* {@link IllegalStateException} if the job is already in a terminal state.
*
* @param jobId job for which an attempt will be created
* @param logPath path where logs should be written for the attempt
* @return id of the attempt
* @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
* @throws IOException exception due to interaction with persistence
*/
int createAttempt(long jobId, Path logPath) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
}
}

private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput connectionUpdaterInput) {
private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) {
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();

Expand All @@ -122,7 +123,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
// resetConnection flag to the next run so that that run can execute the actual reset
workflowState.setResetConnection(connectionUpdaterInput.isResetConnection());

Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());
final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());

Workflow.await(timeToWait,
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());
Expand Down Expand Up @@ -189,7 +190,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
});
}

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
Expand All @@ -199,7 +200,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput,
resetNewConnectionInput(connectionUpdaterInput);
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
Expand Down Expand Up @@ -288,17 +289,17 @@ public WorkflowState getState() {

@Override
public JobInformation getJobInformation() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
}

@Override
public QuarantinedInformation getQuarantinedInformation() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
return new QuarantinedInformation(
connectionId,
jobId == null ? NON_RUNNING_JOB_ID : jobId,
Expand Down Expand Up @@ -335,10 +336,10 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn
*
* We aimed to use this method for call of the temporal activity.
*/
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) {
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INPUT, OUTPUT> mapper, final INPUT input) {
try {
return mapper.apply(input);
} catch (Exception e) {
} catch (final Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setQuarantined(true);
workflowState.setRetryFailedActivity(false);
Expand All @@ -353,7 +354,7 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(Function<INPUT, OU
/**
* Similar to runMandatoryActivityWithOutput but for methods that don't return
*/
private <INPUT> void runMandatoryActivity(Consumer<INPUT> consumer, INPUT input) {
private <INPUT> void runMandatoryActivity(final Consumer<INPUT> consumer, final INPUT input) {
runMandatoryActivityWithOutput((inputInternal) -> {
consumer.accept(inputInternal);
return null;
Expand All @@ -369,7 +370,7 @@ private <INPUT> void runMandatoryActivity(Consumer<INPUT> consumer, INPUT input)
*
* Wait time is infinite If the workflow is manual or disabled since we never want to schedule this.
*/
private Duration getTimeToWait(UUID connectionId) {
private Duration getTimeToWait(final UUID connectionId) {
// Scheduling
final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(connectionId);

Expand Down Expand Up @@ -402,23 +403,35 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
/**
* Create a new attempt for a given jobId
*/
private Integer createAttemptId(long jobId) {
final AttemptCreationOutput attemptCreationOutput =
private Integer createAttemptId(final long jobId) {
final int currentVersion = 1;
final int attemptCreationVersion = Workflow.getVersion("rename_attempt_id_to_number", Workflow.DEFAULT_VERSION, currentVersion);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < currentVersion) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
new AttemptCreationInput(
jobId));

return attemptCreationOutput.getAttemptId();
return attemptNumberCreationOutput.getAttemptNumber();
}

/**
* Generate the input that is needed by the job. It will generate the configuration needed by the
* job and will generate a different output if the job is a sync or a reset.
*/
private GeneratedJobInput getJobInput() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
jobId,
Expand Down Expand Up @@ -456,8 +469,8 @@ private void reportJobStarting() {
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999
*/
private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
int taskQueueChangeVersion =
private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

String taskQueue = TemporalJobType.SYNC.name();
Expand Down Expand Up @@ -487,8 +500,8 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
*
* @return True if the job failed, false otherwise
*/
private boolean getFailStatus(StandardSyncOutput standardSyncOutput) {
StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();
private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();

if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) {
workflowInternalState.getFailures().addAll(standardSyncOutput.getFailures());
Expand All @@ -513,12 +526,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
/**
* Set a job as cancel and continue to the next job if and continue as a reset if needed
*/
private void reportCancelledAndContinueWith(boolean isReset, ConnectionUpdaterInput connectionUpdaterInput) {
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
workflowState.setContinueAsReset(isReset);
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
Set<FailureReason> failures = workflowInternalState.getFailures();
Boolean partialSuccess = workflowInternalState.getPartialSuccess();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ class AttemptCreationOutput {
@ActivityMethod
AttemptCreationOutput createNewAttempt(AttemptCreationInput input) throws RetryableException;

@Data
@NoArgsConstructor
@AllArgsConstructor
class AttemptNumberCreationOutput {

private int attemptNumber;

}

/**
* Create a new attempt for a given job ID
*
* @param input POJO containing the jobId
* @return A POJO containing the attemptNumber
*/
@ActivityMethod
AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException;

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
}
}

@Override
public AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException {
try {
final long jobId = input.getJobId();
final Job createdJob = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
final int persistedAttemptNumber = jobPersistence.createAttempt(jobId, logFilePath);
emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId);

LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot());
return new AttemptNumberCreationOutput(persistedAttemptNumber);
} catch (final IOException e) {
throw new RetryableException(e);
}
}

@Override
public void jobSuccess(final JobSuccessInput input) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class JobCreationAndStatusUpdateActivityTest {
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 123L;
private static final int ATTEMPT_ID = 0;
private static final int ATTEMPT_NUMBER = 1;
private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
Expand Down Expand Up @@ -164,6 +166,56 @@ public void createAttemptThrowException() throws IOException {
.hasCauseInstanceOf(IOException.class);
}

@Test
@DisplayName("Test attempt creation")
public void createAttemptNumber() throws IOException {
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));

final Job mJob = Mockito.mock(Job.class);

Mockito.when(mJobPersistence.getJob(JOB_ID))
.thenReturn(mJob);

final WorkerRun mWorkerRun = Mockito.mock(WorkerRun.class);

Mockito.when(mTemporalWorkerRunFactory.create(mJob))
.thenReturn(mWorkerRun);

final Path mPath = Mockito.mock(Path.class);
final Path path = Path.of("test");
Mockito.when(mPath.resolve(Mockito.anyString()))
.thenReturn(path);
Mockito.when(mWorkerRun.getJobRoot())
.thenReturn(mPath);

Mockito.when(mJobPersistence.createAttempt(JOB_ID, path))
.thenReturn(ATTEMPT_NUMBER);

final LogClientSingleton mLogClientSingleton = Mockito.mock(LogClientSingleton.class);
try (final MockedStatic<LogClientSingleton> utilities = Mockito.mockStatic(LogClientSingleton.class)) {
utilities.when(() -> LogClientSingleton.getInstance())
.thenReturn(mLogClientSingleton);

final AttemptNumberCreationOutput output = jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput(
JOB_ID));

Mockito.verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath);
Assertions.assertThat(output.getAttemptNumber()).isEqualTo(ATTEMPT_NUMBER);
}
}

@Test
@DisplayName("Test exception errors are properly wrapped")
public void createAttemptNumberThrowException() throws IOException {
Mockito.when(mJobPersistence.getJob(JOB_ID))
.thenThrow(new IOException());

Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput(
JOB_ID)))
.isInstanceOf(RetryableException.class)
.hasCauseInstanceOf(IOException.class);
}

}

@Nested
Expand Down

0 comments on commit 6fc4904

Please sign in to comment.