In [393]:
"""

Java/HDFS logs

t1 INFO added user
t2 ERROR deleted user

Elasticsearch

t1 | INFO | added user
t2 | ERROR | deleted user

ML 

Events

event_id | event
e1 | added user <>
e2 | deleted user <>


Logs

0-5 mins - tw1
t1 | INFO | added user 1
t2 | ERROR | deleted user 2
t3 | INFO | added user 3
t4 | ERROR | deleted user 4
t5 | INFO | added user 5


6-10 mins - tw2
t6 | ERROR | deleted user 6
t7 | INFO | added user 7
t8 | ERROR | deleted user 8

11-15 mins - tw3
t9 | INFO | added user 9
t10 | ERROR | deleted user 10


Log_summary


time_window_id | count_info | count_error | count_e1 | count_e2
tw1 | 3 | 2 | 3 | 2
tw2 | 1 | 2 | 1 | 2
tw3 | 1 | 1 | 1 | 1

"""

'\n\nJava/HDFS logs\n\nt1 INFO added user\nt2 ERROR deleted user\n\nElastic search\n\nt1 | INFO | added user\nt2 | ERROR | deleted user\n\nML \n\nEvents\n\nevent_id | event\ne1 | added user <>\ne2 | deleted user <>\n\n\nLogs\n\n0-5 mins - tw1\nt1 | INFO | added user 1\nt2 | ERROR | deleted user 2\nt3 | INFO | added user 3\nt4 | ERROR | deleted user 4\nt5 | INFO | added user 5\n\n\n6-10 mins\nt6 | ERROR | deleted user 6\nt7 | INFO | added user 7\nt8 | ERROR | deleted user 8\n\n11-15 mins\nt9 | INFO | added user 9\nt10 | ERROR | deleted user 0\n\n\nLog_summary\n\n\ntime_window_id | count_info | count_error | count_e1 | count_e2\ntw1 | 3 | 2 | 3 | 2\ntw2 | 1 | 2 | 1 | 2\ntw3 | 1 | 1 | 1 | 1\n\n'

In [394]:
import pandas as pd
import os
import numpy as np
import re
from sklearn.utils import shuffle
from collections import OrderedDict
import sys
sys.path.append('../')
from log_parser import Drain

# Input: the raw (unstructured) HDFS sample log and its anomaly labels.
log_file_path = 'data/unstructured/HDFS/'
unstructured_log_filename = 'HDFS_2k.log'
label_file_name = log_file_path + 'anomaly_label.csv'

# Output: directory for the parsed logs and the expected name of the parsed CSV.
structured_log_file_path = 'data/structured/HDFS/'
structured_log_filename = unstructured_log_filename + '_structured.csv'


def parseLog(log_file_path, log_file_name, structured_log_file_path, log_type):
    """Parse a raw log file into structured form with the Drain log parser.

    Parameters
    ----------
    log_file_path : str
        Directory containing the raw log file (passed to Drain as ``indir``).
    log_file_name : str
        Name of the raw log file inside ``log_file_path``.
    structured_log_file_path : str
        Output directory for the parser results (passed to Drain as ``outdir``).
    log_type : str
        Log source; selects the line format. Currently only ``'HDFS'`` is supported.

    Raises
    ------
    ValueError
        If ``log_type`` has no known line format. Without this check the call
        would fail later with an UnboundLocalError on ``log_format``.
    """
    # Line layout per supported log source; each <Field> becomes a column of the
    # structured output (Date, Time, Pid, Level, Component, Content for HDFS).
    log_formats = {
        'HDFS': '<Date> <Time> <Pid> <Level> <Component>: <Content>',
    }
    if log_type not in log_formats:
        raise ValueError(
            "Unsupported log_type {!r}; expected one of {}".format(log_type, sorted(log_formats))
        )
    log_format = log_formats[log_type]

    # Regular expression list for optional preprocessing (default: [])
    regex = [
        r'blk_(|-)[0-9]+' , # block id
        r'(/|)([0-9]+\.){3}[0-9]+(:[0-9]+|)(:|)', # IP
        r'(?<=[^A-Za-z0-9])(\-?\+?\d+)(?=[^A-Za-z0-9])|[0-9]+$', # Numbers
    ]
    st = 0.5  # Similarity threshold
    depth = 4  # Depth of all leaf nodes

    parser = Drain.LogParser(log_format, indir=log_file_path, outdir=structured_log_file_path, depth=depth, st=st, rex=regex)
    parser.parse(log_file_name)

## parse the logs - convert unstructured to structured log
parseLog(log_file_path, unstructured_log_filename, structured_log_file_path, 'HDFS')
    

## read structured log 
print("Loading", structured_log_file_path+structured_log_filename)
structured_log = pd.read_csv(structured_log_file_path+structured_log_filename, engine='c', na_filter=False, memory_map=True)

