Skip to content

Commit

Permalink
EQL: Clean any used memory by the sequence matcher and circuit breake…
Browse files Browse the repository at this point in the history
…r used bytes in case of exception (#84451) (#84803)

(cherry picked from commit 4c243d6)
  • Loading branch information
astefan committed Mar 9, 2022
1 parent 9d527c5 commit 2b56626
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 17 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/84451.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 84451
summary: Clean any used memory by the sequence matcher and circuit breaker used bytes
in case of exception
area: EQL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.ActionListener.runAfter;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits;
import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
Expand Down Expand Up @@ -122,7 +123,11 @@ public TumblingWindow(
public void execute(ActionListener<Payload> listener) {
log.trace("Starting sequence window w/ fetch size [{}]", windowSize);
startTime = System.currentTimeMillis();
tumbleWindow(0, listener);
// clear the memory at the end of the algorithm
tumbleWindow(0, runAfter(listener, () -> {
matcher.clear();
client.close(listener.delegateFailure((l, r) -> {}));
}));
}

/**
Expand Down Expand Up @@ -541,7 +546,6 @@ private void payload(ActionListener<Payload> listener) {

if (completed.isEmpty()) {
listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook()));
close(listener);
return;
}

Expand All @@ -551,16 +555,10 @@ private void payload(ActionListener<Payload> listener) {
Collections.reverse(completed);
}
SequencePayload payload = new SequencePayload(completed, listOfHits, false, timeTook());
close(listener);
return payload;
}));
}

private void close(ActionListener<Payload> listener) {
matcher.clear();
client.close(listener.delegateFailure((l, r) -> {}));
}

private TimeValue timeTook() {
return new TimeValue(System.currentTimeMillis() - startTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@

public class EqlPlugin extends Plugin implements ActionPlugin, CircuitBreakerPlugin {

private static final String CIRCUIT_BREAKER_NAME = "eql_sequence";
private static final long CIRCUIT_BREAKER_LIMIT = (long) ((0.50) * JvmInfo.jvmInfo().getMem().getHeapMax().getBytes());
private static final double CIRCUIT_BREAKER_OVERHEAD = 1.0D;
public static final String CIRCUIT_BREAKER_NAME = "eql_sequence";
public static final long CIRCUIT_BREAKER_LIMIT = (long) ((0.50) * JvmInfo.jvmInfo().getMem().getHeapMax().getBytes());
public static final double CIRCUIT_BREAKER_OVERHEAD = 1.0D;
private final SetOnce<CircuitBreaker> circuitBreaker = new SetOnce<>();

public static final Setting<Boolean> EQL_ENABLED_SETTING = Setting.boolSetting(
Expand Down

0 comments on commit 2b56626

Please sign in to comment.