Skip to content

Commit

Permalink
EQL: Introduce support for sequence maxspan (#58635)
Browse files Browse the repository at this point in the history
EQL sequences can specify now a maximum time allowed for their span
(computed between the first and the last matching event).

(cherry picked from commit 747c359)
  • Loading branch information
costin committed Jun 29, 2020
1 parent 773f357 commit 3a546f1
Show file tree
Hide file tree
Showing 22 changed files with 286 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;

public class DataLoader {

private static final String TEST_DATA = "/test_data.json";
Expand Down Expand Up @@ -64,7 +67,10 @@ protected static void loadDatasetIntoEs(RestHighLevelClient client,
try (XContentParser parser = p.apply(JsonXContent.jsonXContent, DataLoader.class.getResourceAsStream(TEST_DATA))) {
List<Object> list = parser.list();
for (Object item : list) {
bulk.add(new IndexRequest(testIndexName).source((Map<String, Object>) item, XContentType.JSON));
assertThat(item, instanceOf(Map.class));
Map<String, Object> entry = (Map<String, Object>) item;
transformDataset(entry);
bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON));
}
}

Expand All @@ -78,6 +84,23 @@ protected static void loadDatasetIntoEs(RestHighLevelClient client,
}
}

private static void transformDataset(Map<String, Object> entry) {
Object object = entry.get("timestamp");
assertThat(object, instanceOf(Long.class));
Long ts = (Long) object;
// currently this is windows filetime
entry.put("@timestamp", winFileTimeToUnix(ts));
}


private static final long FILETIME_EPOCH_DIFF = 11644473600000L;
private static final long FILETIME_ONE_MILLISECOND = 10 * 1000;

public static long winFileTimeToUnix(final long filetime) {
long ts = (filetime / FILETIME_ONE_MILLISECOND);
return ts - FILETIME_EPOCH_DIFF;
}

