Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the attemptId notion in the connectionManagerWorkflow #10780

Merged
merged 16 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public interface JobPersistence {
//

/**
* Create a new attempt for a job and return its attempt number. Throws
* {@link IllegalStateException} if the job is already in a terminal state.
*
* @param jobId job for which an attempt will be created
* @param logPath path where logs should be written for the attempt
* @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
* @throws IOException exception due to interaction with persistence
*/
int createAttempt(long jobId, Path logPath) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput;
import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration;
import io.airbyte.workers.temporal.scheduling.state.WorkflowInternalState;
Expand Down Expand Up @@ -56,6 +61,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;

// Change id and current version passed to Workflow.getVersion wherever the workflow has to choose
// between the legacy "attempt id" activities and their "attempt number" replacements.
private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand Down Expand Up @@ -139,7 +147,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

workflowInternalState.setJobId(getOrCreateJobId(connectionUpdaterInput));

workflowInternalState.setAttemptId(createAttemptId(workflowInternalState.getJobId()));
workflowInternalState.setAttemptNumber(createAttempt(workflowInternalState.getJobId()));

final GeneratedJobInput jobInputs = getJobInput();

Expand Down Expand Up @@ -175,13 +183,13 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
af.getActivityType(),
af.getCause(),
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId()));
workflowInternalState.getAttemptNumber()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
} else {
workflowInternalState.getFailures().add(
FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), workflowInternalState.getJobId(),
workflowInternalState.getAttemptId()));
workflowInternalState.getAttemptNumber()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand All @@ -191,20 +199,41 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

/**
 * Flags the workflow state as successful, reports the job success through the job status activity
 * and resets the connection input for the next run.
 *
 * The success is reported exactly once. Which activity is used depends on the version recorded for
 * {@code RENAME_ATTEMPT_ID_TO_NUMBER_TAG}: versions below the current one keep calling the legacy
 * {@code jobSuccess} activity, the current version calls {@code jobSuccessWithAttemptNumber}.
 *
 * @param connectionUpdaterInput input of the current workflow run, reset once the success is reported
 * @param standardSyncOutput output of the sync, forwarded to the activity
 */
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
  workflowState.setSuccess(true);
  final int attemptCreationVersion =
      Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

  if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
    // Legacy path: same value, still carried by the input type that predates the rename.
    runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
        workflowInternalState.getJobId(),
        workflowInternalState.getAttemptNumber(),
        standardSyncOutput));
  } else {
    runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
        workflowInternalState.getJobId(),
        workflowInternalState.getAttemptNumber(),
        standardSyncOutput));
  }

  resetNewConnectionInput(connectionUpdaterInput);
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
}

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand Down Expand Up @@ -289,20 +318,20 @@ public WorkflowState getState() {
@Override
public JobInformation getJobInformation() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber);
}

/**
 * Returns the connection id, the job id and attempt number currently held in the workflow internal
 * state, and whether the workflow state is quarantined.
 *
 * When the job id or attempt number has not been set yet, the matching {@code NON_RUNNING_JOB_ID} /
 * {@code NON_RUNNING_ATTEMPT_ID} placeholder is returned instead.
 */
@Override
public QuarantinedInformation getQuarantinedInformation() {
  final Long jobId = workflowInternalState.getJobId();
  final Integer attemptNumber = workflowInternalState.getAttemptNumber();
  return new QuarantinedInformation(
      connectionId,
      jobId == null ? NON_RUNNING_JOB_ID : jobId,
      attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber,
      workflowState.isQuarantined());
}

Expand Down Expand Up @@ -401,15 +430,31 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu

/**
* Create a new attempt for a given jobId
*
* @param jobId - the jobId associated with the new attempt
*
* @return The attempt number
*/
private Integer createAttemptId(final long jobId) {
final AttemptCreationOutput attemptCreationOutput =
private Integer createAttempt(final long jobId) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
new AttemptCreationInput(
jobId));

return attemptCreationOutput.getAttemptId();
return attemptNumberCreationOutput.getAttemptNumber();
}

/**
Expand All @@ -418,14 +463,30 @@ private Integer createAttemptId(final long jobId) {
*/
private GeneratedJobInput getJobInput() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptNumber,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
}

