Skip to content

Commit

Permalink
Revert "Remove the attemptId notion in the connectionManagerWorkflow (#…
Browse files Browse the repository at this point in the history
…10780)" (#11057)

This reverts commit 99338c8.
  • Loading branch information
benmoriceau authored and terencecho committed Mar 14, 2022
1 parent 63a3cf4 commit 52ab967
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 1,304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public interface JobPersistence {
//

/**
* Create a new attempt for a job and return its attempt number. Throws
* {@link IllegalStateException} if the job is already in a terminal state.
* Create a new attempt for a job. Throws {@link IllegalStateException} if the job is already in a
* terminal state.
*
* @param jobId job for which an attempt will be created
* @param logPath path where logs should be written for the attempt
* @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
* @return id of the attempt
* @throws IOException exception due to interaction with persistence
*/
int createAttempt(long jobId, Path logPath) throws IOException;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies {
implementation project(':airbyte-scheduler:models')

testImplementation 'io.temporal:temporal-testing:1.8.1'
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation 'io.temporal:temporal-testing-junit5:1.5.0' // versioned separately from rest of temporal
testImplementation "org.flywaydb:flyway-core:7.14.0"
testImplementation 'org.mockito:mockito-inline:4.0.0'
testImplementation 'org.postgresql:postgresql:42.2.18'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput;
import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration;
import io.airbyte.workers.temporal.scheduling.state.WorkflowInternalState;
Expand Down Expand Up @@ -65,9 +60,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;
private static final int AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION = 1;

private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand Down Expand Up @@ -158,7 +150,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

workflowInternalState.setJobId(getOrCreateJobId(connectionUpdaterInput));

workflowInternalState.setAttemptNumber(createAttempt(workflowInternalState.getJobId()));
workflowInternalState.setAttemptId(createAttemptId(workflowInternalState.getJobId()));

final GeneratedJobInput jobInputs = getJobInput();

Expand Down Expand Up @@ -194,13 +186,13 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
af.getActivityType(),
af.getCause(),
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber()));
workflowInternalState.getAttemptId()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
} else {
workflowInternalState.getFailures().add(
FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber()));
workflowInternalState.getAttemptId()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand All @@ -210,41 +202,20 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput));
}
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
standardSyncOutput));

resetNewConnectionInput(connectionUpdaterInput);
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
}
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand Down Expand Up @@ -338,20 +309,20 @@ public WorkflowState getState() {
@Override
public JobInformation getJobInformation() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Integer attemptId = workflowInternalState.getAttemptId();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber);
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
}

@Override
public QuarantinedInformation getQuarantinedInformation() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Integer attemptId = workflowInternalState.getAttemptId();
return new QuarantinedInformation(
connectionId,
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId,
workflowState.isQuarantined());
}

Expand Down Expand Up @@ -451,31 +422,15 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu

/**
* Create a new attempt for a given jobId
*
* @param jobId - the jobId associated with the new attempt
*
* @return The attempt number
*/
private Integer createAttempt(final long jobId) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
private Integer createAttemptId(final long jobId) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptNumberCreationOutput.getAttemptNumber();

return attemptCreationOutput.getAttemptId();
}

/**
Expand All @@ -484,30 +439,14 @@ private Integer createAttempt(final long jobId) {
*/
private GeneratedJobInput getJobInput() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptNumber,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
}

final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber(
attemptNumber,
final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInputWithAttemptNumber,
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
Expand Down Expand Up @@ -593,25 +532,14 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
workflowState.setContinueAsReset(isReset);
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Integer attemptId = workflowInternalState.getAttemptId();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptNumber,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptId,
FailureHelper.failureSummaryForCancellation(jobId, attemptId, failures, partialSuccess)));
resetNewConnectionInput(connectionUpdaterInput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ class SyncInput {

}

@Data
@NoArgsConstructor
@AllArgsConstructor
class SyncInputWithAttemptNumber {

private int attemptNumber;
private long jobId;
private boolean reset;

}

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -56,10 +45,4 @@ class GeneratedJobInput {
@ActivityMethod
GeneratedJobInput getSyncWorkflowInput(SyncInput input);

/**
* This generate the input needed by the child sync workflow
*/
@ActivityMethod
GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(SyncInputWithAttemptNumber input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,4 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
}
}

@Override
public GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(final SyncInputWithAttemptNumber input) {
return getSyncWorkflowInput(new SyncInput(
input.getAttemptNumber(),
input.getJobId(),
input.isReset()));
}

}
Loading

0 comments on commit 52ab967

Please sign in to comment.