In [2]:
"""
loads and preprocesses the structured log data for anomoly prediction
"""
import numpy as np
import pandas as pd
import time
import re
from collections import OrderedDict
from collections import Counter
from scipy.special import expit


def collect_event_ids(data_frame):
    """
    turns input data_frame into a 2 columned dataframe
    with columns: BlockId, EventSequence
    where EventSequence is a list of the events that happened to the block
    """
    data_dict = OrderedDict()
    for _, row in data_frame.iterrows():
        blk_id_list = re.findall(r"(blk_-?\d+)", row["Content"])
        blk_id_set = set(blk_id_list)
        for blk_id in blk_id_set:
            if not blk_id in data_dict:
                data_dict[blk_id] = []
            data_dict[blk_id].append(row["EventId"])
    data_df = pd.DataFrame(
        list(data_dict.items()), columns=["BlockId", "EventSequence"]
    )
    return data_df

In [3]:


# train = pd.read_csv("./train_subset.csv") # for testing
data1 = pd.read_csv("example_data/HDFS_100k.log_structured.csv")
lab1 = pd.read_csv("example_data/anomaly_label.csv")
print("data loaded")

# Convert to blockId and EventSequence dataframe
events_df = collect_event_ids(data1).merge(lab1, on="BlockId")

# Convert label column to binary
events_df["Label"] = events_df["Label"].apply(lambda x: 1 if x == "Anomaly" else 0)

# select only events
events = events_df["EventSequence"].values

data loaded


In [5]:
data1.head()

Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,Receiving block <*> src: /<*> dest: /<*>
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,E22,BLOCK* NameSystem.allocateBlock:<*>
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,Receiving block <*> src: /<*> dest: /<*>
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,Receiving block <*> src: /<*> dest: /<*>
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,E11,PacketResponder <*> for block <*> terminating


In [8]:
events_df.head()

Unnamed: 0,BlockId,EventSequence,Label
0,blk_-1608999687919862906,"[E5, E22, E5, E5, E11, E11, E9, E9, E11, E9, E...",0
1,blk_7503483334202473044,"[E5, E5, E22, E5, E11, E9, E11, E9, E11, E9, E...",0
2,blk_-3544583377289625738,"[E5, E22, E5, E5, E11, E9, E11, E9, E11, E9, E...",1
3,blk_-9073992586687739851,"[E5, E22, E5, E5, E11, E9, E11, E9, E11, E9, E...",0
4,blk_7854771516489510256,"[E5, E5, E22, E5, E11, E9, E11, E9, E11, E9, E...",0


In [6]:
events[:5]

