Skip to content

Commit

Permalink
fix attempt output persistence for schedulerv2 (#9764)
Browse files Browse the repository at this point in the history
* fix attempt output persistence for schedulerv2

* fix test

* add test for failure case pushing output

* fix comment
  • Loading branch information
jrhizor committed Jan 24, 2022
1 parent ded87be commit f5b5fea
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput)
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(
connectionUpdaterInput.getJobId(),
connectionUpdaterInput.getAttemptId()));
connectionUpdaterInput.getAttemptId(),
standardSyncOutput.orElse(null)));

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class AttemptFailureInput {

private long jobId;
private int attemptId;
private StandardSyncOutput standardSyncOutput;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void jobSuccess(final JobSuccessInput input) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput);
} else {
log.warn("The job {} doesn't have an input for the attempt {}", input.getJobId(), input.getAttemptId());
log.warn("The job {} doesn't have any output for the attempt {}", input.getJobId(), input.getAttemptId());
}
jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId());
final Job job = jobPersistence.getJob(input.getJobId());
Expand All @@ -139,7 +139,13 @@ public void jobFailure(final JobFailureInput input) {
public void attemptFailure(final AttemptFailureInput input) {
try {
jobPersistence.failAttempt(input.getJobId(), input.getAttemptId());
final Job job = jobPersistence.getJob(input.getJobId());

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput);
} else {
log.warn("The job {} doesn't have any output for the attempt {}", input.getJobId(), input.getAttemptId());
}
} catch (final IOException e) {
throw new RetryableException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class JobCreationAndStatusUpdateActivityTest {
new StandardSyncSummary()
.withStatus(ReplicationStatus.COMPLETED));

private static final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput);

@Nested
class Creation {

Expand Down Expand Up @@ -147,7 +149,6 @@ class Update {
@Test
public void setJobSuccess() throws IOException {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));
final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput);

Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Expand Down Expand Up @@ -185,17 +186,18 @@ public void setJobFailureWrapException() throws IOException {

@Test
public void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID));
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));

Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
}

@Test
public void setAttemptFailureWrapException() throws IOException {
Mockito.doThrow(new IOException())
.when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);

Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID)))
Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, null)))
.isInstanceOf(RetryableException.class)
.hasCauseInstanceOf(IOException.class);
}
Expand Down

0 comments on commit f5b5fea

Please sign in to comment.