Skip to content

Commit

Permalink
Added functionality to have multiple keys in compare_keys in Change rule
Browse files Browse the repository at this point in the history
  • Loading branch information
pralabhkumar committed May 19, 2017
1 parent c2f12a2 commit bef5b24
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 58 deletions.
7 changes: 5 additions & 2 deletions elastalert/config.py
Expand Up @@ -236,6 +236,10 @@ def _dt_to_ts_with_format(dt):
rule['compound_aggregation_key'] = rule['aggregation_key']
rule['aggregation_key'] = ','.join(rule['aggregation_key'])

if isinstance(rule.get('compare_key'), list):
rule['compound_compare_key'] = rule['compare_key']
rule['compare_key'] = ','.join(rule['compare_key'])

# Add QK, CK and timestamp to include
include = rule.get('include', ['*'])
if 'query_key' in rule:
Expand All @@ -245,8 +249,7 @@ def _dt_to_ts_with_format(dt):
if 'compound_aggregation_key' in rule:
include += rule['compound_aggregation_key']
if 'compare_key' in rule:
for val in rule['compare_key'].split(',') :
include.append(val)
include.append(rule['compare_key'])
if 'top_count_keys' in rule:
include += rule['top_count_keys']
include.append(rule['timestamp_field'])
Expand Down
9 changes: 4 additions & 5 deletions elastalert/elastalert.py
Expand Up @@ -91,6 +91,7 @@ def __init__(self, args):
self.parse_args(args)
self.debug = self.args.debug
self.verbose = self.args.verbose

if self.verbose or self.debug:
elastalert_logger.setLevel(logging.INFO)

Expand Down Expand Up @@ -194,7 +195,6 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field=
query = {'query': {'filtered': es_filters}}
if sort:
query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
#elastalert_logger.info(" Query " + str(query))
return query

def get_terms_query(self, query, size, field, five=False):
Expand Down Expand Up @@ -544,11 +544,10 @@ def run_query(self, rule, start=None, end=None, scroll=False):
start = self.get_index_start(rule['index'])
if end is None:
end = ts_now()

# Reset hit counter and query
rule_inst = rule['type']
#elastalert_logger.info(" inside rule " + rule_inst)
index = self.get_index(rule, start, end)
index = self.get_index(rule, start, end)
if rule.get('use_count_query'):
data = self.get_hits_count(rule, start, end, index)
elif rule.get('use_terms_query'):
Expand Down Expand Up @@ -971,7 +970,7 @@ def start(self):
self.handle_error("%s is not a valid ISO8601 timestamp (YYYY-MM-DDTHH:MM:SS+XX:00)" % (self.starttime))
exit(1)
self.running = True
elastalert_logger.info("Starting up Now")
elastalert_logger.info("Starting up")
while self.running:
next_run = datetime.datetime.utcnow() + self.run_every

Expand Down
53 changes: 22 additions & 31 deletions elastalert/ruletypes.py
Expand Up @@ -158,49 +158,39 @@ class ChangeRule(CompareRule):
required_options = frozenset(['query_key', 'compare_key', 'ignore_null'])
change_map = {}
occurrence_time = {}

def compare(self, event):
key = hashable(lookup_es_key(event, self.rules['query_key']))

values=[]
current_value=""
elastalert_logger.info(" Inside Change Rule " + str(self.occurrences))
for val in self.rules['compare_key'].split(",") :
lookup_value=lookup_es_key(event,val)
if lookup_value is not None :
values.append(lookup_value)
current_value += str(lookup_value)+","


current_value=current_value[0:-1]

elastalert_logger.info(" " + str(values) + " " + current_value)

for val in values :
if not isinstance(val, bool) and not val and self.rules['ignore_null']:
return False

changed = False

values = []
elastalert_logger.info(" Previous Values of compare keys " + str(self.occurrences))
for val in self.rules['compare_key'].split(","):
lookup_value = lookup_es_key(event, val)
values.append(lookup_value)
elastalert_logger.info(" Current Values of compare keys " + str(values))

changed = False
for val in values:
if not isinstance(val, bool) and not val and self.rules['ignore_null']:
return False
# If we have seen this key before, compare it to the new value
if key in self.occurrences:
for idx,previous_values in enumerate(self.occurrences[key].split(",")) :
elastalert_logger.info(" " + str(previous_values) + " " + str(values[idx]))
changed = int(previous_values) != int(values[idx])
if(changed) :
break
for idx, previous_values in enumerate(self.occurrences[key]):
elastalert_logger.info(" " + str(previous_values) + " " + str(values[idx]))
changed = previous_values != values[idx]
if(changed):
break
if changed:
self.change_map[key] = (self.occurrences[key], val)

self.change_map[key] = (self.occurrences[key], values)
# If using timeframe, only return true if the time delta is < timeframe
if key in self.occurrence_time:
changed = event[self.rules['timestamp_field']] - self.occurrence_time[key] <= self.rules['timeframe']

# Update the current value and time
self.occurrences[key] = current_value
elastalert_logger.info(" Setting current value of compare keys values " + str(values))
self.occurrences[key] = values
if 'timeframe' in self.rules:
self.occurrence_time[key] = event[self.rules['timestamp_field']]
elastalert_logger.info("Final Value " + str(changed))
elastalert_logger.info("Final result of comparision between previous and current values " + str(changed))
return changed

