In [23]:
import logging
import tensorflow as tf
import mlflow
import random
import pandas as pd
import numpy as np

from tqdm import tqdm
from collections import Counter
from typing import List, Dict, Set

import sys
import os

# Make the parent of the current working directory importable so that the
# project-local `src` package can be imported by the following cell.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

# Replace the kernel's command-line arguments, in place, with a single empty
# entry. Presumably this keeps the project's config classes from parsing
# Jupyter's own arguments -- TODO confirm against `src.config`.
sys.argv[:] = [""]

In [2]:
from src import config
from src.features import preprocessing,sequences,knowledge
from src.training import models
from src import refinement

# Knowledge variants available for the Huawei data:
#   gram, text, causal, log template (gram_logs)
# This notebook runs the "causal" variant on the Huawei log sequences.
experiment_config = config.ExperimentConfig()
experiment_config.model_type = "causal"
experiment_config.sequence_type = "huawei_logs"

# Recurrent layer type used by the model.
model_config = models.config.ModelConfig()
model_config.rnn_type = "gru"

# Threshold applied later when counted causality is turned into records:
# only relations whose relative frequency exceeds this value are kept.
huawei_preprocessor_config = preprocessing.huawei.HuaweiPreprocessorConfig()
huawei_preprocessor_config.min_causality = 0.1

# Everything else keeps its default configuration.
sequence_config = sequences.config.SequenceConfig()
knowledge_config = knowledge.config.KnowledgeConfig()
refinement_config = refinement.config.RefinementConfig()

In [3]:
def log_all_configs_to_mlflow():
    """Log every attribute of each configuration object as an MLflow param.

    The param key is the configuration's class name immediately followed by
    the attribute name (no separator); the value is the attribute's ``str()``.
    """
    all_configs = (
        experiment_config,
        huawei_preprocessor_config,
        sequence_config,
        model_config,
        knowledge_config,
        refinement_config,
    )
    for config_object in all_configs:
        class_name = config_object.__class__.__name__
        for attribute_name, attribute_value in vars(config_object).items():
            mlflow.log_param(class_name + attribute_name, str(attribute_value))

In [4]:
mlflow.set_experiment("Domain Guided Monitoring")

# Close a run left open by an earlier execution of this cell; otherwise
# `mlflow.start_run()` below would fail because a run is already active.
if mlflow.active_run() is not None:
    mlflow.end_run()

# Start the run WITHOUT a `with` block: the context manager ends the run as
# soon as its body exits, so everything logged by later cells would land in a
# different, implicitly created run instead of `run_id`.
# NOTE(review): `log_all_configs_to_mlflow()` is defined but not called in the
# visible part of this notebook; call it here if the configuration should be
# recorded on this run.
run = mlflow.start_run()
run_id = run.info.run_id
logging.info("Starting run %s", run_id)

# Seed TensorFlow and Python's `random` module from the experiment config.
tf.random.set_seed(experiment_config.tensorflow_seed)
random.seed(experiment_config.random_seed)

## Load Huawei sequences

In [5]:
# Build the Huawei log preprocessor from its config and load the sequence
# dataframe; `sequence_column_name` names the column holding the sequences.
sequence_preprocessor = preprocessing.ConcurrentAggregatedLogsPreprocessor(
    huawei_preprocessor_config,
)
sequence_column_name = sequence_preprocessor.sequence_column_name
sequence_df = sequence_preprocessor.load_data()

# The transformer is used below to split and encode sequences; `metadata`
# carries (at least) the `x_vocab` / `y_vocab` vocabularies used later on.
transformer = sequences.transformer.NextPartialSequenceTransformerFromDataframe(sequence_config)
metadata = transformer.collect_metadata(sequence_df, sequence_column_name)

  sequence_df = sequence_preprocessor.load_data()
Generating DRAIN clusters from log_df: 100%|███████████████| 152373/152373 [00:21<00:00, 7158.92it/s]
Generating DRAIN clusters from log_df: 100%|███████████████| 152373/152373 [00:16<00:00, 9464.26it/s]
Generating DRAIN clusters from log_df: 100%|████████████████████| 970/970 [00:00<00:00, 14653.04it/s]


## Generate dataset

