[SPARK] introduce circuit breaker #2407

Merged
pawel-big-lebowski merged 3 commits into main from spark/circuit-breaker on Feb 15, 2024

Conversation

pawel-big-lebowski
Contributor

@pawel-big-lebowski pawel-big-lebowski commented Feb 1, 2024

Problem

Add a mechanism that prevents over-instrumentation by turning off the OpenLineage integration when specified criteria are met.

Closes: #1255

Solution

  • Implement a circuit breaker config section within java-client,
  • Implement a test circuit breaker, which opens/closes based on a constructor param,
  • Implement Spark / Flink integration with an integration test based on the test circuit breaker,
  • Implement a memory circuit breaker, which closes based on the amount of free memory available,
  • Implement a Java runtime circuit breaker, which closes based on memory and time spent on garbage collection.

Note: All schema changes require discussion. Please link the issue for context.

  • Your change modifies the core OpenLineage model
  • Your change modifies one or more OpenLineage facets

If you're contributing a new integration, please specify the scope of the integration and how/where it has been tested (e.g., Apache Spark integration supports S3 and GCS filesystem operations, tested with AWS EMR).

One-line summary:

Checklist

  • You've signed-off your work
  • Your pull request title follows our guidelines
  • Your changes are accompanied by tests (if relevant)
  • Your change contains a small diff and is self-contained
  • You've updated any relevant documentation (if relevant)
  • Your comment includes a one-liner for the changelog about the specific purpose of the change (if necessary)
  • You've versioned the core OpenLineage model or facets according to SchemaVer (if relevant)
  • You've added a header to source files (if relevant)

SPDX-License-Identifier: Apache-2.0
Copyright 2018-2023 contributors to the OpenLineage project

Member

@mobuchowski mobuchowski left a comment

Thanks @pawel-big-lebowski for the awesome work! I have a few comments.

More complex version of circuit breaker. The amount of free memory can be low as long as
amount of time spent on Garbage Collection is acceptable. `JavaRuntimeCircuitBreaker` closes
when free memory drops below threshold and amount of time spent on garbage collection exceeds
given threshold (`10%` by default). The circuit breaker is always open when checked for the first time
Member

In general: I don't think we should have defaults here. If the user hasn't configured a threshold, it should mean that we don't take that particular factor into account.

Contributor Author

It's based on this circuit breaker https://docs.newrelic.com/docs/apm/agents/java-agent/custom-instrumentation/circuit-breaker-java-custom-instrumentation/ which has the same default values.

I think it makes sense for people to turn something on and experiment with values later when needed.
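
For readers following this thread, the behaviour described in the documentation snippet under review can be sketched roughly as below. This is only an illustration, not the class merged in this PR: `RuntimeBreakerSketch`, its constructor parameters, and the injected suppliers are hypothetical names chosen so the logic can be exercised without a real JVM under memory pressure. "Closed" follows this PR's wording, meaning the integration is turned off.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch: closes only when free memory is low AND the share of
// time spent in garbage collection since the previous check is high.
final class RuntimeBreakerSketch {
  private final double minFreeMemoryPercent; // assumed config value
  private final double maxGcTimePercent;     // assumed config value
  private final DoubleSupplier freeMemoryPercent;
  private final LongSupplier totalGcMillis;
  private final LongSupplier clockMillis;
  private boolean checkedBefore;
  private long lastCheckMillis;
  private long lastGcMillis;

  RuntimeBreakerSketch(double minFreeMemoryPercent, double maxGcTimePercent,
      DoubleSupplier freeMemoryPercent, LongSupplier totalGcMillis,
      LongSupplier clockMillis) {
    this.minFreeMemoryPercent = minFreeMemoryPercent;
    this.maxGcTimePercent = maxGcTimePercent;
    this.freeMemoryPercent = freeMemoryPercent;
    this.totalGcMillis = totalGcMillis;
    this.clockMillis = clockMillis;
  }

  // Wires the sketch to the running JVM.
  static RuntimeBreakerSketch forCurrentJvm(double minFree, double maxGc) {
    return new RuntimeBreakerSketch(minFree, maxGc,
        RuntimeBreakerSketch::jvmFreeMemoryPercent,
        RuntimeBreakerSketch::jvmGcMillis,
        System::currentTimeMillis);
  }

  static double jvmFreeMemoryPercent() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    return 100.0 * (rt.maxMemory() - used) / rt.maxMemory();
  }

  static long jvmGcMillis() {
    long total = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
      long millis = gc.getCollectionTime(); // -1 when the collector does not report it
      if (millis > 0) {
        total += millis;
      }
    }
    return total;
  }

  synchronized boolean isClosed() {
    long now = clockMillis.getAsLong();
    long gcNow = totalGcMillis.getAsLong();
    if (!checkedBefore) {
      // No previous sample to compare against, so the first check is always open.
      checkedBefore = true;
      lastCheckMillis = now;
      lastGcMillis = gcNow;
      return false;
    }
    long elapsed = now - lastCheckMillis;
    long gcDelta = gcNow - lastGcMillis;
    lastCheckMillis = now;
    lastGcMillis = gcNow;
    double gcPercent = elapsed > 0 ? 100.0 * gcDelta / elapsed : 0.0;
    return freeMemoryPercent.getAsDouble() < minFreeMemoryPercent
        && gcPercent > maxGcTimePercent;
  }
}
```

Under these assumptions, low free memory alone never closes the breaker; both conditions have to hold in the same check.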


boolean isClosed();

default boolean isOpen() {
Member

isOpen and isClosed could provide additional information about the reason the breaker is closed or open.

I think returning some object that has both the boolean and the reason would be nice.
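
One possible shape for such a return value is sketched below. `BreakerStateSketch` and its factory methods are hypothetical names used for illustration only; nothing here is taken from the PR's code.

```java
import java.util.Optional;

// Hypothetical value object pairing the open/closed flag with a reason.
final class BreakerStateSketch {
  private final boolean closed;
  private final String reason; // null when no reason applies

  private BreakerStateSketch(boolean closed, String reason) {
    this.closed = closed;
    this.reason = reason;
  }

  // Open state: the integration keeps running, no reason needed.
  static BreakerStateSketch open() {
    return new BreakerStateSketch(false, null);
  }

  // Closed state: the integration is turned off, with an explanation.
  static BreakerStateSketch closed(String reason) {
    return new BreakerStateSketch(true, reason);
  }

  boolean isClosed() {
    return closed;
  }

  boolean isOpen() {
    return !closed;
  }

  Optional<String> getReason() {
    return Optional.ofNullable(reason);
  }
}
```

A caller could then log `state.getReason()` when skipping lineage collection instead of only seeing a bare boolean.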

import java.util.stream.Collectors;
import lombok.NonNull;

public class TestCircuitBreaker extends CommonCircuitBreaker {
Member

Why not have TestCircuitBreaker in tests?

Contributor Author

We use this for integration tests within the Spark and Flink integrations, so the class has to be part of the openlineage-java jar. I can rename it to StaticCircuitBreaker, because it closes/opens based on a provided list of boolean values such as true, false, true.
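
As a rough illustration of a breaker driven by a fixed list of booleans, consider the sketch below. The class name `StaticBreakerSketch`, the comma-separated input format, and the choice to report "open" once the list is exhausted are all assumptions made for this example, not behaviour confirmed by the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: each isClosed() call consumes the next configured value.
final class StaticBreakerSketch {
  private final Iterator<Boolean> values;

  StaticBreakerSketch(List<Boolean> valuesReturned) {
    // Copy the list so later changes by the caller do not affect the breaker.
    this.values = new ArrayList<>(valuesReturned).iterator();
  }

  // Parses input such as "true, false,true" (assumed format).
  static StaticBreakerSketch parse(String csv) {
    List<Boolean> parsed = Arrays.stream(csv.split(","))
        .map(String::trim)
        .map(Boolean::parseBoolean)
        .collect(Collectors.toList());
    return new StaticBreakerSketch(parsed);
  }

  boolean isClosed() {
    // Assumption: after the last configured value, the breaker stays open.
    return values.hasNext() && values.next();
  }
}
```

A deterministic sequence like this lets an integration test assert exactly which events are emitted and which are skipped.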

} else if (value instanceof TestCircuitBreaker) {
return CIRCUIT_BREAKER_TYPE_TEST;
}
throw new UnsupportedOperationException("Unsupported circuit breaker " + value);
Member

Why can't we define a custom CircuitBreaker, similar to Transport?

If we want to allow that later but not in this PR, then I'd document that.

Contributor Author

I don't like the ServiceLoader approach that we use for Transport. I agree that the if/else chain is a code smell. I will introduce config builders, as we have for Transport. A custom circuit breaker is outside the scope of this PR.

try {
return callable.call();
} catch (Exception e) {
log.error("OpenLineage callable failed to execute. Swallowing the exception {}", e);
Member

Swallowing the exception is not exactly a no-op. Do we really want that?

Contributor Author

This can happen when the OL integration throws an uncaught exception. That should not happen, but if it does, we should catch it. I think we want this.
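
To make the trade-off in this thread concrete, here is a minimal sketch of a guard that skips the work while the breaker is closed and swallows exceptions otherwise. `GuardedRunnerSketch`, the `BooleanSupplier` parameter, and the use of `java.util.logging` are assumptions for illustration; the PR's own code differs.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of running integration code behind a circuit breaker.
final class GuardedRunnerSketch {
  private static final Logger LOG = Logger.getLogger(GuardedRunnerSketch.class.getName());

  private GuardedRunnerSketch() {}

  static <T> Optional<T> run(BooleanSupplier isClosed, Callable<T> callable) {
    if (isClosed.getAsBoolean()) {
      // Breaker closed: the integration is turned off, so the callable is skipped.
      return Optional.empty();
    }
    try {
      return Optional.ofNullable(callable.call());
    } catch (Exception e) {
      // The failure is logged and not rethrown, so the host job keeps running.
      LOG.log(Level.SEVERE, "OpenLineage callable failed to execute. Swallowing the exception", e);
      return Optional.empty();
    }
  }
}
```

In this sketch a skipped call and a failed call both yield an empty `Optional`, which is the reviewer's point: the caller cannot tell them apart without reading the log.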

@pawel-big-lebowski pawel-big-lebowski force-pushed the spark/circuit-breaker branch 4 times, most recently from fd2200d to 1668468 Compare February 14, 2024 09:55
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
@pawel-big-lebowski pawel-big-lebowski merged commit 3dad978 into main Feb 15, 2024
30 checks passed
@pawel-big-lebowski pawel-big-lebowski deleted the spark/circuit-breaker branch February 15, 2024 10:18
algorithmy1 pushed a commit to algorithmy1/OpenLineage that referenced this pull request Feb 15, 2024
* [SPARK][FLINK] introduce circuit breaker

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* [SPARK] daemon circuit breaker

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* [SPARK] pmd fix for java client

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

---------

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Development

Successfully merging this pull request may close these issues.

[PROPOSAL] Add circuit breaker to java client
2 participants