Skip to content

Commit

Permalink
Only run normalization when needed (#16794)
Browse files Browse the repository at this point in the history
Only run normalization when records have been committed
  • Loading branch information
alovew committed Sep 21, 2022
1 parent bded0ac commit f28d5f3
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.AttemptStatus;
import io.airbyte.persistence.job.models.AttemptWithJobInfo;
import io.airbyte.persistence.job.models.Job;
Expand Down Expand Up @@ -625,6 +626,18 @@ public List<AttemptWithJobInfo> listAttemptsWithJobInfo(final ConfigType configT
timeConvertedIntoLocalDateTime)));
}

@Override
public List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException {
return jobDatabase
.query(ctx -> ctx.select(ATTEMPTS.ATTEMPT_NUMBER, SYNC_STATS.RECORDS_COMMITTED, NORMALIZATION_SUMMARIES.FAILURES)
.from(ATTEMPTS)
.join(SYNC_STATS).on(SYNC_STATS.ATTEMPT_ID.eq(ATTEMPTS.ID))
.leftJoin(NORMALIZATION_SUMMARIES).on(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(ATTEMPTS.ID))
.where(ATTEMPTS.JOB_ID.eq(jobId))
.fetch(record -> new AttemptNormalizationStatus(record.get(ATTEMPTS.ATTEMPT_NUMBER),
Optional.of(record.get(SYNC_STATS.RECORDS_COMMITTED)), record.get(NORMALIZATION_SUMMARIES.FAILURES) != null)));
}

// Retrieves only Job information from the record, without any attempt info
private static Job getJobFromRecord(final Record record) {
return new Job(record.get(JOB_ID, Long.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.AttemptWithJobInfo;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
Expand Down Expand Up @@ -294,4 +295,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setSchedulerMigrationDone() throws IOException;

List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.persistence.job.models;

import java.util.Optional;

public record AttemptNormalizationStatus(long attemptNumber, Optional<Long> recordsCommitted, boolean normalizationFailed) {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public enum JobStatus {
RUNNING, Set.of(INCOMPLETE, SUCCEEDED, FAILED, CANCELLED),
INCOMPLETE, Set.of(PENDING, RUNNING, FAILED, CANCELLED, INCOMPLETE),
SUCCEEDED, Set.of(),
FAILED, Set.of(),
FAILED, Set.of(FAILED),
CANCELLED, Set.of());

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.workers.temporal.spec.SpecActivity;
import io.airbyte.workers.temporal.sync.DbtTransformationActivity;
import io.airbyte.workers.temporal.sync.NormalizationActivity;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity;
import io.airbyte.workers.temporal.sync.PersistStateActivity;
import io.airbyte.workers.temporal.sync.ReplicationActivity;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -101,8 +102,9 @@ public List<Object> syncActivities(
final ReplicationActivity replicationActivity,
final NormalizationActivity normalizationActivity,
final DbtTransformationActivity dbtTransformationActivity,
final PersistStateActivity persistStateActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.util.Optional;

@ActivityInterface
public interface NormalizationSummaryCheckActivity {

@ActivityMethod
boolean shouldRunNormalization(Long jobId, Long attemptId, Optional<Long> numCommittedRecords) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class NormalizationSummaryCheckActivityImpl implements NormalizationSummaryCheckActivity {

private final Optional<JobPersistence> jobPersistence;

public NormalizationSummaryCheckActivityImpl(final Optional<JobPersistence> jobPersistence) {
this.jobPersistence = jobPersistence;
}

@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) throws IOException {
// if job persistence is unavailable, default to running normalization
if (jobPersistence.isEmpty()) {
return true;
}

// if the count of committed records for this attempt is > 0 OR if it is null,
// then we should run normalization
if (numCommittedRecords.get() == null || numCommittedRecords.get() > 0) {
return true;
}

final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.get().getAttemptNormalizationStatusesForJob(jobId);
final AtomicLong totalRecordsCommitted = new AtomicLong(0L);
final AtomicBoolean shouldReturnTrue = new AtomicBoolean(false);

attemptNormalizationStatuses.stream().sorted(Comparator.comparing(AttemptNormalizationStatus::attemptNumber).reversed()).toList()
.forEach(n -> {
if (n.attemptNumber() == attemptNumber) {
return;
}

// if normalization succeeded from a previous attempt succeeded,
// we can stop looking for previous attempts
if (!n.normalizationFailed()) {
return;
}

// if normalization failed on past attempt, add number of records committed on that attempt to total
// committed number
// if there is no data recorded for the number of committed records, we should assume that there
// were committed records and run normalization
if (n.recordsCommitted().isEmpty()) {
shouldReturnTrue.set(true);
return;
} else if (n.recordsCommitted().get() != 0L) {
totalRecordsCommitted.addAndGet(n.recordsCommitted().get());
}
});

if (shouldReturnTrue.get() || totalRecordsCommitted.get() > 0L) {
return true;
}

return false;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import javax.inject.Singleton;
import org.slf4j.Logger;
Expand All @@ -28,6 +30,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncWorkflowImpl.class);
private static final String VERSION_LABEL = "sync-workflow";
private static final int CURRENT_VERSION = 2;
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -37,6 +41,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private DbtTransformationActivity dbtTransformationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private PersistStateActivity persistActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;

@Override
public StandardSyncOutput run(final JobRunConfig jobRunConfig,
Expand All @@ -59,6 +65,23 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) {
for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) {
if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) {
final int normalizationSummaryCheckVersion =
Workflow.getVersion(NORMALIZATION_SUMMARY_CHECK_TAG, Workflow.DEFAULT_VERSION, NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION);
if (normalizationSummaryCheckVersion >= NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION) {
Boolean shouldRun;
try {
shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(),
Optional.of(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted()));
} catch (final IOException e) {
shouldRun = true;
}
if (!shouldRun) {
LOGGER.info("Skipping normalization because there are no records to normalize.");
break;
}
}

LOGGER.info("generating normalization input");
final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput);
final NormalizationSummary normalizationSummary =
normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.Mockito.mock;

import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivityImpl;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@Slf4j
@ExtendWith(MockitoExtension.class)
class NormalizationSummaryCheckActivityTest {

private static final Long JOB_ID = 10L;
static private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity;
static private JobPersistence mJobPersistence;

@BeforeAll
static void setUp() {
mJobPersistence = mock(JobPersistence.class);
normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(Optional.of(mJobPersistence));
}

@Test
void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws IOException {
// Attempt 1 committed records, but normalization failed
// Attempt 2 did not commit records, normalization failed (or did not run)
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), true);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));

Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

@Test
void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws IOException {
Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(30L)));
}

@Test
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws IOException {
// No attempts committed any records
// Normalization did not run on any attempts
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(0L), true);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

@Test
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws IOException {
// Records committed on first two attempts and normalization succeeded
// No records committed on current attempt and normalization has not yet run
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), false);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(20L), false);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

}

0 comments on commit f28d5f3

Please sign in to comment.