In [4]:
import ast
import logging
import os
import pickle

import pandas as pd

from logadempirical.data.vocab import Vocab

In [5]:
# Logger named after the current module; build_vocab reports the vocab size and save path through it.
logger = logging.getLogger(__name__)

In [6]:
def build_vocab(vocab_path, data_dir, train_path, embeddings, is_unsupervised=False):
    """
    Load the vocab cached at ``vocab_path``, or build it from the training data and cache it
    Parameters
    ----------
    vocab_path: str: Path of the cached vocab; loaded as-is when the file exists
    data_dir: str: Directory that is joined with ``embeddings`` and passed to ``Vocab``
    train_path: str: Path to the pickled training sessions; only read when the vocab is built
    embeddings: str: Embeddings file name, relative to ``data_dir``
    is_unsupervised: bool: Whether to build the vocab from normal (label 0) sessions only

    Returns
    -------
    vocab: Vocab: The loaded or newly built vocab
    """
    # Fast path: a previously saved vocab wins and the training data is never touched.
    if os.path.exists(vocab_path):
        vocab = Vocab.load_vocab(vocab_path)
        logger.info(f"Vocab size: {len(vocab)}")
        return vocab

    with open(train_path, 'rb') as f:
        # NOTE(review): pickle.load can execute arbitrary code - only use trusted files here.
        data = pickle.load(f)

    def _is_normal(label):
        # A label is either one value per session or one value per event (array/list).
        # A per-event label marks a normal session only when every event is 0, which is
        # the same rule load_features applies via max(label).
        if hasattr(label, 'tolist'):
            label = label.tolist()
        if isinstance(label, (list, tuple)):
            return max(label) == 0
        return label == 0

    if is_unsupervised:
        logs = [x['EventTemplate'] for x in data if _is_normal(x['Label'])]
    else:
        logs = [x['EventTemplate'] for x in data]

    vocab = Vocab(logs, os.path.join(data_dir, embeddings))
    logger.info(f"Vocab size: {len(vocab)}")
    logger.info(f"Save vocab in {vocab_path}")
    vocab.save_vocab(vocab_path)
    return vocab

In [7]:
# Load the cached DeepLog vocab. train_path and embeddings are left empty, so this call
# relies on the pickle already existing; otherwise build_vocab would try to open "".
vocab = log_vocab = build_vocab(
    vocab_path="output/HDFS/session/train0.1/vocabs/DeepLog.pkl",
    data_dir="dataset/HDFS",
    train_path="",
    embeddings="",
)

In [8]:
# Show the templates held by the vocab (itos: presumably index -> string; confirm in Vocab).
vocab.itos