In [6]:
# Persist the loaded sequence dataframe. The path is relative to the current
# working directory, and `to_pickle` does not create missing directories, so
# `data/` must already exist.
sequence_df_pkl_file: str = "data/sequences_df.pkl"
sequence_df.to_pickle(sequence_df_pkl_file)

# NOTE(review): relies on a private transformer method (leading underscore);
# its split strategy is not visible here.
train_sequences, test_sequences = transformer._split_train_test(sequence_df, sequence_column_name)

def generate(for_train):
    """Yield ``(x_vecs_stacked, y_vec)`` for every split of every sequence.

    Args:
        for_train: when truthy iterate ``train_sequences``, otherwise
            ``test_sequences``.

    Each split produced by ``transformer._split_sequence`` is first passed to
    ``transformer._translate_and_pad`` together with ``metadata`` (presumably
    filling the vectors in place -- TODO confirm) and then yielded.
    """
    if for_train:
        selected_sequences = train_sequences
    else:
        selected_sequences = test_sequences

    for raw_sequence in selected_sequences:
        for piece in transformer._split_sequence(raw_sequence):
            transformer._translate_and_pad(piece, metadata)
            yield piece.x_vecs_stacked, piece.y_vec

def generate_train():
    """Return the example generator over the training sequences.

    Zero-argument wrapper so it can be handed to
    ``tf.data.Dataset.from_generator``.
    """
    training_examples = generate(for_train=True)
    return training_examples

def generate_test():
    """Return the example generator over the test sequences.

    Zero-argument wrapper so it can be handed to
    ``tf.data.Dataset.from_generator``.
    """
    test_examples = generate(for_train=False)
    return test_examples

