Skip to content

Commit

Permalink
Changed persistence handling in multiple classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ernst Leierzopf committed Apr 14, 2021
1 parent 21c2ba2 commit e257fd7
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 130 deletions.
4 changes: 2 additions & 2 deletions aecid-testsuite/demo/aminer/demo-config.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,14 @@ def build_analysis_pipeline(analysis_context):
atom_filter.add_handler(ecd)

from aminer.analysis.EventFrequencyDetector import EventFrequencyDetector
efd = EventFrequencyDetector(analysis_context.aminer_config, anomaly_event_handlers, window_size=0.5)
efd = EventFrequencyDetector(analysis_context.aminer_config, anomaly_event_handlers, window_size=0.5, auto_include_flag=True)
analysis_context.register_component(efd, component_name="EventFrequencyDetector")
atom_filter.add_handler(efd)

from aminer.analysis.EventSequenceDetector import EventSequenceDetector
esd = EventSequenceDetector(analysis_context.aminer_config, anomaly_event_handlers, ['/model/ParsingME'], ignore_list=[
'/model/ECD/a', '/model/ECD/b', '/model/ECD/c', '/model/ECD/d', '/model/ECD/e', '/model/ECD/f', '/model/Random',
'/model/RandomTime', '/model/DailyCron'])
'/model/RandomTime', '/model/DailyCron'], auto_include_flag=True)
analysis_context.register_component(esd, component_name="EventSequenceDetector")
atom_filter.add_handler(esd)

Expand Down
4 changes: 3 additions & 1 deletion aecid-testsuite/demo/aminer/demo-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ Analysis:
- type: EventFrequencyDetector
id: EventFrequencyDetector
window_size: 0.5
learn_mode: True
- type: EventSequenceDetector
id: EventSequenceDetector
id_path_list:
Expand All @@ -894,6 +895,7 @@ Analysis:
- '/model/Random'
- '/model/RandomTime'
- '/model/DailyCron'
learn_mode: True
- type: MatchFilter
id: MatchFilter
paths:
Expand Down Expand Up @@ -934,7 +936,7 @@ Analysis:
report_interval: 10
- type: MatchValueAverageChangeDetector
id: MatchValueAverageChange
timestamp_path: None
timestamp_path: null
paths:
- "/model/Random"
min_bin_elements: 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import time
import os
import logging
Expand Down Expand Up @@ -143,7 +142,7 @@ def load_persistence_data(self):
persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
if persistence_data is not None:
self.known_values_dict = persistence_data
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

def do_persist(self):
"""Immediately write persistence data to storage."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
from collections import deque
import random
import math
Expand Down Expand Up @@ -681,37 +680,7 @@ def load_persistence_data(self):
implication = Implication((), (), (), 0, 0)
implication.load_from_json_dictionary_repr(implication_dict)
self.forward_rules[rule].append(implication)
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

# persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
# if persistence_data is not None:
# for record in persistence_data:
# implication_direction = record[0]
# trigger_event = tuple(record[1])
# implied_event = tuple(record[2])
# max_obs = record[3]
# min_eval_t = record[4]
# rule = Implication(trigger_event, implied_event, None, max_obs, min_eval_t)
# rule.stable = 1
# if implication_direction == 'back':
# if trigger_event in self.back_rules:
# self.back_rules[trigger_event].append(rule)
# else:
# self.back_rules[trigger_event] = [rule]
# if implied_event in self.back_rules_inv:
# self.back_rules_inv[implied_event].append(rule)
# else:
# self.back_rules_inv[implied_event] = [rule]
# elif implication_direction == 'forward':
# if trigger_event in self.forward_rules:
# self.forward_rules[trigger_event].append(rule)
# else:
# self.forward_rules[trigger_event] = [rule]
# if implied_event in self.forward_rules_inv:
# self.forward_rules_inv[implied_event].append(rule)
# else:
# self.forward_rules_inv[implied_event] = [rule]
# logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

def do_persist(self):
"""Immediately write persistence data to storage."""
Expand All @@ -729,19 +698,6 @@ def do_persist(self):
self.next_persist_time = None
logging.getLogger(DEBUG_LOG_NAME).debug('%s persisted data.', self.__class__.__name__)

# known_path_set = set()
# for event_a in self.back_rules:
# for implication in self.back_rules[event_a]:
# known_path_set.add(
# ('back', tuple(event_a), tuple(implication.implied_event), implication.max_observations, implication.min_eval_true))
# for event_a in self.forward_rules:
# for implication in self.forward_rules[event_a]:
# known_path_set.add(
# ('forward', tuple(event_a), tuple(implication.implied_event), implication.max_observations, implication.min_eval_true))
# PersistenceUtil.store_json(self.persistence_file_name, list(known_path_set))
# self.next_persist_time = None
# logging.getLogger(DEBUG_LOG_NAME).debug('%s persisted data.', self.__class__.__name__)

def log_statistics(self, component_name):
"""
Log statistics of an AtomHandler. Override this method for more sophisticated statistics output of the AtomHandler.
Expand Down Expand Up @@ -869,6 +825,7 @@ def get_dictionary_repr(self):
'hypothesis_observations': self.hypothesis_observations, 'hypothesis_evaluated_true': self.hypothesis_evaluated_true}