final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber(
attemptNumber,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivity::getSyncWorkflowInputWithAttemptNumber,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
Expand Down Expand Up @@ -515,14 +576,25 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
/**
 * Reports the current job as cancelled, then resets the connection input and continues as a new
 * workflow run.
 *
 * The cancellation is reported exactly once. Which activity is used depends on the version recorded
 * for {@code RENAME_ATTEMPT_ID_TO_NUMBER_TAG}: versions below the current one keep calling the legacy
 * {@code jobCancelled} activity, the current version calls {@code jobCancelledWithAttemptNumber}.
 *
 * @param isReset stored on the workflow state as the continue-as-reset flag
 * @param connectionUpdaterInput input of the current workflow run, reused for the next one
 */
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
  workflowState.setContinueAsReset(isReset);
  final Long jobId = workflowInternalState.getJobId();
  final Integer attemptNumber = workflowInternalState.getAttemptNumber();
  final Set<FailureReason> failures = workflowInternalState.getFailures();
  final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
  final int attemptCreationVersion =
      Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

  if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
    // Legacy path: same values, still carried by the input type that predates the rename.
    runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
        new JobCancelledInput(
            jobId,
            attemptNumber,
            FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
  } else {
    runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
        new JobCancelledInputWithAttemptNumber(
            jobId,
            attemptNumber,
            FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
  }
  resetNewConnectionInput(connectionUpdaterInput);
  prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ class SyncInput {

}

/**
* Input of {@code getSyncWorkflowInputWithAttemptNumber}: identifies the job and the attempt (by
* its attempt number) and carries the reset flag. Constructors and accessors are generated by
* Lombok, so the field order below is also the all-args constructor parameter order.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class SyncInputWithAttemptNumber {

private int attemptNumber;
private long jobId;
private boolean reset;

}

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -45,4 +56,10 @@ class GeneratedJobInput {
@ActivityMethod
GeneratedJobInput getSyncWorkflowInput(SyncInput input);

/**
* Generates the input needed by the child sync workflow.
*
* @param input POJO containing the attemptNumber, the jobId and the reset flag
* @return the generated job input
*/
@ActivityMethod
GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(SyncInputWithAttemptNumber input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
}
}

/**
 * Attempt-number flavoured entry point: repackages the given values into a {@link SyncInput} and
 * delegates to {@link #getSyncWorkflowInput(SyncInput)}.
 *
 * @param input POJO containing the attemptNumber, the jobId and the reset flag
 * @return whatever {@code getSyncWorkflowInput} returns for the equivalent {@code SyncInput}
 */
@Override
public GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(final SyncInputWithAttemptNumber input) {
  final SyncInput equivalentInput = new SyncInput(input.getAttemptNumber(), input.getJobId(), input.isReset());
  return getSyncWorkflowInput(equivalentInput);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ class AttemptCreationOutput {
@ActivityMethod
AttemptCreationOutput createNewAttempt(AttemptCreationInput input) throws RetryableException;

/**
* Output of {@code createNewAttemptNumber}: carries the attempt number of the attempt that was
* created. Constructors and accessors are generated by Lombok.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class AttemptNumberCreationOutput {

private int attemptNumber;

}

/**
* Create a new attempt for a given job ID and return its attempt number.
*
* @param input POJO containing the jobId
* @return A POJO containing the attemptNumber
* @throws RetryableException declared by this activity; the conditions that trigger it are defined
*         by the implementation
*/
@ActivityMethod
AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException;

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -89,6 +107,23 @@ class JobSuccessInput {
@ActivityMethod
void jobSuccess(JobSuccessInput input);

/**
* Input of {@code jobSuccessWithAttemptNumber}: identifies the job and the attempt (by its attempt
* number) and carries the sync output. Constructors and accessors are generated by Lombok, so the
* field order below is also the all-args constructor parameter order.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class JobSuccessInputWithAttemptNumber {

private long jobId;
private int attemptNumber;
private StandardSyncOutput standardSyncOutput;

}

/**
* Set a job status as successful
*
* @param input POJO containing the jobId, the attemptNumber and the standardSyncOutput
*/
@ActivityMethod
void jobSuccessWithAttemptNumber(JobSuccessInputWithAttemptNumber input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down Expand Up @@ -123,6 +158,24 @@ class AttemptFailureInput {
@ActivityMethod
void attemptFailure(AttemptFailureInput input);

/**
* Input of {@code attemptFailureWithAttemptNumber}: identifies the job and the attempt (by its
* attempt number) and carries the sync output and the failure summary. Constructors and accessors
* are generated by Lombok, so the field order below is also the all-args constructor parameter
* order.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class AttemptNumberFailureInput {

private long jobId;
private int attemptNumber;
private StandardSyncOutput standardSyncOutput;
private AttemptFailureSummary attemptFailureSummary;

}

/**
* Set an attempt status as failed
*
* @param input POJO containing the jobId, the attemptNumber, the standardSyncOutput and the
*        attemptFailureSummary
*/
@ActivityMethod
void attemptFailureWithAttemptNumber(AttemptNumberFailureInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -140,6 +193,23 @@ class JobCancelledInput {
@ActivityMethod
void jobCancelled(JobCancelledInput input);

/**
* Input of {@code jobCancelledWithAttemptNumber}: identifies the job and the attempt (by its
* attempt number) and carries the failure summary. Constructors and accessors are generated by
* Lombok, so the field order below is also the all-args constructor parameter order.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class JobCancelledInputWithAttemptNumber {

private long jobId;
private int attemptNumber;
private AttemptFailureSummary attemptFailureSummary;

}

/**
* Set a job status as cancelled
*
* @param input POJO containing the jobId, the attemptNumber and the attemptFailureSummary
*/
@ActivityMethod
void jobCancelledWithAttemptNumber(JobCancelledInputWithAttemptNumber input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Loading