private static XContentParser createParser(XContent xContent, InputStream data) throws IOException {
NamedXContentRegistry contentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
return xContent.createParser(contentRegistry, LoggingDeprecationHandler.INSTANCE, data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
"type" : "date"
},
"@timestamp" : {
"type" : "alias",
"path" : "timestamp"
"type" : "date"
},
"user" : {
"type" : "keyword"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,51 +336,6 @@ until
'''
expected_event_ids = [54, 55, 61, 67]

[[queries]]
query = '''
sequence
[process where opcode == 1] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
until
[file where opcode == 200] by unique_pid, process_path
'''


[[queries]]
note = "Sequence: non-field based join."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2
[process where true] by unique_ppid * 2
'''
expected_event_ids = [1, 2,
2, 3]


[[queries]]
note = "Sequence: multiple non-field based joins."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid)
[process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid)
'''
expected_event_ids = [1, 2,
2, 3]

[[queries]]
query = '''
sequence with maxspan=500ms
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2
'''
expected_event_ids = []

[[queries]]
query = '''
sequence
Expand Down Expand Up @@ -1026,6 +981,28 @@ query = '''
registry where arrayContains(bytes_written_string_list, "missing", "en-US")
'''


[[queries]]
note = "Sequence: non-field based join."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2
[process where true] by unique_ppid * 2
'''
expected_event_ids = [1, 2,
2, 3]


[[queries]]
note = "Sequence: multiple non-field based joins."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid)
[process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid)
'''
expected_event_ids = [1, 2,
2, 3]

# TODO: update toggles for this function
[[queries]]
case_sensitive = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.util.List;
Expand All @@ -22,19 +24,25 @@ public class Criterion implements QueryRequest {
private final HitExtractor tiebreakerExtractor;

// search after markers
private Object[] startMarker;
private Object[] stopMarker;
private Ordinal startMarker;
private Ordinal stopMarker;

private boolean reverse;

//TODO: should accept QueryRequest instead of another SearchSourceBuilder
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor) {
public Criterion(SearchSourceBuilder searchSource,
List<HitExtractor> searchAfterExractors,
HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor,
boolean reverse) {
this.searchSource = searchSource;
this.keyExtractors = searchAfterExractors;
this.timestampExtractor = timestampExtractor;
this.tiebreakerExtractor = tiebreakerExtractor;

this.startMarker = null;
this.stopMarker = null;
this.reverse = reverse;
}

@Override
Expand All @@ -54,54 +62,45 @@ public HitExtractor tiebreakerExtractor() {
return tiebreakerExtractor;
}

public long timestamp(SearchHit hit) {
@SuppressWarnings({ "unchecked" })
public Ordinal ordinal(SearchHit hit) {

Object ts = timestampExtractor.extract(hit);
if (ts instanceof Number) {
return ((Number) ts).longValue();
if (ts instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}

@SuppressWarnings({ "unchecked" })
public Comparable<Object> tiebreaker(SearchHit hit) {
if (tiebreakerExtractor == null) {
return null;
}
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable) {
return (Comparable<Object>) tb;
}
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
long timestamp = ((Number) ts).longValue();
Comparable<Object> tiebreaker = null;

public Object[] startMarker() {
return startMarker;
if (tiebreakerExtractor != null) {
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable == false) {
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
tiebreaker = (Comparable<Object>) tb;
}
return new Ordinal(timestamp, tiebreaker);
}

public Object[] stopMarker() {
return stopMarker;
public void startMarker(Ordinal ordinal) {
startMarker = ordinal;
}

private Object[] marker(SearchHit hit) {
long timestamp = timestamp(hit);
Object tiebreaker = null;
if (tiebreakerExtractor() != null) {
tiebreaker = tiebreaker(hit);
}

return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
public void stopMarker(Ordinal ordinal) {
stopMarker = ordinal;
}

public void startMarker(SearchHit hit) {
startMarker = marker(hit);
public Ordinal nextMarker() {
return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker;
}

public void stopMarker(SearchHit hit) {
stopMarker = marker(hit);
public Criterion useMarker(Ordinal marker) {
searchSource.searchAfter(marker.toArray());
return this;
}

public Criterion useMarker(Object[] marker) {
searchSource.searchAfter(marker);
return this;
public Iterable<SearchHit> iterable(List<SearchHit> hits) {
return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
Expand Down Expand Up @@ -43,11 +44,14 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction,
TimeValue maxSpan,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();

List<Criterion> criteria = new ArrayList<>(plans.size() - 1);

boolean descending = direction == OrderDirection.DESC;

// build a criterion for each query
for (int i = 0; i < plans.size() - 1; i++) {
List<Attribute> keys = listOfKeys.get(i);
Expand All @@ -61,9 +65,10 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
// TODO: this could be generalized into an exec only query
Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass());
QueryRequest request = ((EsQueryExec) query).queryRequest(session);
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor));
// base query remains descending, the rest need to flip
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending));
}
return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit);
return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit);
}

private HitExtractor timestampExtractor(HitExtractor hitExtractor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;

import java.util.Objects;

class KeyAndOrdinal {
final SequenceKey key;
final long timestamp;
final Comparable<Object> tiebreaker;
final Ordinal ordinal;

KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tiebreaker) {
KeyAndOrdinal(SequenceKey key, Ordinal ordinal) {
this.key = key;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.ordinal = ordinal;
}

@Override
public int hashCode() {
return Objects.hash(key, timestamp, tiebreaker);
return Objects.hash(key, ordinal);
}

@Override
Expand All @@ -38,12 +37,11 @@ public boolean equals(Object obj) {

KeyAndOrdinal other = (KeyAndOrdinal) obj;
return Objects.equals(key, other.key)
&& Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
&& Objects.equals(ordinal, other.ordinal);
}

@Override
public String toString() {
return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]";
return key.toString() + ordinal.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SequencePayload extends AbstractPayload {

private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;

SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
super(timedOut, timeTook, nextKeys);
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
sequences = new ArrayList<>(seq.size());
for (Sequence s : seq) {
boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0);

for (Iterator<Sequence> it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) {
Sequence s = it.next();
sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
}
}
Expand Down

0 comments on commit 3a546f1

Please sign in to comment.