diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 2f31c3020b218..8676cd8c94c8c 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -222,6 +222,10 @@ public void failJob(final long jobId) throws IOException { private void updateJobStatus(final DSLContext ctx, final long jobId, final JobStatus newStatus, final LocalDateTime now) { final Job job = getJob(ctx, jobId); + if (job.isJobInTerminalState()) { + // If the job is already terminal, no need to set a new status + return; + } job.validateStatusTransition(newStatus); ctx.execute( "UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?", diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReportingClient.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReportingClient.java index d6bc07ed3b790..7ebeb02113b9b 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReportingClient.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReportingClient.java @@ -19,7 +19,7 @@ public interface JobErrorReportingClient { */ void reportJobFailureReason(@Nullable StandardWorkspace workspace, final FailureReason reason, - final String dockerImage, + @Nullable final String dockerImage, Map metadata); } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/SentryJobErrorReportingClient.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/SentryJobErrorReportingClient.java index ce4b503e0f595..f3750c97d9047 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/SentryJobErrorReportingClient.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/SentryJobErrorReportingClient.java @@ -61,20 +61,22 @@ static IHub createSentryHubWithDSN(final String sentryDSN) { @Override public void reportJobFailureReason(@Nullable final StandardWorkspace workspace, final FailureReason failureReason, - final String dockerImage, + @Nullable final String dockerImage, final Map metadata) { final SentryEvent event = new SentryEvent(); - // Remove invalid characters from the release name, use @ so sentry knows how to grab the tag - // e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0 - // More info at https://docs.sentry.io/product/cli/releases/#creating-releases - final String release = dockerImage.replace("/", "-").replace(":", "@"); - event.setRelease(release); - - // enhance event fingerprint to ensure separate grouping per connector - final String[] releaseParts = release.split("@"); - if (releaseParts.length > 0) { - event.setFingerprints(List.of("{{ default }}", releaseParts[0])); + if (dockerImage != null) { + // Remove invalid characters from the release name, use @ so sentry knows how to grab the tag + // e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0 + // More info at https://docs.sentry.io/product/cli/releases/#creating-releases + final String release = dockerImage.replace("/", "-").replace(":", "@"); + event.setRelease(release); + + // enhance event fingerprint to ensure separate grouping per connector + final String[] releaseParts = release.split("@"); + if (releaseParts.length > 0) { + event.setFingerprints(List.of("{{ default }}", releaseParts[0])); + } } // set workspace as the user in sentry to get impact and priority diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index b41c691fbbc80..633bffb9571eb 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -8,6 +8,7 @@ import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -713,13 +714,13 @@ void testCancelJob() throws IOException { } @Test - @DisplayName("Should raise an exception if job is already succeeded") + @DisplayName("Should not raise an exception if job is already succeeded") void testCancelJobAlreadySuccessful() throws IOException { final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); jobPersistence.succeedAttempt(jobId, attemptNumber); - assertThrows(IllegalStateException.class, () -> jobPersistence.cancelJob(jobId)); + assertDoesNotThrow(() -> jobPersistence.cancelJob(jobId)); final Job updated = jobPersistence.getJob(jobId); assertEquals(JobStatus.SUCCEEDED, updated.getStatus()); @@ -867,13 +868,13 @@ void failJob() throws IOException { } @Test - @DisplayName("Should raise an exception if job is already succeeded") + @DisplayName("Should not raise an exception if job is already succeeded") void testFailJobAlreadySucceeded() throws IOException { final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); jobPersistence.succeedAttempt(jobId, attemptNumber); - assertThrows(IllegalStateException.class, () -> jobPersistence.failJob(jobId)); + assertDoesNotThrow(() -> jobPersistence.failJob(jobId)); final Job updated = jobPersistence.getJob(jobId); assertEquals(JobStatus.SUCCEEDED, updated.getStatus()); @@ -1653,7 +1654,7 @@ void testResetJobCancelled() throws IOException { final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); jobPersistence.cancelJob(jobId); - assertThrows(IllegalStateException.class, () -> jobPersistence.resetJob(jobId)); + assertDoesNotThrow(() -> jobPersistence.resetJob(jobId)); final Job updated = jobPersistence.getJob(jobId); assertEquals(JobStatus.CANCELLED, updated.getStatus()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 2582eee235a5c..34026e1c864d9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -269,8 +269,9 @@ public void jobFailure(final JobFailureInput input) { final UUID connectionId = UUID.fromString(job.getScope()); ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId)); final JobSyncConfig jobSyncConfig = job.getConfig().getSync(); - final SyncJobReportingContext jobContext = - new SyncJobReportingContext(jobId, jobSyncConfig.getSourceDockerImage(), jobSyncConfig.getDestinationDockerImage()); + final String sourceDockerImage = jobSyncConfig != null ? jobSyncConfig.getSourceDockerImage() : null; + final String destinationDockerImage = jobSyncConfig != null ? jobSyncConfig.getDestinationDockerImage() : null; + final SyncJobReportingContext jobContext = new SyncJobReportingContext(jobId, sourceDockerImage, destinationDockerImage); job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary) .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, jobContext)); trackCompletion(job, JobStatus.FAILED); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index ba5a41d08e58a..a4752364d4097 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -82,6 +82,7 @@ @SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") class JobCreationAndStatusUpdateActivityTest { + public static final String REASON = "reason"; @Mock private SyncJobFactory mJobFactory; @@ -393,10 +394,10 @@ void setJobFailure() throws IOException { Mockito.when(mJobPersistence.getJob(JOB_ID)) .thenReturn(mJob); - jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, "reason")); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON)); verify(mJobPersistence).failJob(JOB_ID); - verify(mJobNotifier).failJob(eq("reason"), Mockito.any()); + verify(mJobNotifier).failJob(eq(REASON), Mockito.any()); verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any()); } @@ -414,6 +415,29 @@ void setJobFailureWrapException() throws IOException { verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_NUMBER, JobState.FAILED, exception); } + @Test + void setJobFailureWithNullJobSyncConfig() throws IOException { + final Attempt mAttempt = Mockito.mock(Attempt.class); + Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary)); + + final JobConfig mJobConfig = Mockito.mock(JobConfig.class); + Mockito.when(mJobConfig.getSync()).thenReturn(null); + + final Job mJob = Mockito.mock(Job.class); + Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString()); + Mockito.when(mJob.getConfig()).thenReturn(mJobConfig); + Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt)); + + Mockito.when(mJobPersistence.getJob(JOB_ID)) + .thenReturn(mJob); + + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON)); + + verify(mJobPersistence).failJob(JOB_ID); + verify(mJobNotifier).failJob(eq(REASON), Mockito.any()); + verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any()); + } + @Test void setAttemptFailure() throws IOException { jobCreationAndStatusUpdateActivity