# Dependencies

In [None]:
!pip install swifter

# Preprocessing

In [1]:
# Loading data
import pandas as pd
folder = 'dataset/HDFS_4level_0.5st'
df = pd.read_csv('{}/HDFS.log_structured.csv'.format(folder))
df.head(5)

Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.19.102:5..."
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,3d91fa85,BLOCK* NameSystem.allocateBlock: <*> <*>,['/mnt/hadoop/mapred/system/job_200811092030_0...
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.10.6:405..."
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-1608999687919862906', '/10.250.14.224:4..."
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,d38aa58d,PacketResponder <*> for block <*> <*>,"['1', 'blk_-1608999687919862906 terminating']"


In [2]:
# Adding block id to dataframe
import re
import swifter

def getBlockId(x):
    block_id = re.findall("blk_.\d+", x)
    if len(block_id) == 0:
        return ''
    else:
        return block_id[0]
    

df['block_id'] = df['ParameterList'].swifter.apply(getBlockId)

Pandas Apply:   0%|          | 0/11175629 [00:00<?, ?it/s]

In [3]:
# Checking shape
# Initialy, there are only 11k lines. Same number as reported in the paper "Detecting Large-Scale System Problems by Mining Console Logs"
df.shape

(11175629, 11)

In [4]:
# Differently from what is reported in the "Drain: An Online Log Parsing Approach with Fixed Depth Tree", "Detecting Large-Scale System Problems by Mining Console Logs" and "Tools and Benchmarks for Automated Log Parsing". 
# The number of templates found by the drain execution was close to 50, when the expected number was 29.
# Models worked despite the difference.
len(df['EventId'].unique())

48

In [6]:
# Aggregating events by blockId.
df['cat_BlockId'] = df['block_id'].astype('category')
df['cat_BlockId'] = df['cat_BlockId'].cat.codes
df['cat_EventTemplate'] = df['EventTemplate'].astype('category')
df['cat_EventTemplate'] = df['cat_EventTemplate'].cat.codes
result_agg = df.groupby(['cat_BlockId', 'cat_EventTemplate']).size().reset_index(name='counts')
result_agg

Unnamed: 0,cat_BlockId,cat_EventTemplate,counts
0,0,10,3
1,0,11,1
2,0,31,3
3,0,33,3
4,0,35,3
...,...,...,...
4170659,575060,12,3
4170660,575060,17,3
4170661,575060,31,3
4170662,575060,33,3


In [7]:
# Generating frequency matrix. Each row corresponds to a block and each column corresponds to a message type.
import numpy as np
data = np.zeros((575061, 48), dtype=float)   # (rows,cols)
for i in range(len(result_agg)):
    data[result_agg.iloc[i]['cat_BlockId']][result_agg.iloc[i]['cat_EventTemplate']] = result_agg.iloc[i]['counts']

In [8]:
# Loading labels for blocks
import pandas as pd
anomaly_db = pd.read_csv('{}/anomaly_label.csv'.format(folder))
anomaly_db.head(5)

Unnamed: 0,BlockId,Label
0,blk_-1608999687919862906,Normal
1,blk_7503483334202473044,Normal
2,blk_-3544583377289625738,Anomaly
3,blk_-9073992586687739851,Normal
4,blk_7854771516489510256,Normal


In [9]:
# Creating maps between block ids and rows
block_mapping = df.groupby(['block_id','cat_BlockId']).size().reset_index()
block_mapping = block_mapping[['block_id','cat_BlockId']]
block_mapping = block_mapping.rename(columns={'block_id':'BlockId'})
block_mapping.head()

Unnamed: 0,BlockId,cat_BlockId
0,blk_-1000002529962039464,0
1,blk_-100000266894974466,1
2,blk_-1000007292892887521,2
3,blk_-1000014584150379967,3
4,blk_-1000028658773048709,4


In [11]:
# Preparing Y for each line of matrix X
anomaly_fixed = pd.merge(anomaly_db, block_mapping, on="BlockId")
anomaly_fixed["cat_label"] = anomaly_fixed["Label"].astype("category").cat.codes
anomaly_fixed['cat_label'] = 1 - anomaly_fixed['cat_label']
anomaly_fixed = anomaly_fixed.sort_values(by=['cat_BlockId']).reset_index()
Y = anomaly_fixed['cat_label']

In [16]:
# Extracting unique lines from the data matrix
X, x_index = np.unique(data, axis=0, return_index=True)
Y = Y[x_index]

In [17]:
print("Final dataset has {} samples, {} of them are anomalous".format(X.shape[0], sum(Y)))

Final dataset has 597 samples, 393 of them are anomalous


In [18]:
# Spliting and saving dataset
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.33, random_state=42)
np.savez('{}/splited_dataset'.format(folder), x_train=X_train, y_train=y_train, x_test=X_test, y_test=y_test)

