From f28d5f304fa2232b3839b295f5f11bbf40881ad6 Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Wed, 21 Sep 2022 16:50:27 -0700 Subject: [PATCH] Only run normalization when needed (#16794) Only run normalization when records have been committed --- .../job/DefaultJobPersistence.java | 13 ++++ .../persistence/job/JobPersistence.java | 3 + .../models/AttemptNormalizationStatus.java | 9 +++ .../persistence/job/models/JobStatus.java | 2 +- .../workers/config/ActivityBeanFactory.java | 6 +- .../NormalizationSummaryCheckActivity.java | 18 +++++ ...NormalizationSummaryCheckActivityImpl.java | 78 +++++++++++++++++++ .../temporal/sync/SyncWorkflowImpl.java | 23 ++++++ ...NormalizationSummaryCheckActivityTest.java | 73 +++++++++++++++++ .../temporal/sync/SyncWorkflowTest.java | 49 ++++++++++-- 10 files changed, 265 insertions(+), 9 deletions(-) create mode 100644 airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java create mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index b3011aeddeba5..810bff38b7784 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -36,6 +36,7 @@ import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.persistence.job.models.Attempt; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; import io.airbyte.persistence.job.models.Job; @@ -625,6 +626,18 @@ public List listAttemptsWithJobInfo(final ConfigType configT timeConvertedIntoLocalDateTime))); } + @Override + public List getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException { + return jobDatabase + .query(ctx -> ctx.select(ATTEMPTS.ATTEMPT_NUMBER, SYNC_STATS.RECORDS_COMMITTED, NORMALIZATION_SUMMARIES.FAILURES) + .from(ATTEMPTS) + .join(SYNC_STATS).on(SYNC_STATS.ATTEMPT_ID.eq(ATTEMPTS.ID)) + .leftJoin(NORMALIZATION_SUMMARIES).on(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(ATTEMPTS.ID)) + .where(ATTEMPTS.JOB_ID.eq(jobId)) + .fetch(record -> new AttemptNormalizationStatus(record.get(ATTEMPTS.ATTEMPT_NUMBER), + Optional.of(record.get(SYNC_STATS.RECORDS_COMMITTED)), record.get(NORMALIZATION_SUMMARIES.FAILURES) != null))); + } + // Retrieves only Job information from the record, without any attempt info private static Job getJobFromRecord(final Record record) { return new Job(record.get(JOB_ID, Long.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index ee722722d40cc..fea5a9c036578 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -11,6 +11,7 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.SyncStats; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; @@ -294,4 +295,6 @@ List listJobStatusAndTimestampWithConnection(UUID con */ void setSchedulerMigrationDone() throws IOException; + List getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException; + } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java new file mode 100644 index 0000000000000..9575bb8e99687 --- /dev/null +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.persistence.job.models; + +import java.util.Optional; + +public record AttemptNormalizationStatus(long attemptNumber, Optional recordsCommitted, boolean normalizationFailed) {} diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/JobStatus.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/JobStatus.java index 13089c8335180..421f93f82385a 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/JobStatus.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/JobStatus.java @@ -25,7 +25,7 @@ public enum JobStatus { RUNNING, Set.of(INCOMPLETE, SUCCEEDED, FAILED, CANCELLED), INCOMPLETE, Set.of(PENDING, RUNNING, FAILED, CANCELLED, INCOMPLETE), SUCCEEDED, Set.of(), - FAILED, Set.of(), + FAILED, Set.of(FAILED), CANCELLED, Set.of()); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 52e49829087a1..a4fbbf66ef50c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -20,6 +20,7 @@ import io.airbyte.workers.temporal.spec.SpecActivity; import io.airbyte.workers.temporal.sync.DbtTransformationActivity; import io.airbyte.workers.temporal.sync.NormalizationActivity; +import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity; import io.airbyte.workers.temporal.sync.PersistStateActivity; import io.airbyte.workers.temporal.sync.ReplicationActivity; import io.micronaut.context.annotation.Factory; @@ -101,8 +102,9 @@ public List syncActivities( final ReplicationActivity replicationActivity, final NormalizationActivity normalizationActivity, final DbtTransformationActivity dbtTransformationActivity, - final PersistStateActivity persistStateActivity) { - return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); + final PersistStateActivity persistStateActivity, + final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity) { + return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java new file mode 100644 index 0000000000000..f4948483c1878 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.sync; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import java.io.IOException; +import java.util.Optional; + +@ActivityInterface +public interface NormalizationSummaryCheckActivity { + + @ActivityMethod + boolean shouldRunNormalization(Long jobId, Long attemptId, Optional numCommittedRecords) throws IOException; + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java new file mode 100644 index 0000000000000..6d16942f992c1 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.sync; + +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.inject.Singleton; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Singleton +public class NormalizationSummaryCheckActivityImpl implements NormalizationSummaryCheckActivity { + + private final Optional jobPersistence; + + public NormalizationSummaryCheckActivityImpl(final Optional jobPersistence) { + this.jobPersistence = jobPersistence; + } + + @Override + @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") + public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional numCommittedRecords) throws IOException { + // if job persistence is unavailable, default to running normalization + if (jobPersistence.isEmpty()) { + return true; + } + + // if the count of committed records for this attempt is > 0 OR if it is null, + // then we should run normalization + if (numCommittedRecords.get() == null || numCommittedRecords.get() > 0) { + return true; + } + + final List attemptNormalizationStatuses = jobPersistence.get().getAttemptNormalizationStatusesForJob(jobId); + final AtomicLong totalRecordsCommitted = new AtomicLong(0L); + final AtomicBoolean shouldReturnTrue = new AtomicBoolean(false); + + attemptNormalizationStatuses.stream().sorted(Comparator.comparing(AttemptNormalizationStatus::attemptNumber).reversed()).toList() + .forEach(n -> { + if (n.attemptNumber() == attemptNumber) { + return; + } + + // if normalization succeeded from a previous attempt succeeded, + // we can stop looking for previous attempts + if (!n.normalizationFailed()) { + return; + } + + // if normalization failed on past attempt, add number of records committed on that attempt to total + // committed number + // if there is no data recorded for the number of committed records, we should assume that there + // were committed records and run normalization + if (n.recordsCommitted().isEmpty()) { + shouldReturnTrue.set(true); + return; + } else if (n.recordsCommitted().get() != 0L) { + totalRecordsCommitted.addAndGet(n.recordsCommitted().get()); + } + }); + + if (shouldReturnTrue.get() || totalRecordsCommitted.get() > 0L) { + return true; + } + + return false; + + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index ae303e7b84198..9a21f0f0ae11f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -16,6 +16,8 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.temporal.workflow.Workflow; +import java.io.IOException; +import java.util.Optional; import java.util.UUID; import javax.inject.Singleton; import org.slf4j.Logger; @@ -28,6 +30,8 @@ public class SyncWorkflowImpl implements SyncWorkflow { private static final Logger LOGGER = LoggerFactory.getLogger(SyncWorkflowImpl.class); private static final String VERSION_LABEL = "sync-workflow"; private static final int CURRENT_VERSION = 2; + private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check"; + private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1; @TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions") private ReplicationActivity replicationActivity; @@ -37,6 +41,8 @@ public class SyncWorkflowImpl implements SyncWorkflow { private DbtTransformationActivity dbtTransformationActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private PersistStateActivity persistActivity; + @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig, @@ -59,6 +65,23 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) { if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) { + final int normalizationSummaryCheckVersion = + Workflow.getVersion(NORMALIZATION_SUMMARY_CHECK_TAG, Workflow.DEFAULT_VERSION, NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION); + if (normalizationSummaryCheckVersion >= NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION) { + Boolean shouldRun; + try { + shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(), + Optional.of(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted())); + } catch (final IOException e) { + shouldRun = true; + } + if (!shouldRun) { + LOGGER.info("Skipping normalization because there are no records to normalize."); + break; + } + } + + LOGGER.info("generating normalization input"); final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput); final NormalizationSummary normalizationSummary = normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java new file mode 100644 index 0000000000000..4ddd6039db9a8 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.scheduling.activities; + +import static org.mockito.Mockito.mock; + +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; +import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivityImpl; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@Slf4j +@ExtendWith(MockitoExtension.class) +class NormalizationSummaryCheckActivityTest { + + private static final Long JOB_ID = 10L; + static private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity; + static private JobPersistence mJobPersistence; + + @BeforeAll + static void setUp() { + mJobPersistence = mock(JobPersistence.class); + normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(Optional.of(mJobPersistence)); + } + + @Test + void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws IOException { + // Attempt 1 committed records, but normalization failed + // Attempt 2 did not commit records, normalization failed (or did not run) + final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), true); + final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true); + Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + + Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); + } + + @Test + void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws IOException { + Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(30L))); + } + + @Test + void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws IOException { + // No attempts committed any records + // Normalization did not run on any attempts + final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(0L), true); + final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true); + Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); + } + + @Test + void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws IOException { + // Records committed on first two attempts and normalization succeeded + // No records committed on current attempt and normalization has not yet run + final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), false); + final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(20L), false); + Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index fbb4419c43b08..68142b918bbba 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.sync; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -22,6 +22,8 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.SyncStats; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -41,6 +43,7 @@ import io.temporal.common.RetryOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.Worker; +import java.io.IOException; import java.time.Duration; import java.util.List; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -48,6 +51,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +@SuppressWarnings({"PMD.UnusedPrivateField", "PMD.UnusedPrivateMethod"}) class SyncWorkflowTest { // TEMPORAL @@ -59,6 +63,7 @@ class SyncWorkflowTest { private NormalizationActivityImpl normalizationActivity; private DbtTransformationActivityImpl dbtTransformationActivity; private PersistStateActivityImpl persistStateActivity; + private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity; private static final String SYNC_TASK_QUEUE = "SYNC_TASK_QUEUE"; @@ -84,13 +89,15 @@ class SyncWorkflowTest { private NormalizationInput normalizationInput; private OperatorDbtInput operatorDbtInput; private StandardSyncOutput replicationSuccessOutput; + private StandardSyncSummary standardSyncSummary; + private SyncStats syncStats; private NormalizationSummary normalizationSummary; private ActivityOptions longActivityOptions; private ActivityOptions shortActivityOptions; private TemporalProxyHelper temporalProxyHelper; @BeforeEach - void setUp() { + void setUp() throws IOException { testEnv = TestWorkflowEnvironment.newInstance(); syncWorker = testEnv.newWorker(SYNC_TASK_QUEUE); client = testEnv.getWorkflowClient(); @@ -98,7 +105,10 @@ void setUp() { final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(); sync = syncPair.getKey(); syncInput = syncPair.getValue(); - replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()); + + syncStats = new SyncStats().withRecordsCommitted(10L); + standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); + replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(standardSyncSummary); normalizationSummary = new NormalizationSummary(); normalizationInput = new NormalizationInput() @@ -114,7 +124,10 @@ void setUp() { normalizationActivity = mock(NormalizationActivityImpl.class); dbtTransformationActivity = mock(DbtTransformationActivityImpl.class); persistStateActivity = mock(PersistStateActivityImpl.class); + normalizationSummaryCheckActivity = mock(NormalizationSummaryCheckActivityImpl.class); + when(normalizationActivity.generateNormalizationInput(any(), any())).thenReturn(normalizationInput); + when(normalizationSummaryCheckActivity.shouldRunNormalization(any(), any(), any())).thenReturn(true); longActivityOptions = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(3)) @@ -156,7 +169,7 @@ public void tearDown() { // bundle up all the temporal worker setup / execution into one method. private StandardSyncOutput execute() { syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, - persistStateActivity); + persistStateActivity, normalizationSummaryCheckActivity); testEnv.start(); final SyncWorkflow workflow = client.newWorkflowStub(SyncWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(SYNC_TASK_QUEUE).build()); @@ -182,8 +195,10 @@ void testSuccess() { verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); - verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - assertEquals(replicationSuccessOutput.withNormalizationSummary(normalizationSummary), actualOutput); + verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), + operatorDbtInput); + assertEquals(replicationSuccessOutput.withNormalizationSummary(normalizationSummary), + actualOutput); } @Test @@ -266,6 +281,28 @@ void testCancelDuringNormalization() { verifyNoInteractions(dbtTransformationActivity); } + @Test + void testSkipNormalization() throws IOException { + final SyncStats syncStats = new SyncStats().withRecordsCommitted(0L); + final StandardSyncSummary standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); + final StandardSyncOutput replicationSuccessOutputNoRecordsCommitted = + new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(standardSyncSummary); + when(normalizationSummaryCheckActivity.shouldRunNormalization(any(), any(), any())).thenReturn(false); + + doReturn(replicationSuccessOutputNoRecordsCommitted).when(replicationActivity).replicate( + JOB_RUN_CONFIG, + SOURCE_LAUNCHER_CONFIG, + DESTINATION_LAUNCHER_CONFIG, + syncInput); + + execute(); + + verifyReplication(replicationActivity, syncInput); + verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog()); + verifyNoInteractions(normalizationActivity); + verifyNoInteractions(dbtTransformationActivity); + } + @SuppressWarnings("ResultOfMethodCallIgnored") private void cancelWorkflow() { final WorkflowServiceBlockingStub temporalService = testEnv.getWorkflowService().blockingStub();