structured_log

Parsing file: data/unstructured/HDFS/HDFS_2k.log
Processed 50.0% of log lines.
Processed 100.0% of log lines.
Parsing done. [Time taken: 0:00:00.307728]
Loading data/structured/HDFS/HDFS_2k.log_structured.csv


Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList
0,1,81109,203615,148,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_38865049064139...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['1', 'blk_38865049064139660']"
1,2,81109,203807,222,INFO,dfs.DataNode$PacketResponder,PacketResponder 0 for block blk_-6952295868487...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['0', 'blk_-6952295868487656571']"
2,3,81109,204005,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.addStoredBlock: blockMap upd...,5d5de21c,BLOCK* NameSystem.addStoredBlock: blockMap upd...,"['10.251.73.220:50010', 'blk_71283702376877284..."
3,4,81109,204015,308,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_82291938032499...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['2', 'blk_8229193803249955061']"
4,5,81109,204106,329,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_-6670958622368...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['2', 'blk_-6670958622368987959']"
...,...,...,...,...,...,...,...,...,...,...
1995,1996,81111,101621,24902,INFO,dfs.DataNode$DataXceiver,Receiving block blk_4198733391373026104 src: /...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_4198733391373026104', '/10.251.106.10:46..."
1996,1997,81111,101735,26595,INFO,dfs.DataNode$PacketResponder,Received block blk_-5815145248455404269 of siz...,e3df2680,Received block <*> of size <*> from <*>,"['blk_-5815145248455404269', '67108864', '/10...."
1997,1998,81111,101804,26494,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-295306975763175640 src: /...,09a53393,Receiving block <*> src: <*> dest: <*>,"['blk_-295306975763175640', '/10.250.9.207:532..."
1998,1999,81111,101954,26414,INFO,dfs.DataNode$PacketResponder,PacketResponder 0 for block blk_52257196770490...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['0', 'blk_5225719677049010638']"


In [395]:
def fill_zeros(x):
    """Return ``str(x)`` left-padded with zeros to at least 6 characters.

    The yymmdd Date and HHMMSS Time fields come back from the CSV as integers,
    which drops leading zeros (e.g. 81109 instead of 081109). This restores them.
    Values whose string form is already 6 or more characters are returned unchanged,
    because ``str.zfill`` never truncates.
    """
    return str(x).zfill(6)
    
# Combine the zero-padded Date (yymmdd) and Time (HHMMSS) fields into a single
# datetime 'Date' column, then drop the now-redundant 'Time' column.
# Plain column assignment (not structured_log.loc[:, 'Date'] = ...) replaces the column,
# so its dtype becomes datetime64. Since pandas 2.0, `.loc[:, col] = values` writes into
# the existing object column in place and keeps it object dtype, which
# pd.Grouper(key='Date', freq=...) cannot group on.
structured_log['Date'] = pd.to_datetime(
    structured_log['Date'].apply(fill_zeros) + ' ' + structured_log['Time'].apply(fill_zeros),
    format="%y%m%d %H%M%S",
)
structured_log = structured_log.drop(columns=['Time'])

structured_log.head()

Unnamed: 0,LineId,Date,Pid,Level,Component,Content,EventId,EventTemplate,ParameterList
0,1,2008-11-09 20:36:15,148,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_38865049064139...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['1', 'blk_38865049064139660']"
1,2,2008-11-09 20:38:07,222,INFO,dfs.DataNode$PacketResponder,PacketResponder 0 for block blk_-6952295868487...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['0', 'blk_-6952295868487656571']"
2,3,2008-11-09 20:40:05,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.addStoredBlock: blockMap upd...,5d5de21c,BLOCK* NameSystem.addStoredBlock: blockMap upd...,"['10.251.73.220:50010', 'blk_71283702376877284..."
3,4,2008-11-09 20:40:15,308,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_82291938032499...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['2', 'blk_8229193803249955061']"
4,5,2008-11-09 20:41:06,329,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_-6670958622368...,dc2c74b7,PacketResponder <*> for block <*> terminating,"['2', 'blk_-6670958622368987959']"


In [396]:
# TODO: Cleaning: Remove rows and columns with count 0 (no data present)


In [397]:
# One row per 5-minute window (right-closed, right-labelled) holding its total message
# count, plus a zero-initialised counter column for every log level and for every
# event id that occurs in the log.
LOG_LEVELS = ['WARN', 'INFO', 'DEBUG', 'TRACE', 'ERROR', 'FATAL']

window_grouper = pd.Grouper(key='Date', freq='5Min', closed='right', label='right')
df_grouped = (
    structured_log
    .groupby(window_grouper)
    .agg(total_msgs=pd.NamedAgg(column="Content", aggfunc="count"))
    .reset_index()
)