In [7]:
# Training pipeline: examples come lazily from `generate_train`, are shuffled
# with a seeded buffer (reshuffled on every iteration), then batched and
# prefetched. Both elements of each example are declared as float32.
train_dataset = (
    tf.data.Dataset.from_generator(
        generate_train,
        output_types=(tf.float32, tf.float32),
    )
    .shuffle(
        experiment_config.dataset_shuffle_buffer,
        seed=experiment_config.dataset_shuffle_seed,
        reshuffle_each_iteration=True,
    )
    .batch(experiment_config.batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Test pipeline: same source/typing/batching as above but without shuffling,
# so the examples keep the order produced by `generate_test`.
test_dataset = (
    tf.data.Dataset.from_generator(
        generate_test,
        output_types=(tf.float32, tf.float32),
    )
    .batch(experiment_config.batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

2022-05-30 08:37:24.677403: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-05-30 08:37:24.677421: E tensorflow/stream_executor/cuda/cuda_driver.cc:313] failed call to cuInit: UNKNOWN ERROR (303)
2022-05-30 08:37:24.677436: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (hj-ubuntu): /proc/driver/nvidia/version does not exist
2022-05-30 08:37:24.677620: I tensorflow/core/platform/cpu_feature_guard.cc:143] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2022-05-30 08:37:24.703932: I tensorflow/core/platform/profile_utils/cpu_utils.cc:102] CPU Frequency: 2599990000 Hz
2022-05-30 08:37:24.704542: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7fd534000b60 initialized for platform Host (this does not guarantee that 

## Generate causal relationships

In [10]:
# Preprocessor used below to derive counted causality from the Huawei logs;
# it shares the configuration of the sequence preprocessor.
causality_preprocessor = preprocessing.ConcurrentAggregatedLogsCausalityPreprocessor(
    config=huawei_preprocessor_config,
)

# Lift pandas' row/column display truncation and widen cell text to 100 chars.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", 100)

### Step 1 Load log-only data 

In [11]:
# Load the log-only dataframe and replace missing values with empty strings.
# NOTE(review): `_load_log_only_data` is a private method of the preprocessor.
huawei_df = sequence_preprocessor._load_log_only_data().fillna("")

  exec(code_obj, self.user_global_ns, self.user_ns)
Generating DRAIN clusters from log_df: 100%|███████████████| 152373/152373 [00:20<00:00, 7324.17it/s]
Generating DRAIN clusters from log_df: 100%|███████████████| 152373/152373 [00:15<00:00, 9787.71it/s]
Generating DRAIN clusters from log_df: 100%|████████████████████| 970/970 [00:00<00:00, 14072.85it/s]


### Step 2 Generate causality

In [12]:
# Columns that take part in causality counting. With `log_only_causality`
# enabled only columns whose name contains "log" are kept; otherwise every
# relevant column of the sequence preprocessor is used.
relevant_columns = {
    column_name
    for column_name in sequence_preprocessor.relevant_columns
    if not huawei_preprocessor_config.log_only_causality or "log" in column_name
}

In [13]:
# The string below is never executed; it appears to be a reference copy of the
# preprocessor method called at the bottom of this cell (TODO confirm it still
# matches the implementation in `src`). As quoted, it walks consecutive rows
# and, for every non-empty "<column>#<lowercased value>" of the previous row,
# appends every non-empty "<column>#<lowercased value>" of the current row.
# Both branches of its if/else append, so repeated values are kept and can be
# counted afterwards.
'''
def _generate_counted_causality(
    self, df: pd.DataFrame, relevant_columns: Set[str]
) -> Dict[str, List[str]]:
    causality: Dict[str, List[str]] = {}
    previous_row = None
    for _, row in tqdm(
        df.iterrows(),
        desc="Generating counted causality for Huawei log data",
        total=len(df),
    ):
        if previous_row is None:
            previous_row = row
            continue
        for previous_column in relevant_columns:
            previous_column_value = (
                previous_column + "#" + str(previous_row[previous_column]).lower()
                if len(str(previous_row[previous_column])) > 0
                else ""
            )
            if len(previous_column_value) < 1:
                continue
            if previous_column_value not in causality:
                causality[previous_column_value] = []
            for current_column in relevant_columns:
                current_column_value = (
                    current_column + "#" + str(row[current_column]).lower()
                    if len(str(row[current_column])) > 0
                    else ""
                )
                if len(current_column_value) < 1:
                    continue
                if current_column_value not in causality[previous_column_value]:
                    causality[previous_column_value].append(current_column_value)
                else:
                    causality[previous_column_value].append(current_column_value)
        previous_row = row
    return causality
'''

# Compute the counted causality on the log-only dataframe, restricted to the
# relevant columns. NOTE(review): this calls a private preprocessor method.
counted_causality = causality_preprocessor._generate_counted_causality(
    huawei_df, relevant_columns
)

Generating counted causality for Huawei log data: 100%|█████| 169230/169230 [04:44<00:00, 594.77it/s]


In [None]:
# Show a bounded preview (first 5 keys, first 5 values each) instead of the
# whole mapping, whose repr can be extremely long.
{
    preview_key: preview_values[:5]
    for preview_key, preview_values in list(counted_causality.items())[:5]
}

In [24]:
# Turn the counted causality into parent/child records. A relation is kept
# only when its share among all values recorded for the same parent is
# strictly greater than `min_causality`.
causality_records = []
for from_value, to_values in tqdm(
    counted_causality.items(),
    desc="Generating causality df from counted causality",
):
    total_to_counts = len(to_values)
    to_values_counter: Dict[str, int] = Counter(to_values)
    for to_value, to_count in to_values_counter.items():
        if to_count / total_to_counts > huawei_preprocessor_config.min_causality:
            causality_records.append(
                {
                    "parent_id": from_value,
                    # Ids have the form "<column>#<value>". Split only on the
                    # first "#" so that values which themselves contain "#"
                    # (e.g. log templates) are not truncated.
                    "parent_name": from_value.split("#", 1)[1],
                    "child_id": to_value,
                    "child_name": to_value.split("#", 1)[1],
                },
            )

Generating causality df from counted causality: 100%|██████████| 2104/2104 [00:00<00:00, 2142.75it/s]


In [25]:
# One row per distinct parent/child relation, with a fresh 0..n-1 index.
causality_df = (
    pd.DataFrame.from_records(causality_records)
    .drop_duplicates()
    .reset_index(drop=True)
)

In [26]:
# `display.max_rows` is set to None earlier in this notebook, so a bare
# `causality_df` would render every row; show only the first ten.
causality_df.head(10)

Unnamed: 0,parent_id,parent_name,child_id,child_name
0,coarse_log_cluster_template#security group rule updated *,security group rule updated *,coarse_log_cluster_template#security group rule updated *,security group rule updated *
1,coarse_log_cluster_template#security group rule updated *,security group rule updated *,programname#neutron-openvswitch-agent,neutron-openvswitch-agent
2,coarse_log_cluster_template#security group rule updated *,security group rule updated *,python_module#neutron.agent.securitygroups_rpc,neutron.agent.securitygroups_rpc
3,coarse_log_cluster_template#security group rule updated *,security group rule updated *,fine_log_cluster_template#security group rule updated *,security group rule updated *
4,coarse_log_cluster_template#security group rule updated *,security group rule updated *,log_level#info,info
5,programname#neutron-openvswitch-agent,neutron-openvswitch-agent,programname#neutron-openvswitch-agent,neutron-openvswitch-agent
6,Hostname#wally113,wally113,Hostname#wally113,wally113
7,python_module#neutron.agent.securitygroups_rpc,neutron.agent.securitygroups_rpc,coarse_log_cluster_template#security group rule updated *,security group rule updated *
8,python_module#neutron.agent.securitygroups_rpc,neutron.agent.securitygroups_rpc,programname#neutron-openvswitch-agent,neutron-openvswitch-agent
9,python_module#neutron.agent.securitygroups_rpc,neutron.agent.securitygroups_rpc,python_module#neutron.agent.securitygroups_rpc,neutron.agent.securitygroups_rpc


### Step 3 Build hierarchy from dataframe

In [27]:
# The string below is never executed; it appears to be a reference copy of
# `CausalityKnowledge.build_causality_from_df` (TODO confirm it still matches
# the implementation in `src`). As quoted, it skips every row whose child or
# parent id is missing from the extended vocabulary and otherwise links the two
# nodes: the parent is added to the child's `in_nodes` and the child to the
# parent's `out_nodes`.
'''
def build_causality_from_df(
    self, causality_df: pd.DataFrame, vocab: Dict[str, int]
):
    if self.config.add_causality_prefix:
        causality_df = self._add_prefixes(causality_df)
    self.vocab: Dict[str, int] = vocab
    self._build_extended_vocab(causality_df, vocab)
    for _, row in tqdm(causality_df.iterrows(), desc="Building Causality from df"):
        child_id = row[self.child_id_col]
        if child_id not in self.extended_vocab:
            #logging.debug("Ignoring node %s as not in dataset", child_id)
            continue

        parent_id = row[self.parent_id_col]
        if parent_id not in self.extended_vocab:
            #logging.debug("Ignoring node %s as not in dataset", parent_id)
            continue

        child_node = self.nodes[self.extended_vocab[child_id]]
        parent_node = self.nodes[self.extended_vocab[parent_id]]

        child_node.in_nodes.add(parent_node)
        parent_node.out_nodes.add(child_node)

    logging.info("Built causality with %d nodes", len(self.nodes))
'''


# Build the causality knowledge from the relation dataframe, using the input
# vocabulary collected from the sequences.
causality = knowledge.CausalityKnowledge(
    config=knowledge_config,
)
causality.build_causality_from_df(causality_df, metadata.x_vocab)

Building Causality from df: 6543it [00:00, 16728.23it/s]


## Generate model

In [None]:
# load model
model = models.CausalityModel()

# build model
model.build(metadata, causality)

## Run Experiment

In [None]:
# train model
model.train_dataset(
    train_dataset,
    test_dataset,
    experiment_config.multilabel_classification,
    experiment_config.n_epochs,
)

In [None]:
# logging dataset info
mlflow.log_metric("train_size", len([x for x in train_dataset]))
mlflow.log_metric("test_size", len([x for x in test_dataset]))
mlflow.log_metric("x_vocab_size", len(metadata.x_vocab))
mlflow.log_metric("y_vocab_size", len(metadata.y_vocab))

# generate artifacts
# skip

# set mlflow tags
mlflow.set_tag("sequence_type", experiment_config.sequence_type)
mlflow.set_tag("model_type", experiment_config.model_type)
if len(metadata.y_vocab) == 1:
    mlflow.set_tag("task_type", "risk_prediction")
else:
    mlflow.set_tag("task_type", "sequence_prediction")

logging.info("Finished run %s", run_id)