array([list(['E5', 'E22', 'E5', 'E5', 'E11', 'E11', 'E9', 'E9', 'E11', 'E9', 'E26', 'E26', 'E26', 'E6', 'E5', 'E16', 'E6', 'E5', 'E18', 'E25', 'E26', 'E26', 'E3', 'E25', 'E6', 'E6', 'E5', 'E5', 'E16', 'E18', 'E26', 'E26', 'E5', 'E6', 'E5', 'E16', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E18', 'E25', 'E6', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E26', 'E26', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E25', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E3', 'E18', 'E6', 'E5', 'E3', 'E3', 'E3', 'E3', 'E3', 'E16', 'E3', 'E3', 'E

In [7]:
events1 = events.copy()

In [8]:
unique_events = set()
for i in events1:
    unique_events.update(i)

In [9]:
unique_events

{'E10',
 'E11',
 'E13',
 'E14',
 'E15',
 'E16',
 'E18',
 'E2',
 'E21',
 'E22',
 'E25',
 'E26',
 'E27',
 'E3',
 'E5',
 'E6',
 'E7',
 'E8',
 'E9'}

In [10]:
for i in events1:
    print(i)

6', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E11', 'E9', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9', 'E26']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E22', 'E5', 'E5', 'E5', 'E26', 'E26', 'E26', 'E11', 'E9', 'E11', 'E9', 'E11', 'E9']
['E2

In [11]:
# Convert into bag of words
X_counts = []
for i in range(events1.shape[0]):
    event_counts = Counter(events1[i])
    X_counts.append(event_counts)
X_df = pd.DataFrame(X_counts)
X_df = X_df.fillna(0)
events_list = X_df.columns
X = X_df.values


In [12]:
test1 = ['E1', 'E2', 'E1']
test2 = ['E3', 'E2', 'E5']

In [13]:
test1_rep = np.repeat(test1, 20)
test2_rep = np.repeat(test2, 20)

In [14]:
test1_split = np.split(test1_rep, 20)
test2_split = np.split(test2_rep, 20)

In [15]:
list_splits = [test1_split, test2_split]

In [16]:
all_blocks_count = []
for i in list_splits:
    block_counts = []
    for j in i:
        subset_count = Counter(j)
        block_counts.append(subset_count)
    block_df = pd.DataFrame(block_counts, columns = unique_events) / 20
    block_df = X_df.fillna(0)
    block_np = block_df.to_numpy()
    all_blocks_count.append(block_np)

all_blocks_stacked = np.stack(all_blocks_count)


In [17]:
block_df

Unnamed: 0,E18,E26,E22,E27,E16,E15,E14,E8,E5,E6,E13,E10,E21,E9,E3,E2,E11,E25,E7
0,,,,,,,,,,,,,,,0.15,,,,
1,,,,,,,,,,,,,,,0.15,,,,
2,,,,,,,,,,,,,,,0.15,,,,
3,,,,,,,,,,,,,,,0.15,,,,
4,,,,,,,,,,,,,,,0.15,,,,
5,,,,,,,,,,,,,,,0.15,,,,
6,,,,,,,,,,,,,,,0.1,0.05,,,
7,,,,,,,,,,,,,,,,0.15,,,
8,,,,,,,,,,,,,,,,0.15,,,
9,,,,,,,,,,,,,,,,0.15,,,


In [18]:
all_blocks_stacked

array([[[ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan, 0.05,  nan,  nan,  nan],
        [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,


In [4]:
unique_events = set()
for i in events:
    unique_events.update(i)

# Convert into bag of words
all_blocks_count = []
for block in events:
    # multiply block by 20 for 5% partitions
    block_rep = np.repeat(block, 20)
    # now split into 5% partitions
    block_split = np.split(block_rep, 20)
    block_counts = []
    for sub_block in block_split:
        # count each sub_block
        subset_count = Counter(sub_block)
        block_counts.append(subset_count)
    # put into dataframe to add nas to missing events
    # divide by 20 as original operation multiplied by 20
    block_df = pd.DataFrame(block_counts, columns=unique_events) / 20
    block_df = X_df.fillna(0)
    block_np = block_df.to_numpy()
    all_blocks_count.append(block_np)

# finally stack the blocks
X = np.stack(all_blocks_count)

In [5]:
X.shape

(7940, 20, 19)

In [7]:
X[0,:,:]

array([[ 3.  ,   nan,  3.  ,   nan,   nan,  2.45,   nan,   nan,   nan,
         1.  ,   nan,   nan,   nan,   nan,   nan,  3.  ,   nan,   nan,
          nan],
       [  nan,   nan,  2.  ,  2.9 ,   nan,  2.55,  1.  ,  1.  ,  1.  ,
          nan,  2.  ,   nan,   nan,   nan,   nan,   nan,   nan,   nan,
          nan],
       [  nan,   nan,  4.  ,  2.1 ,   nan,  2.  ,  1.  ,  2.  ,  1.35,
          nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,
          nan],
       [  nan,   nan,   nan,  1.  ,   nan,   nan,  1.  ,   nan,  9.45,
          nan,  1.  ,   nan,   nan,   nan,   nan,   nan,   nan,   nan,
          nan],
       [  nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan, 12.45,
          nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,
          nan],
       [  nan,   nan,   nan,   nan,   nan,  2.  ,   nan,   nan, 10.45,
          nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,   nan,
          nan],
       [  nan,   nan,   nan,   nan,   nan,   nan,   