# Generated dataset for federated agents

In [19]:
# Creating the dataset for the parties
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

folder = 'dataset/HDFS_4level_0.5st'
data = np.load('{}/splited_dataset.npz'.format(folder))

def generate_parties_dataset(nb_dp_per_party, should_stratify, party_folder, data):
    x_train, y_train,x_test, y_test = data['x_train'], data['y_train'], data['x_test'], data['y_test']
    labels, train_counts = np.unique(y_train, return_counts=True)
    te_labels, test_counts = np.unique(y_test, return_counts=True)
    if np.all(np.isin(labels, te_labels)):
        print("Warning: test set and train set contain different labels")

    num_train = np.shape(y_train)[0]
    num_test = np.shape(y_test)[0]
    num_labels = np.shape(np.unique(y_test))[0]
    nb_parties = len(nb_dp_per_party)

    if should_stratify:
        # Sample according to source label distribution
        train_probs = {
            label: train_counts[label] / float(num_train) for label in labels}
        test_probs = {label: test_counts[label] /
                      float(num_test) for label in te_labels}
    else:
        # Sample uniformly
        train_probs = {label: 1.0 / len(labels) for label in labels}
        test_probs = {label: 1.0 / len(te_labels) for label in te_labels}

    for idx, dp in enumerate(nb_dp_per_party):
        train_p = np.array([train_probs[y_train[idx]]
                            for idx in range(num_train)])
        train_p /= np.sum(train_p)
        train_indices = np.random.choice(num_train, dp, p=train_p)
        test_p = np.array([test_probs[y_test[idx]] for idx in range(num_test)])
        test_p /= np.sum(test_p)

        # Split test evenly
        test_indices = np.random.choice(
            num_test, int(num_test / nb_parties), p=test_p)

        
        sc = StandardScaler()
        x_train_pi = x_train[train_indices]
        x_train_pi = sc.fit_transform(x_train_pi)
        y_train_pi = y_train[train_indices]
        
        x_test_pi = x_test[test_indices]
        x_test_pi = sc.transform(x_test_pi)
        y_test_pi = y_test[test_indices]

        # Now put it all in an npz
        name_file = 'data_party' + str(idx) + '.npz'
        name_file = os.path.join(party_folder, name_file)
        np.savez(name_file, x_train=x_train_pi, y_train=y_train_pi,
                 x_test=x_test_pi, y_test=y_test_pi)

        print_statistics(idx, x_test_pi, x_train_pi, num_labels, y_train_pi)

        print('Finished! :) Data saved in ', party_folder)
        
def print_statistics(i, x_test_pi, x_train_pi, nb_labels, y_train_pi):
    print('Party_', i)
    print('nb_x_train: ', np.shape(x_train_pi),
          'nb_x_test: ', np.shape(x_test_pi))
    for l in range(nb_labels):
        print('* Label ', l, ' samples: ', (y_train_pi == l).sum())


# Same distribution for all participants        
generate_parties_dataset([150 for _ in range(3)], False, folder, data)
# Participants with 10%, 25% and 65% of the dataset.
# generate_parties_dataset([int(450 * 0.10), int(450 * 0.25), int(450 * 0.65)], False, folder, data)

Party_ 0
nb_x_train:  (150, 48) nb_x_test:  (66, 48)
* Label  0  samples:  48
* Label  1  samples:  102
Finished! :) Data saved in  dataset/HDFS_4level_0.5st
Party_ 1
nb_x_train:  (150, 48) nb_x_test:  (66, 48)
* Label  0  samples:  53
* Label  1  samples:  97
Finished! :) Data saved in  dataset/HDFS_4level_0.5st
Party_ 2
nb_x_train:  (150, 48) nb_x_test:  (66, 48)
* Label  0  samples:  60
* Label  1  samples:  90
Finished! :) Data saved in  dataset/HDFS_4level_0.5st