['Received block <*> of size <*> from /<*>',
 '<*>:Transmitted block <*> to /<*>',
 'BLOCK* NameSystem.addStoredBlock: addStoredBlock request received for <*> on <*> size <*> But it does not belong to any file.',
 'Verification succeeded for <*>',
 'PacketResponder <*> for block <*> <*>',
 'BLOCK* ask <*> to replicate <*> to datanode(s) <*>',
 'Deleting block <*> file <*>',
 '<*> Starting thread to transfer block <*> to <*>',
 '<*> Starting thread to transfer block <*> to <*>, <*>',
 'BLOCK* NameSystem.addStoredBlock: blockMap updated: <*> is added to <*> size <*>',
 '<*> Served block <*> to /<*>',
 'BLOCK* NameSystem.addStoredBlock: Redundant addStoredBlock request received for <*> on <*> size <*>',
 'Unexpected error trying to delete block <*>. BlockInfo not found in volumeMap.',
 'BLOCK* ask <*> to replicate <*> to datanode(s) <*> <*>',
 'Receiving block <*> src: /<*> dest: /<*>',
 'BLOCK* NameSystem.delete: <*> is added to invalidSet of <*>',
 'Received block <*> src: /<*> dest: /<

In [9]:
# Per-template table; the cells below read its EventTemplate and Occurrences columns.
event_df = pd.read_csv("./dataset/HDFS/HDFS.log_templates.csv")

In [14]:
# Occurrence counts of every template, and of only those templates present in the vocab.
all_temp_count = list(event_df['Occurrences'])
normal_temp_count = [
    occurrences
    for template, occurrences in zip(event_df['EventTemplate'], event_df['Occurrences'])
    if template in vocab.itos
]

In [15]:
# Log lines covered by vocab templates versus all log lines.
print(f"{sum(normal_temp_count)} / {sum(all_temp_count)}")

11170312 / 11175629


In [16]:
# Templates that never made it into the vocab.
abnormal_template = [
    template for template in event_df['EventTemplate'] if template not in vocab.itos
]

In [18]:
# Number of templates that are absent from the vocab.
len(abnormal_template)

29

In [19]:
# Structured log lines; the next cell filters them on the EventTemplate column.
log_df = pd.read_csv("./dataset/HDFS/HDFS.log_structured.csv")

In [20]:
block_ids = []
# Log lines whose template is missing from the vocab, as a list of row dicts.
log_df_ab = (
    log_df
    .loc[log_df['EventTemplate'].isin(abnormal_template)]
    .to_dict(orient='records')
)

In [21]:
# Peek at one record; in the captured output ParameterList is still a string.
log_df_ab[0]

{'LineId': 6336,
 'Date': 81109,
 'Time': 203633,
 'Pid': 147,
 'Level': 'INFO',
 'Component': 'dfs.DataNode$DataXceiver',
 'Content': 'writeBlock blk_-3102267849859399193 received exception java.net.SocketTimeoutException',
 'EventId': 'ace40671',
 'EventTemplate': 'writeBlock <*> received exception <*>',
 'ParameterList': "['blk_-3102267849859399193', 'java.net.SocketTimeoutException']"}

In [22]:
# Convert ParameterList from its string form to a real list.
# ast.literal_eval only accepts Python literals, so a malformed or hostile CSV cell
# cannot run code the way eval() would. The isinstance guard keeps this cell re-runnable:
# records that were already converted are left untouched instead of raising.
for log in log_df_ab:
    if isinstance(log['ParameterList'], str):
        log['ParameterList'] = ast.literal_eval(log['ParameterList'])

In [23]:
# Same record after the conversion: ParameterList is now a list.
log_df_ab[0]

{'LineId': 6336,
 'Date': 81109,
 'Time': 203633,
 'Pid': 147,
 'Level': 'INFO',
 'Component': 'dfs.DataNode$DataXceiver',
 'Content': 'writeBlock blk_-3102267849859399193 received exception java.net.SocketTimeoutException',
 'EventId': 'ace40671',
 'EventTemplate': 'writeBlock <*> received exception <*>',
 'ParameterList': ['blk_-3102267849859399193',
  'java.net.SocketTimeoutException']}

In [25]:
# Collect every parameter that contains 'blk_' from the abnormal-template log lines.
block_ids = [
    param
    for record in log_df_ab
    for param in record['ParameterList']
    if 'blk_' in param
]

In [27]:
# De-duplicate while keeping first-seen order. list(set(...)) returns the ids in a different
# order on every kernel start (string hashing is randomised), which would make any later
# slicing or sampling of block_ids irreproducible.
block_ids = list(dict.fromkeys(block_ids))

In [28]:
# Number of distinct 'blk_' parameters found in abnormal-template log lines.
len(block_ids)

3970

In [29]:
def _session_label(raw_label):
    """
    Resolve a raw ``Label`` entry into ``(label, session_label)``

    ``label`` is the value in plain Python form (array-likes go through ``tolist()``);
    ``session_label`` is its maximum when it holds one label per event, else the value itself.
    """
    if isinstance(raw_label, int):
        return raw_label, raw_label
    label = raw_label.tolist() if hasattr(raw_label, 'tolist') else raw_label
    if isinstance(label, (list, tuple)):
        return label, max(label)
    # numpy scalars and 0-d arrays come back from tolist() as a plain Python scalar,
    # which max() cannot iterate over.
    return label, label


def load_features(data_path, is_unsupervised=True, min_len=0):
    """
    Load features from pickle file
    Parameters
    ----------
    data_path: str: Path to pickle file
    is_unsupervised: bool: Whether the model is unsupervised or not
    min_len: int: Minimum length of log sequence

    Returns
    -------
    logs: List[Tuple[List[str], int]]: List of log sequences
        Unsupervised: only sessions whose session-level label is 0, paired with that label.
        Supervised: every session, paired with its label as stored (a list when the
        pickle holds one label per event).
    """
    with open(data_path, 'rb') as f:
        # NOTE(review): pickle.load can execute arbitrary code - only use trusted files here.
        data = pickle.load(f)

    logs = []
    no_abnormal = 0
    for seq in data:
        if len(seq['EventTemplate']) < min_len:
            continue
        label, session_label = _session_label(seq['Label'])
        if is_unsupervised:
            # Abnormal sessions are counted but dropped.
            if session_label == 0:
                logs.append((seq['EventTemplate'], session_label))
            else:
                no_abnormal += 1
        else:
            # Abnormal sessions are counted and kept.
            if session_label > 0:
                no_abnormal += 1
            logs.append((seq['EventTemplate'], label))
    print("Number of abnormal sessions:", no_abnormal)
    return logs

In [31]:
# Supervised mode keeps every session together with its label, so the abnormal ones can be selected below.
logs = load_features("./output/HDFS/session/train0.1/test.pkl", is_unsupervised=False)

Number of abnormal sessions: 15149


In [32]:
# Peek at the first (event templates, label) pair.
logs[0]

(['Receiving block <*> src: /<*> dest: /<*>',
  'Receiving block <*> src: /<*> dest: /<*>',
  'Receiving block <*> src: /<*> dest: /<*>',
  'BLOCK* NameSystem.allocateBlock: <*> <*>',
  'PacketResponder <*> for block <*> <*>',
  'Received block <*> of size <*> from /<*>',
  'PacketResponder <*> for block <*> <*>',
  'Received block <*> of size <*> from /<*>',
  'PacketResponder <*> for block <*> <*>',
  'Received block <*> of size <*> from /<*>',
  'BLOCK* NameSystem.addStoredBlock: blockMap updated: <*> is added to <*> size <*>',
  'BLOCK* NameSystem.addStoredBlock: blockMap updated: <*> is added to <*> size <*>',
  'BLOCK* NameSystem.addStoredBlock: blockMap updated: <*> is added to <*> size <*>',
  'BLOCK* NameSystem.delete: <*> is added to invalidSet of <*>',
  'BLOCK* NameSystem.delete: <*> is added to invalidSet of <*>',
  'BLOCK* NameSystem.delete: <*> is added to invalidSet of <*>',
  'Deleting block <*> file <*>',
  'Deleting block <*> file <*>',
  'Deleting block <*> file <*>

In [33]:
# Keep only the sessions labelled abnormal (label > 0).
ab_logs = [session for session in logs if session[1] > 0]

In [34]:
# Should equal the abnormal-session count printed by load_features.
len(ab_logs)

15149

In [39]:
# Flatten the event templates of all abnormal sessions into one list.
# sum(list_of_lists, []) copies the growing accumulator on every addition and is quadratic
# in the total number of events; the nested comprehension builds the same list in one pass.
ab_log_events = [event for session in ab_logs for event in session[0]]

In [40]:
# Keep only the events whose template the vocab does not contain.
# The lookup set is built once, so each membership test is a hash lookup rather than a
# linear scan of vocab.itos repeated for every event.
known_templates = set(vocab.itos)
ab_log_events = [event for event in ab_log_events if event not in known_templates]

In [41]:
# Number of events in abnormal sessions whose template is not in the vocab.
len(ab_log_events)

4754