Skip to content

Commit

Permalink
Handle null JobSyncConfig (#18969)
Browse files Browse the repository at this point in the history
* Handle null JobSyncConfig

* Add unit test

* Fix PMD warning

* Do not update jobs in terminal state

* Fix failing tests

* Fix compile error
  • Loading branch information
jdpgrailsdev committed Nov 4, 2022
1 parent 6e94249 commit c8a7cb3
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public void failJob(final long jobId) throws IOException {

private void updateJobStatus(final DSLContext ctx, final long jobId, final JobStatus newStatus, final LocalDateTime now) {
final Job job = getJob(ctx, jobId);
if (job.isJobInTerminalState()) {
// If the job is already terminal, no need to set a new status
return;
}
job.validateStatusTransition(newStatus);
ctx.execute(
"UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface JobErrorReportingClient {
*/
void reportJobFailureReason(@Nullable StandardWorkspace workspace,
final FailureReason reason,
final String dockerImage,
@Nullable final String dockerImage,
Map<String, String> metadata);

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ static IHub createSentryHubWithDSN(final String sentryDSN) {
@Override
public void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
@Nullable final String dockerImage,
final Map<String, String> metadata) {
final SentryEvent event = new SentryEvent();

// Remove invalid characters from the release name, use @ so sentry knows how to grab the tag
// e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0
// More info at https://docs.sentry.io/product/cli/releases/#creating-releases
final String release = dockerImage.replace("/", "-").replace(":", "@");
event.setRelease(release);

// enhance event fingerprint to ensure separate grouping per connector
final String[] releaseParts = release.split("@");
if (releaseParts.length > 0) {
event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
if (dockerImage != null) {
// Remove invalid characters from the release name, use @ so sentry knows how to grab the tag
// e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0
// More info at https://docs.sentry.io/product/cli/releases/#creating-releases
final String release = dockerImage.replace("/", "-").replace(":", "@");
event.setRelease(release);

// enhance event fingerprint to ensure separate grouping per connector
final String[] releaseParts = release.split("@");
if (releaseParts.length > 0) {
event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
}
}

// set workspace as the user in sentry to get impact and priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -713,13 +714,13 @@ void testCancelJob() throws IOException {
}

@Test
@DisplayName("Should raise an exception if job is already succeeded")
@DisplayName("Should not raise an exception if job is already succeeded")
void testCancelJobAlreadySuccessful() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
jobPersistence.succeedAttempt(jobId, attemptNumber);

assertThrows(IllegalStateException.class, () -> jobPersistence.cancelJob(jobId));
assertDoesNotThrow(() -> jobPersistence.cancelJob(jobId));

final Job updated = jobPersistence.getJob(jobId);
assertEquals(JobStatus.SUCCEEDED, updated.getStatus());
Expand Down Expand Up @@ -867,13 +868,13 @@ void failJob() throws IOException {
}

@Test
@DisplayName("Should raise an exception if job is already succeeded")
@DisplayName("Should not raise an exception if job is already succeeded")
void testFailJobAlreadySucceeded() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
jobPersistence.succeedAttempt(jobId, attemptNumber);

assertThrows(IllegalStateException.class, () -> jobPersistence.failJob(jobId));
assertDoesNotThrow(() -> jobPersistence.failJob(jobId));

final Job updated = jobPersistence.getJob(jobId);
assertEquals(JobStatus.SUCCEEDED, updated.getStatus());
Expand Down Expand Up @@ -1653,7 +1654,7 @@ void testResetJobCancelled() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();

jobPersistence.cancelJob(jobId);
assertThrows(IllegalStateException.class, () -> jobPersistence.resetJob(jobId));
assertDoesNotThrow(() -> jobPersistence.resetJob(jobId));

final Job updated = jobPersistence.getJob(jobId);
assertEquals(JobStatus.CANCELLED, updated.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ public void jobFailure(final JobFailureInput input) {
final UUID connectionId = UUID.fromString(job.getScope());
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
final JobSyncConfig jobSyncConfig = job.getConfig().getSync();
final SyncJobReportingContext jobContext =
new SyncJobReportingContext(jobId, jobSyncConfig.getSourceDockerImage(), jobSyncConfig.getDestinationDockerImage());
final String sourceDockerImage = jobSyncConfig != null ? jobSyncConfig.getSourceDockerImage() : null;
final String destinationDockerImage = jobSyncConfig != null ? jobSyncConfig.getDestinationDockerImage() : null;
final SyncJobReportingContext jobContext = new SyncJobReportingContext(jobId, sourceDockerImage, destinationDockerImage);
job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary)
.ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, jobContext));
trackCompletion(job, JobStatus.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
class JobCreationAndStatusUpdateActivityTest {

public static final String REASON = "reason";
@Mock
private SyncJobFactory mJobFactory;

Expand Down Expand Up @@ -393,10 +394,10 @@ void setJobFailure() throws IOException {
Mockito.when(mJobPersistence.getJob(JOB_ID))
.thenReturn(mJob);

jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, "reason"));
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON));

verify(mJobPersistence).failJob(JOB_ID);
verify(mJobNotifier).failJob(eq("reason"), Mockito.any());
verify(mJobNotifier).failJob(eq(REASON), Mockito.any());
verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any());
}

Expand All @@ -414,6 +415,29 @@ void setJobFailureWrapException() throws IOException {
verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_NUMBER, JobState.FAILED, exception);
}

@Test
void setJobFailureWithNullJobSyncConfig() throws IOException {
final Attempt mAttempt = Mockito.mock(Attempt.class);
Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary));

final JobConfig mJobConfig = Mockito.mock(JobConfig.class);
Mockito.when(mJobConfig.getSync()).thenReturn(null);

final Job mJob = Mockito.mock(Job.class);
Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString());
Mockito.when(mJob.getConfig()).thenReturn(mJobConfig);
Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt));

Mockito.when(mJobPersistence.getJob(JOB_ID))
.thenReturn(mJob);

jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON));

verify(mJobPersistence).failJob(JOB_ID);
verify(mJobNotifier).failJob(eq(REASON), Mockito.any());
verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any());
}

@Test
void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity
Expand Down

0 comments on commit c8a7cb3

Please sign in to comment.