Skip to content

Commit

Permalink
accept null value when evaluating whether to skip normalization (#17537)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Oct 3, 2022
1 parent 66dafd8 commit 65cd7e8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
Boolean shouldRun;
try {
shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(),
Optional.of(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted()));
Optional.ofNullable(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted()));
} catch (final IOException e) {
shouldRun = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand Down Expand Up @@ -89,7 +90,9 @@ class SyncWorkflowTest {
private NormalizationInput normalizationInput;
private OperatorDbtInput operatorDbtInput;
private StandardSyncOutput replicationSuccessOutput;
private StandardSyncOutput replicationFailOutput;
private StandardSyncSummary standardSyncSummary;
private StandardSyncSummary failedSyncSummary;
private SyncStats syncStats;
private NormalizationSummary normalizationSummary;
private ActivityOptions longActivityOptions;
Expand All @@ -108,7 +111,10 @@ void setUp() throws IOException {

syncStats = new SyncStats().withRecordsCommitted(10L);
standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats);
failedSyncSummary = new StandardSyncSummary().withStatus(ReplicationStatus.FAILED).withTotalStats(new SyncStats().withRecordsEmitted(0L));
replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(standardSyncSummary);
replicationFailOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(failedSyncSummary);

normalizationSummary = new NormalizationSummary();

normalizationInput = new NormalizationInput()
Expand Down Expand Up @@ -217,6 +223,30 @@ void testReplicationFailure() {
verifyNoInteractions(dbtTransformationActivity);
}

@Test
void testReplicationFailedGracefully() {
doReturn(replicationFailOutput).when(replicationActivity).replicate(
JOB_RUN_CONFIG,
SOURCE_LAUNCHER_CONFIG,
DESTINATION_LAUNCHER_CONFIG,
syncInput);

doReturn(normalizationSummary).when(normalizationActivity).normalize(
JOB_RUN_CONFIG,
DESTINATION_LAUNCHER_CONFIG,
normalizationInput);

final StandardSyncOutput actualOutput = execute();

verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
assertEquals(replicationFailOutput.withNormalizationSummary(normalizationSummary),
actualOutput);
}

@Test
void testNormalizationFailure() {
doReturn(replicationSuccessOutput).when(replicationActivity).replicate(
Expand Down

0 comments on commit 65cd7e8

Please sign in to comment.