Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only run normalization when needed #16794

Merged
merged 6 commits into from
Sep 21, 2022

Conversation

alovew
Copy link
Contributor

@alovew alovew commented Sep 15, 2022

Resolves #9672

Only run normalization if there are records that have been committed that have not yet been normalized

@github-actions github-actions bot added area/platform issues related to the platform area/worker Related to worker labels Sep 15, 2022
@alovew alovew temporarily deployed to more-secrets September 15, 2022 23:56 Inactive
@alovew alovew temporarily deployed to more-secrets September 19, 2022 17:59 Inactive
@alovew alovew temporarily deployed to more-secrets September 19, 2022 21:56 Inactive
@alovew alovew force-pushed the anne/check-before-running-normalization branch from 9a6df23 to c415820 Compare September 20, 2022 17:51
@alovew alovew temporarily deployed to more-secrets September 20, 2022 17:53 Inactive
@alovew alovew temporarily deployed to more-secrets September 20, 2022 21:34 Inactive
@alovew alovew marked this pull request as ready for review September 20, 2022 22:48
@alovew alovew temporarily deployed to more-secrets September 20, 2022 22:50 Inactive
logging

shouldRunNormalization method with tests

update

fix syncworkflowtests

add activity version

tests
@alovew alovew force-pushed the anne/check-before-running-normalization branch from ddedce3 to 3976351 Compare September 20, 2022 23:10
@alovew alovew temporarily deployed to more-secrets September 20, 2022 23:12 Inactive
@alovew alovew temporarily deployed to more-secrets September 20, 2022 23:16 Inactive
@alovew alovew temporarily deployed to more-secrets September 21, 2022 01:03 Inactive
Copy link
Member

@colesnodgrass colesnodgrass left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely need to change the Optional.get() == null check, I believe everything else is a recommendation.

Comment on lines 29 to 33
if (origin != null) {
return origin.value();
} else {
return null;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this method is preexisting, but it feels unnecessary. The FailureOrigin is provided by the caller and this simply calls a method on the provided class?

Additionally this method is only called once:

for (final FailureReason reason : failureSummary.getFailures()) {
  MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
    new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
}

Seems that we could move this functionality to this^ spot instead of having a dependency on MetricTags. Unless there is something I am missing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was from when I was testing and running into odd errors - I'm not sure this actually ever happens and I can remove this since it's unrelated anyway!


import java.util.Optional;

public class AttemptNormalizationStatus {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be replaced with a record class?
Should normalizationFailed be a boolean stead, forcing a true/false to be provided?

Comment on lines 26 to 27
@Inject
private JobPersistence jobPersistence;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per both my preference and Micronaut's preference

Field injection makes it harder to understand a class’s requirements, making it easier to get a NullPointerException when testing a class using Field Injection. We recommend you use Constructor Injection.

could we change this to be constructor injected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the @AllArgsConstructor will create a constructor that wins over the field injection. This is probably working because we don't have any qualifiers or configuration properties here. However, per @colesnodgrass's request, we should use explicit constructors here and avoid Lombok/field injection to make the code easier to read.


@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public Boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return boolean instead? It appears we only return true/false and never null.

// if normalization failed on past attempt,
// add number of records committed on that attempt to
// total committed number
if (n.getNormalizationFailed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If getNormalizationFailed returned a boolean instead of a Boolean, this if statement could be removed, as the above if statement already verified it was false.

}

final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.getAttemptNormalizationStatusesForJob(jobId);
final AtomicReference<Long> totalRecordsCommitted = new AtomicReference<>(0L);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be an AtomicLong


final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.getAttemptNormalizationStatusesForJob(jobId);
final AtomicReference<Long> totalRecordsCommitted = new AtomicReference<>(0L);
final AtomicReference<Boolean> shouldReturnTrue = new AtomicReference<>(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be an AtomicBoolean

Comment on lines 60 to 65
if (n.getRecordsCommitted().get() == null) {
shouldReturnTrue.set(true);
return;
} else if (n.getRecordsCommitted().get() != 0L) {
totalRecordsCommitted.set(totalRecordsCommitted.get() + n.getRecordsCommitted().get());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling Optional.get() == null doesn't work as Optional.get() throws an NPE if the optional is empty.

Maybe replace this with Optional.ifPresentOrElse?

n.getRecordsComitted.ifPresentOrElse(
  // can use addAndGet if using an AtomicLong
  numRecords -> totalRecordsComitted.addAndGet(numRecords),
  () -> shouldReturnTrue.set(true)
);

@alovew alovew temporarily deployed to more-secrets September 21, 2022 19:32 Inactive
@alovew alovew temporarily deployed to more-secrets September 21, 2022 20:36 Inactive
// were committed records and run normalization
if (n.recordsCommitted().isEmpty()) {
shouldReturnTrue.set(true);
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this return is not needed here as pointed out by my IDE

return is unnecessary as the last statement in a void method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the return is to beak out of the iteration block instead of returning a value from the method overall

shouldReturnTrue.set(true);
return;
} else if (n.recordsCommitted().get() != 0L) {
totalRecordsCommitted.set(totalRecordsCommitted.get() + n.recordsCommitted().get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.set should be replaced with .addAndGet(n.recordsCommitted().get()) as this will avoids the totalRecordsCommitted.get() call

Copy link
Member

@colesnodgrass colesnodgrass left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one recommended change from the set call to addAndGet, otherwise 👍🏻

@alovew alovew temporarily deployed to more-secrets September 21, 2022 21:21 Inactive

private final Optional<JobPersistence> jobPersistence;

public NormalizationSummaryCheckActivityImpl(final Optional<JobPersistence> jobPersistence) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for the optional JobPersistence?
If it's related to the data plane worker, feels like we may want to have a follow up to be able to use API instead of direct DB access here, otherwise multi-cloud will run into this issue again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's exactly the reason - i agree, didn't realize while I was writing this but changing this to use the API instead later makes sense. It's not hugely crucial in this case since all that will happen is that normalization will run when it doesn't need to

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end, this PR is still a step forward. I think this also highlights the fact that we need to focus on removing direct DB access from the workers.

cc @jdpgrailsdev in case you were keeping track of the many reasons we should be doing this ;)

@alovew alovew merged commit f28d5f3 into master Sep 21, 2022
@alovew alovew deleted the anne/check-before-running-normalization branch September 21, 2022 23:50
Copy link
Contributor

@gosusnp gosusnp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just want to double check on how we skip normalization, it might be a bit aggressive but I am not that familiar with the following steps.


// if the count of committed records for this attempt is > 0 OR if it is null,
// then we should run normalization
if (numCommittedRecords.get() == null || numCommittedRecords.get() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the null check should be numCommittedRecords.isEmpty() or .isPresent()

}
if (!shouldRun) {
LOGGER.info("Skipping normalization because there are no records to normalize.");
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we want to break in this case? I think there may be other steps after normalization, like dbt for example.

robbinhan pushed a commit to robbinhan/airbyte that referenced this pull request Sep 29, 2022
Only run normalization when records have been committed
jhammarstedt pushed a commit to jhammarstedt/airbyte that referenced this pull request Oct 31, 2022
Only run normalization when records have been committed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/platform issues related to the platform area/worker Related to worker
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Do not run normalization if no records are committed or pending records from a previous partial sync
4 participants