In [1]:
import re
import os
import pandas as pd
from collections import OrderedDict
from tqdm import tqdm
import json
import torch

import dateutil.parser

In [2]:
def encoder(tmp_path, encoder='one_hot'):
    assert encoder in ['one_hot', 'semantic'], "encoder must be one_hot or semantic"
    if encoder == 'one_hot':
        one_datas = pd.read_csv(tmp_path + '.log_templates.csv', engine='c', na_filter=False, memory_map=True)
        etype_nums = len(one_datas)
        mapping = {etype:torch.nn.functional.one_hot(torch.tensor(idx),\
                                                     num_classes=etype_nums).tolist()\
                   for idx, etype in enumerate(one_datas['EventId'])}
        save_js = json.dumps(mapping)
        f2 = open(tmp_path + '_one_hot.json', 'w')
        f2.write(save_js)
        f2.close()
    print('encoder done')

In [3]:
def sample(log_file, window='session',window_size=0):
    assert window == 'session', "Only window=session is supported for HDFS dataset."
    print('Loading', log_file)
    parsed_log = pd.read_csv(log_file + '.log_structured.csv', engine='c', na_filter=False, memory_map=True, dtype={1:"string", 2:"string"})
    
    data_dict = OrderedDict()
    for idx, row in parsed_log.iterrows():
        session_list = re.findall(r'(blk_-?\d+)', row['Content'])
        session_set = set(session_list)
        cur_time = row['Date'] + ' ' + row['Time']
        for session_id in session_set:
            if not session_id in data_dict:
                data_dict[session_id] = []
            # .append((feature1, feature2, ...)), here you can add more features which you need
            data_dict[session_id].append((row['EventId'], row['Component'], cur_time))
        last_time = cur_time
    data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])
    # data_df.to_csv("result/HDFS_sequence.csv",index=None)
    data_df.to_csv(log_file + "_sequence.csv",index=None)
    # save component
    component_set = list(set(parsed_log['Component']))
    mapping = {component_set[i]:i for i in range(len(component_set))}
    save_js = json.dumps(mapping)
    f = open(log_file + '_component.json', 'w')
    f.write(save_js)
    f.close()

def split_datas(seq_file, used_name, label_file, alpth = 0.8):
    '''
    split normal logs and abnormal logs by alpth rate and cat
    '''
    print('Loading Log: {} , label: {} and split datas by {} rate'.format(seq_file, label_file, alpth))
    datas = pd.read_csv(seq_file + '_sequence.csv', engine='c', na_filter=False, memory_map=True)
    labels = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True)
    
    normal_, anomaly_ = list(labels[labels['Label'] == 'Normal']['BlockId']), list(labels[labels['Label'] == 'Anomaly']['BlockId'])
    normal_data, anomaly_data = datas[datas['BlockId'].isin(normal_)], datas[datas['BlockId'].isin(anomaly_)]
    norlen, anolen = len(normal_data), len(anomaly_data)
    print('get normal data {}, anomaly data {}'.format(norlen, anolen))
    normal_data['Label'] = 0
    anomaly_data['Label'] = 1
    
    if not os.path.exists(used_name):
        os.mkdir(used_name)
    
    # split normal
    train_test = round(norlen*alpth)
    train_val = round(norlen*(alpth - 0.1))
    
    normal_data.iloc[:train_val].to_csv(used_name + 'train_normal.csv', index=None)
    normal_data.iloc[train_val:train_test].to_csv(used_name + 'val_normal.csv', index=None)
    normal_data.iloc[train_test:].to_csv(used_name + 'test_normal.csv', index=None)
    
    # split anomaly
    train_test = round(anolen*alpth)
    train_val = round(anolen*(alpth - 0.1))
    
    anomaly_data.iloc[:train_val].to_csv(used_name + 'train_anomaly.csv', index=None)
    anomaly_data.iloc[train_val:train_test].to_csv(used_name + 'val_anomaly.csv', index=None)
    anomaly_data.iloc[train_test:].to_csv(used_name + 'test_anomaly.csv', index=None)
    
    print('new file in used')

In [4]:
name = '../data/HDFS/result_logRel/HDFS_30s'
used_name = '../data/HDFS/used_logRel_30s/'
sample(name, window='session')
split_datas(name, used_name, '../data/HDFS/anomaly_label.csv')
encoder(name)

Loading ../data/HDFS/result_logRel/HDFS_30s
Loading Log: ../data/HDFS/result_logRel/HDFS_30s , label: ../data/HDFS/anomaly_label.csv and split datas by 0.8 rate
get normal data 20022, anomaly data 2001


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  normal_data['Label'] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  anomaly_data['Label'] = 1


new file in used
encoder done