def add_match(self, match):
Expand All @@ -212,6 +202,7 @@ def add_match(self, match):
if change:
extra = {'old_value': change[0],
'new_value': change[1]}
elastalert_logger.info("Description of the changed records " + str(dict(match.items() + extra.items())))
super(ChangeRule, self).add_match(dict(match.items() + extra.items()))


Expand Down
7 changes: 4 additions & 3 deletions elastalert/schema.yaml
Expand Up @@ -38,22 +38,22 @@ oneOf:
required: [blacklist, compare_key]
properties:
type: {enum: [blacklist]}
compare_key: {type: string}
compare_key: {'items': {'type': 'string'},'type': ['string', 'array']}
blacklist: {type: array, items: {type: string}}

- title: Whitelist
required: [whitelist, compare_key, ignore_null]
properties:
type: {enum: [whitelist]}
compare_key: {type: string}
compare_key: {'items': {'type': 'string'},'type': ['string', 'array']}
whitelist: {type: array, items: {type: string}}
ignore_null: {type: boolean}

- title: Change
required: [query_key, compare_key, ignore_null]
properties:
type: {enum: [change]}
compare_key: {type: string}
compare_key: {'items': {'type': 'string'},'type': ['string', 'array']}
ignore_null: {type: boolean}
timeframe: *timeframe

Expand Down Expand Up @@ -264,3 +264,4 @@ properties:
### Simple
simple_webhook_url: *arrayOfString
simple_proxy: {type: string}

20 changes: 10 additions & 10 deletions example_rules/example_change.yaml
Expand Up @@ -6,11 +6,11 @@

# (Optional)
# Elasticsearch host
es_host: elasticsearch.example.com
# es_host: elasticsearch.example.com

# (Optional)
# Elasticsearch port
# es_port: 9000
# es_port: 14900

# (Optional) Connect with SSL to Elasticsearch
#use_ssl: True
Expand All @@ -30,11 +30,11 @@ type: change

# (Required)
# Index to search, wildcard supported
index: change-index*
index: logstash-*

# (Required, change specific)
# The field to look for changes in
compare_key: country_name,country_col
compare_key: country_name

# (Required, change specific)
# Ignore documents without the compare_key (country_name) field
Expand All @@ -47,16 +47,16 @@ query_key: username
# (Required, change specific)
# The value of compare_key must change in two events that are less than timeframe apart to trigger an alert
timeframe:
minutes: 2
days: 1

# (Required)
# A list of Elasticsearch filters used for find events
# These filters are joined with AND and nested in a filtered query
# For more info: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html
filter: []
#- query:
# query_string:
# query: "document_type: login"
filter:
- query:
query_string:
query: "document_type: login"

# (Required)
# The alert is use when a match is found
Expand All @@ -66,4 +66,4 @@ alert:
# (required, email specific)
# a list of email addresses to send alerts to
email:
- "abcd@efg.com"
- "elastalert@example.com"
19 changes: 12 additions & 7 deletions tests/rules_test.py
Expand Up @@ -63,6 +63,8 @@ def assert_matches_have(matches, terms):
for match, term in zip(matches, terms):
assert term[0] in match
assert match[term[0]] == term[1]
if len(term) > 2:
assert match[term[2]] == term[3]


def test_any():
Expand Down Expand Up @@ -450,36 +452,39 @@ def test_whitelist_dont_ignore_nulls():


def test_change():
events = hits(10, username='qlo', term='good')
events = hits(10, username='qlo', term='good', second_term='yes')
events[8].pop('term')
events[8].pop('second_term')
events[9]['term'] = 'bad'
rules = {'compare_key': 'term',
events[9]['second_term'] = 'no'
rules = {'compare_key': 'term,second_term',
'query_key': 'username',
'ignore_null': True,
'timestamp_field': '@timestamp'}
rule = ChangeRule(rules)
rule.add_data(events)
assert_matches_have(rule.matches, [('term', 'bad')])
assert_matches_have(rule.matches, [('term', 'bad', 'second_term', 'no')])

# Unhashable QK
events2 = hits(10, username=['qlo'], term='good')
events2 = hits(10, username=['qlo'], term='good', second_term='yes')
events2[9]['term'] = 'bad'
events2[9]['second_term'] = 'no'
rule = ChangeRule(rules)
rule.add_data(events2)
assert_matches_have(rule.matches, [('term', 'bad')])
assert_matches_have(rule.matches, [('term', 'bad', 'second_term', 'no')])

# Don't ignore nulls
rules['ignore_null'] = False
rule = ChangeRule(rules)
rule.add_data(events)
assert_matches_have(rule.matches, [('username', 'qlo'), ('term', 'bad')])
assert_matches_have(rule.matches, [('username', 'qlo'), ('term', 'bad', 'second_term', 'no')])

# With timeframe
rules['timeframe'] = datetime.timedelta(seconds=2)
rules['ignore_null'] = True
rule = ChangeRule(rules)
rule.add_data(events)
assert_matches_have(rule.matches, [('term', 'bad')])
assert_matches_have(rule.matches, [('term', 'bad', 'second_term', 'no')])

# With timeframe, doesn't match
events = events[:8] + events[9:]
Expand Down

0 comments on commit bef5b24

Please sign in to comment.