def load_from_json_dictionary_repr(self, dict_repr):
"""Load the Implication from persisted data in a json dictionary format."""
self.trigger_event = tuple(dict_repr["trigger_event"])
self.implied_event = tuple(dict_repr["implied_event"])
self.stable = dict_repr["stable"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,7 @@ def __init__(self, aminer_config, anomaly_event_handlers, target_path_list=None,

self.persistence_file_name = build_persistence_file_name(aminer_config, self.__class__.__name__, persistence_id)
PersistenceUtil.add_persistable_component(self)

# Persisted data contains lists of event-frequency pairs, i.e., [[<ev1, ev2>, <freq>], [<ev1, ev2>, <freq>], ...]
persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
if persistence_data is not None:
for entry in persistence_data:
log_event = entry[0]
frequency = entry[1]
self.counts_prev[tuple(log_event)] = frequency
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)
self.load_persistence_data()

def receive_atom(self, log_atom):
"""Receive a log atom from a source."""
Expand Down Expand Up @@ -199,13 +191,18 @@ def do_timer(self, trigger_time):

def do_persist(self):
"""Immediately write persistence data to storage."""
persist_data = []
for log_ev, freq in self.counts_prev.items():
persist_data.append((log_ev, freq))
PersistenceUtil.store_json(self.persistence_file_name, persist_data)
PersistenceUtil.store_json(self.persistence_file_name, {"counts": self.counts, "counts_prev": self.counts_prev})
self.next_persist_time = None
logging.getLogger(DEBUG_LOG_NAME).debug('%s persisted data.', self.__class__.__name__)

def load_persistence_data(self):
    """Load the persistence data from storage.

    Restores both the current and the previous counting window from the
    JSON persistence file, so a restart does not reset frequency tracking.
    If no persistence file exists, the in-memory state is left untouched.
    """
    persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
    if persistence_data is not None:
        self.counts = persistence_data["counts"]
        self.counts_prev = persistence_data["counts_prev"]
        # NOTE(review): the flattened diff does not show whether this debug
        # call sits inside or outside the if-branch; logging only after a
        # successful load matches the sibling detectors — confirm upstream.
        logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

def allowlist_event(self, event_type, event_data, allowlisting_data):
"""
Allowlist an event generated by this source using the information emitted when generating the event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,6 @@ def __init__(self, aminer_config, anomaly_event_handlers, id_path_list=None, tar
self.persistence_file_name = build_persistence_file_name(aminer_config, self.__class__.__name__, persistence_id)
PersistenceUtil.add_persistable_component(self)

# Persisted data contains lists of sequences, i.e., [[<seq1_elem1>, <seq1_elem2>], [<seq2_elem1, ...], ...]
# Thereby, sequence elements may be tuples, i.e., combinations of values, or paths that define events.
persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
if persistence_data is not None:
for sequence in persistence_data:
sequence_elem_tuple = []
for sequence_elem in sequence:
sequence_elem_tuple.append(tuple(sequence_elem))
self.sequences.add(tuple(sequence_elem_tuple))
logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

def receive_atom(self, log_atom):
"""Receive a log atom from a source."""
parser_match = log_atom.parser_match
Expand Down Expand Up @@ -199,9 +188,22 @@ def do_timer(self, trigger_time):
delta = 600
return delta

def load_persistence_data(self):
    """Load the persistence data from storage.

    Persisted data contains lists of sequences, i.e.,
    [[<seq1_elem1>, <seq1_elem2>], [<seq2_elem1>, ...], ...].
    Thereby, sequence elements may be tuples, i.e., combinations of values,
    or paths that define events.
    """
    persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
    if persistence_data is not None:
        for sequence in persistence_data:
            # Convert each persisted sequence element back to a tuple so the
            # sequence as a whole is hashable and can be stored in the
            # self.sequences set.
            sequence_elem_tuple = []
            for sequence_elem in sequence:
                sequence_elem_tuple.append(tuple(sequence_elem))
            self.sequences.add(tuple(sequence_elem_tuple))
        # NOTE(review): placement of this debug call relative to the
        # if-branch could not be confirmed from the flattened diff.
        logging.getLogger(DEBUG_LOG_NAME).debug('%s loaded persistence data.', self.__class__.__name__)

def do_persist(self):
"""Immediately write persistence data to storage."""
PersistenceUtil.store_json(self.persistence_file_name, list(self.sequences))
PersistenceUtil.store_json(self.persistence_file_name, self.sequences)
self.next_persist_time = None
logging.getLogger(DEBUG_LOG_NAME).debug('%s persisted data.', self.__class__.__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,7 @@ def __init__(self, aminer_config, anomaly_event_handlers, persistence_id='Defaul
# Loads the persistence
self.persistence_file_name = build_persistence_file_name(aminer_config, self.__class__.__name__, persistence_id)
PersistenceUtil.add_persistable_component(self)
persistence_data = PersistenceUtil.load_json(self.persistence_file_name)

# Imports the persistence
if persistence_data is not None:
for key in persistence_data[0]:
self.found_keys.append(set(key))
self.variable_key_list = persistence_data[1]
self.values = persistence_data[2]
self.longest_path = persistence_data[3]
self.check_variables = persistence_data[4]
self.num_eventlines = persistence_data[5]
self.etd_time_trigger = persistence_data[6]
self.num_eventlines_TSA_ref = persistence_data[7]

self.num_events = len(self.found_keys)
else:
if self.track_time_for_TSA:
self.etd_time_trigger[0].append(-1)
self.etd_time_trigger[1].append(-1)
self.etd_time_trigger[2].append(-1)
self.load_persistence_data()

def receive_atom(self, log_atom):
"""Receives an parsed atom and keeps track of the event types and the values of the variables of them."""
Expand Down Expand Up @@ -295,19 +276,32 @@ def do_timer(self, trigger_time):
delta = self.aminer_config.config_properties.get(KEY_PERSISTENCE_PERIOD, DEFAULT_PERSISTENCE_PERIOD)
return delta

def load_persistence_data(self):
    """Load the persistence data from storage.

    On success, restores the detector state from the persisted dictionary;
    otherwise initializes the TSA time trigger with sentinel values when
    time tracking for the TSA is enabled.
    """
    persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
    if persistence_data is not None:
        # found_keys entries were serialized as lists; restore the runtime
        # representation as a list of sets.
        for key in persistence_data["found_keys"]:
            self.found_keys.append(set(key))
        self.variable_key_list = persistence_data["variable_key_list"]
        self.values = persistence_data["values"]
        self.longest_path = persistence_data["longest_path"]
        self.check_variables = persistence_data["check_variables"]
        self.num_eventlines = persistence_data["num_eventlines"]
        self.etd_time_trigger = persistence_data["etd_time_trigger"]
        self.num_eventlines_TSA_ref = persistence_data["num_eventlines_TSA_ref"]
        # The number of tracked event types is derived from the restored keys.
        self.num_events = len(self.found_keys)
    else:
        if self.track_time_for_TSA:
            # -1 marks an uninitialized trigger slot in each of the three
            # trigger lists.
            self.etd_time_trigger[0].append(-1)
            self.etd_time_trigger[1].append(-1)
            self.etd_time_trigger[2].append(-1)

def do_persist(self):
"""Immediately write persistence data to storage."""
tmp_list = [[]]
for key in self.found_keys:
tmp_list[0].append(list(key))
tmp_list.append(self.variable_key_list)
tmp_list.append(self.values)
tmp_list.append(self.longest_path)
tmp_list.append(self.check_variables)
tmp_list.append(self.num_eventlines)
tmp_list.append(self.etd_time_trigger)
tmp_list.append(self.num_eventlines_TSA_ref)
PersistenceUtil.store_json(self.persistence_file_name, tmp_list)
persist_dict = {"found_keys": self.found_keys, "variable_key_list": self.variable_key_list, "values": self.values,
"longest_path": self.longest_path, "check_variables": self.check_variables, "num_eventlines": self.num_eventlines,
"etd_time_trigger": self.etd_time_trigger, "num_eventlines_TSA_ref": self.num_eventlines_TSA_ref}
PersistenceUtil.store_json(self.persistence_file_name, persist_dict)

for following_module in self.following_modules:
following_module.do_persist()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,11 @@ def __init__(self, aminer_config, anomaly_event_handlers, timestamp_path, analyz
self.persistence_id = persistence_id
self.output_log_line = output_log_line
self.aminer_config = aminer_config
self.stat_data = []

self.persistence_file_name = build_persistence_file_name(aminer_config, self.__class__.__name__, persistence_id)
PersistenceUtil.add_persistable_component(self)
persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
self.stat_data = []
for path in analyze_path_list:
self.stat_data.append((path, [],))
if persistence_data is not None:
for val in persistence_data:
if isinstance(val, str):
val = val.strip('[').strip(']').split(',', 2)
path = val[0].strip('"')
values = val[1].strip(' ').strip('[').strip(']')
else:
path = val[0]
values = val[1]
index = 0
for p, _ in self.stat_data:
if p == path:
break
index += 1
for value in values:
self.stat_data[index][1].append(value)
self.load_persistence_data(analyze_path_list)

def receive_atom(self, log_atom):
"""Send summary to all event handlers."""
Expand Down Expand Up @@ -158,6 +140,19 @@ def do_timer(self, trigger_time):
delta = self.aminer_config.config_properties.get(KEY_PERSISTENCE_PERIOD, DEFAULT_PERSISTENCE_PERIOD)
return delta

def load_persistence_data(self, analyze_path_list):
    """Load the persistence data from storage.

    @param analyze_path_list paths for which empty stat_data entries are
    created when no persisted state exists.
    """
    persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
    if persistence_data is not None:
        for stat_data in persistence_data:
            # Inner values were serialized as JSON lists; convert list
            # entries back to the tuples used at runtime before appending
            # the (path, values) pair as a tuple.
            for i, val in enumerate(stat_data[1]):
                if isinstance(val, list):
                    stat_data[1][i] = tuple(val)
            self.stat_data.append(tuple(stat_data))
    else:
        # No persisted state: start with one empty value list per
        # analyzed path.
        for path in analyze_path_list:
            self.stat_data.append((path, []))

def do_persist(self):
"""Immediately write persistence data to storage."""
PersistenceUtil.store_json(self.persistence_file_name, self.stat_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
'tuple_transformation_function': {'type': 'string', 'allowed': ['demo'], 'nullable': True, 'default': None},
'value_list': {
'type': 'list', 'schema': {'type': ['boolean', 'float', 'integer', 'string']}, 'nullable': True, 'default': None},
'timestamp_path': {'type': 'string'},
'timestamp_path': {'type': 'string', 'nullable': True},
'min_bin_elements': {'type': 'integer'},
'min_bin_time': {'type': 'integer'},
'debug_mode': {'type': 'boolean', 'default': False},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
{
'id': {'type': 'string', 'nullable': True, 'empty': False},
'type': {'type': 'string', 'allowed': ['MatchValueAverageChangeDetector'], 'required': True},
'timestamp_path': {'type': 'string', 'required': True, 'empty': False},
'timestamp_path': {'type': 'string', 'nullable': True, 'empty': False},
'paths': {'type': 'list', 'schema': {'type': 'string', 'empty': False}, 'required': True},
'min_bin_elements': {'type': 'integer', 'required': True, 'min': 1},
'min_bin_time': {'type': 'integer', 'required': True, 'min': 1},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def encode_object(term):
encoded_object = 'string:' + term
elif isinstance(term, bytes):
encoded_object = 'bytes:' + encode_byte_string_as_string(term)
elif isinstance(term, (list, tuple)):
elif isinstance(term, (list, tuple, set)):
encoded_object = [encode_object(item) for item in term]
elif isinstance(term, dict):
encoded_object = {}
Expand Down Expand Up @@ -68,7 +68,10 @@ def decode_object(term):
decoded_object = {}
for key, var in term.items():
if key.startswith("(") and key.endswith(")"):
key = ast.literal_eval(key)
try:
key = ast.literal_eval(key)
except ValueError:
pass
var = decode_object(var)
decoded_object[key] = var
else:
Expand Down

0 comments on commit e257fd7

Please sign in to comment.