# Add all counter columns in a single concat rather than one insert per column.
counter_columns = [level_name + '_count' for level_name in LOG_LEVELS] + [
    template_id + '_count' for template_id in structured_log['EventId'].unique()
]
zero_counters = pd.DataFrame(0, index=df_grouped.index, columns=counter_columns)
df_grouped = pd.concat([df_grouped, zero_counters], axis=1)

df_grouped.head()

Unnamed: 0,Date,total_msgs,WARN_count,INFO_count,DEBUG_count,TRACE_count,ERROR_count,FATAL_count,dc2c74b7_count,5d5de21c_count,...,dba996ef_count,626085d5_count,81cee340_count,d63ef163_count,40651754_count,04137b95_count,d6b7b743_count,98012f03_count,728076ac_count,415a1760_count
0,2008-11-09 20:40:00,2,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2008-11-09 20:45:00,6,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,2008-11-09 20:50:00,7,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2008-11-09 20:55:00,6,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,2008-11-09 21:00:00,8,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [398]:
# Populating all the log level counts.
# Count messages per (5-minute window, Level). The Grouper spec matches the one used
# to build df_grouped, so the window labels line up with df_grouped['Date'].
df_grouped_logLevel = structured_log.groupby([pd.Grouper(key='Date', freq='5Min',closed='right',label='right'), 'Level']).agg(
    count=pd.NamedAgg(column="Level", aggfunc="count"),
).reset_index()

# Pivot to a window x level table once, then look every window up per level column.
# This replaces a boolean mask over all of df_grouped for every (window, level) row,
# which was O(windows * rows).
level_counts = df_grouped_logLevel.pivot(index='Date', columns='Level', values='count')
for level, counts in level_counts.items():
    # Windows with no messages of this level get 0. A level missing from LOG_LEVELS
    # gets a new column that is 0 (not NaN) in windows where it does not occur.
    df_grouped[level + '_count'] = df_grouped['Date'].map(counts).fillna(0).astype('int64')

df_grouped.head()

Unnamed: 0,Date,total_msgs,WARN_count,INFO_count,DEBUG_count,TRACE_count,ERROR_count,FATAL_count,dc2c74b7_count,5d5de21c_count,...,dba996ef_count,626085d5_count,81cee340_count,d63ef163_count,40651754_count,04137b95_count,d6b7b743_count,98012f03_count,728076ac_count,415a1760_count
0,2008-11-09 20:40:00,2,0,2,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2008-11-09 20:45:00,6,0,6,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,2008-11-09 20:50:00,7,0,7,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2008-11-09 20:55:00,6,0,6,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,2008-11-09 21:00:00,8,0,8,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [399]:
# Populating all the event id counts.
# Count messages per (5-minute window, EventId). The Grouper spec matches the one used
# to build df_grouped, so the window labels line up with df_grouped['Date'].
df_grouped_eventId = structured_log.groupby([pd.Grouper(key='Date', freq='5Min',closed='right',label='right'), 'EventId']).agg(
    count=pd.NamedAgg(column="EventId", aggfunc="count"),
).reset_index()

# Pivot to a window x event-id table once, then look every window up per event column.
# This replaces a boolean mask over all of df_grouped for every (window, event) row,
# which was O(windows * rows).
event_counts = df_grouped_eventId.pivot(index='Date', columns='EventId', values='count')
for event_id, counts in event_counts.items():
    # Windows in which this event does not occur get 0 instead of NaN.
    df_grouped[event_id + '_count'] = df_grouped['Date'].map(counts).fillna(0).astype('int64')

df_grouped

Unnamed: 0,Date,total_msgs,WARN_count,INFO_count,DEBUG_count,TRACE_count,ERROR_count,FATAL_count,dc2c74b7_count,5d5de21c_count,...,dba996ef_count,626085d5_count,81cee340_count,d63ef163_count,40651754_count,04137b95_count,d6b7b743_count,98012f03_count,728076ac_count,415a1760_count
0,2008-11-09 20:40:00,2,0,2,0,0,0,0,2,0,...,0,0,0,0,0,0,0,0,0,0
1,2008-11-09 20:45:00,6,0,6,0,0,0,0,2,4,...,0,0,0,0,0,0,0,0,0,0
2,2008-11-09 20:50:00,7,0,7,0,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
3,2008-11-09 20:55:00,6,0,6,0,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
4,2008-11-09 21:00:00,8,0,8,0,0,0,0,0,3,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
449,2008-11-11 10:05:00,8,0,8,0,0,0,0,2,3,...,1,0,0,0,0,0,0,0,0,0
450,2008-11-11 10:10:00,6,0,6,0,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
451,2008-11-11 10:15:00,15,0,15,0,0,0,0,6,3,...,0,0,0,0,0,0,0,0,0,0
452,2008-11-11 10:20:00,4,0,4,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
