# HDFS Anomaly Detection using LSTM

## Importing data 

In [1]:
import os
import re

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sklearn.utils import class_weight
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.models import load_model
from tqdm import tqdm

In [62]:
load_dotenv()

BASE_PATH = os.getenv('BASE_PATH')

traces = pd.read_csv(os.path.join(BASE_PATH, 'Raw_logs', 'HDFS_v1', 'preprocessed', 'Event_traces.csv'))
labels = pd.read_csv(os.path.join(BASE_PATH, 'Raw_logs', 'HDFS_v1', 'preprocessed', 'anomaly_label.csv'))
log_templates = pd.read_csv(os.path.join(BASE_PATH, 'Raw_logs', 'HDFS_v1', 'preprocessed', 'HDFS.log_templates.csv'))

traces.head()


Unnamed: 0,BlockId,Label,Type,Features,TimeInterval,Latency
0,blk_-1608999687919862906,Success,,"[E5,E22,E5,E5,E11,E11,E9,E9,E11,E9,E26,E26,E26...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",3802
1,bl3544583377289625738k_7503483334202473044,Success,,"[E5,E5,E22,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",3802
2,blk_-,Fail,21.0,"[E5,E22,E5,E5,E11,E9,E11,E9,E11,E9,E3,E26,E26,...","[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",3797
3,blk_-9073992586687739851,Success,,"[E5,E22,E5,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",50448
4,blk_7854771516489510256,Success,,"[E5,E5,E22,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 0.0, 1.0, 48.0, 0.0, 0.0, 0.0, 0.0, 0.0,...",50583


In [65]:
# Convert 'Objects' to lists
import ast

def parse_events(s):
    # find all substrings matching “E” + digits
    return re.findall(r'E\d+', s)

traces['Features'] = traces['Features'].apply(parse_events)
traces.head()

TypeError: expected string or bytes-like object

In [66]:
# Drop type and latency columns
traces = traces.drop(columns=['Type'], axis=1)
traces = traces.drop(columns=['Latency'], axis=1)
traces.info()

KeyError: "['Type'] not found in axis"

In [61]:
# Removing duplicate rows if any

print('Shape before deleting duplicate values:', traces.shape)


Shape before deleting duplicate values: (287645, 4)


In [67]:
# Removing invalid blocks
pattern = r'^blk_-[0-9]+$'
traces = traces[traces['BlockId'].str.match(pattern, na=False)]
traces.head()

Unnamed: 0,BlockId,Label,Features,TimeInterval
0,blk_-1608999687919862906,Success,"[E5, E22, E5, E5, E11, E11, E9, E9, E11, E9, E...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,blk_-9073992586687739851,Success,"[E5, E22, E5, E5, E11, E9, E11, E9, E11, E9, E...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,blk_-2519617320378473615,Success,"[E5, E22, E5, E5, E11, E11, E9, E9, E11, E9, E...","[0.0, 1.0, 9.0, 43.0, 0.0, 0.0, 0.0, 0.0, 0.0,..."
10,blk_-2900490557492272760,Success,"[E5, E5, E22, E5, E11, E9, E11, E9, E11, E9, E...","[0.0, 0.0, 7.0, 44.0, 0.0, 0.0, 0.0, 0.0, 0.0,..."
11,blk_-50273257731426871,Success,"[E5, E5, E22, E5, E9, E11, E9, E11, E11, E9, E...","[0.0, 0.0, 3.0, 39.0, 1.0, 0.0, 0.0, 0.0, 0.0,..."


In [68]:
goal = labels.Label
counts = goal.value_counts()
percent = goal.value_counts(normalize=True)
percent100 = percent.mul(100).round(1).astype(str) + '%'
pd.DataFrame({'Label': counts, 'percent': percent100})


Unnamed: 0_level_0,Label,percent
Label,Unnamed: 1_level_1,Unnamed: 2_level_1
Normal,558223,97.1%
Anomaly,16838,2.9%


In [69]:
df_events = (traces[['BlockId', 'Features']]
             .explode('Features')
             .rename(columns={'Features': 'Event'}))

df_events

Unnamed: 0,BlockId,Event
0,blk_-1608999687919862906,E5
0,blk_-1608999687919862906,E22
0,blk_-1608999687919862906,E5
0,blk_-1608999687919862906,E5
0,blk_-1608999687919862906,E11
...,...,...
575060,blk_-9128742458709757181,E28
575060,blk_-9128742458709757181,E26
575060,blk_-9128742458709757181,E28
575060,blk_-9128742458709757181,E26


In [70]:
event_counts = (
    pd.crosstab(df_events['BlockId'], df_events['Event'])
)
event_counts

Event,E1,E10,E11,E12,E13,E14,E15,E16,E17,E18,...,E27,E28,E29,E3,E4,E5,E6,E7,E8,E9
BlockId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
blk_-1000002529962039464,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3
blk_-100000266894974466,0,0,3,0,0,0,0,0,0,0,...,0,0,0,6,3,3,0,0,0,3
blk_-1000007292892887521,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3
blk_-1000014584150379967,0,0,3,0,0,0,0,0,0,0,...,0,0,0,6,3,3,0,0,0,3
blk_-1000028658773048709,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
blk_-999650644387121533,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3
blk_-999754326029266890,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3
blk_-999918236066348879,0,0,3,0,0,0,0,0,0,0,...,0,0,0,1,2,3,0,0,0,3
blk_-999925873043039166,0,0,3,0,0,0,0,0,0,0,...,0,0,0,0,0,3,0,0,0,3


In [23]:
# How many missing values are in the dataset?
traces.isnull().sum()

BlockId         0
Label           0
Features        0
TimeInterval    0
dtype: int64

In [22]:
traces.isna().sum()

BlockId         0
Label           0
Features        0
TimeInterval    0
dtype: int64

In [34]:
from sklearn.model_selection import train_test_split

x = traces.drop('Label', axis=1)
y = traces['Label'].map({'Success': 0, 'Fail': 1})

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42, stratify=y)

In [33]:
# Apply smote to balance the dataset
from imblearn.over_sampling import SMOTE

smote = SMOTE(
    sampling_strategy='auto',
    k_neighbors=5,
    random_state=42
)

x_train_res, y_train_res = smote.fit_resample(x_train, y_train)

print("Before SMOTE:", y_train.value_counts(normalize=True))
print(" After SMOTE:", y_train_res.value_counts(normalize=True))



ValueError: could not convert string to float: 'blk_-620208026821375986'

In [None]:
def create_sliding_windows(event_sequences, label, window_size_local=10, step_size_local=1):
    x1, y1 = [], []
    for i in range(0, len(event_sequences) - window_size_local, step_size_local):
        x1.append(event_sequences[i: i + window_size_local])
        y1.append(label)
    return np.array(x1), np.array(y1)


## Encoding the sequences

In [None]:
MAX_LEN = 50
VOCAB_SIZE = len(log_templates['EventId'].unique()) + 1

vectorize_layer = TextVectorization(
    max_tokens=VOCAB_SIZE,  # Set your desired vocabulary size
    output_mode='int',
    output_sequence_length=MAX_LEN  # Set your desired sequence length
)

vectorize_layer.adapt(log_templates['EventId'])

## Applying the window function

In [None]:
if os.path.exists(os.path.join("variables", "x_train.npy")) and os.path.exists(
        os.path.join("variables", "y_train.npy")):
    x_train = np.load(os.path.join("variables", "x_train.npy"))
    y_train = np.load(os.path.join("variables", "y_train.npy"))
else:
    x_all = []
    y_all = []

    window_size = 10
    step_size = 1

    for i in tqdm(range(len(data)), desc="Processing events", unit="log"):
        raw_text = data['Features'][i][1:-1].replace(",", " ")
        x_vectorized = vectorize_layer(raw_text)
        label = data['Label'][i]
        x_windows, y_windows = create_sliding_windows(x_vectorized, label, window_size, step_size)

        x_all.append(x_windows)
        y_all.append(y_windows)

    x_train = np.concatenate(x_all, axis=0)
    y_train = np.concatenate(y_all, axis=0)

print(f"X_Train {x_train.shape}")
print(f"y_train shape: {y_train.shape}")



In [None]:
os.makedirs("variables", exist_ok=True)
np.save(os.path.join("variables", "x_train.npy"), x_train)
np.save(os.path.join("variables", "y_train.npy"), y_train)

In [None]:
x_train_final, x_test_final, y_train_final, y_test_final = train_test_split(x_train, y_train, test_size=0.2,
                                                                            random_state=42, stratify=y_train)

## Building the LSTM Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

embedding_vector_length = 32
model = Sequential()
model.add(Embedding(vectorize_layer.vocabulary_size(), embedding_vector_length))
model.add(LSTM(32))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

modelDropout = Sequential()
modelDropout.add(Embedding(vectorize_layer.vocabulary_size(), embedding_vector_length))
modelDropout.add(LSTM(32, dropout=0.2, recurrent_dropout=0.2))
modelDropout.add(Dense(1, activation='sigmoid'))
modelDropout.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy', 'precision', 'recall'])

print(model.summary())
print(modelDropout.summary())

## Training the LSTM Model

In [None]:


class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train_final),
    y=y_train_final.ravel()
)

# Converter para dicionário:
weights = dict(zip(np.unique(y_train_final), class_weights))

print(weights)

checkpoint = ModelCheckpoint('models/lstm_model_best_dropout.keras', monitor='val_accuracy', save_best_only=True,
                             verbose=1)

model_path = os.path.join('models', 'lstm_model_best_dropout.keras')
if os.path.exists(model_path):
    print('Loading existing model.')
    modelDropout = load_model(model_path)
else:
    print('Model not found, training a new one.')
    # Train the LSTM model
    modelDropout.fit(x_train_final, y_train_final,
                     validation_data=(x_test_final, y_test_final),
                     epochs=3, batch_size=512,
                     class_weight=weights,
                     callbacks=[checkpoint])


# Prediction Stage

In [None]:
text_input = " ".join(event_sequence)
x_vectorized = vectorize_layer(text_input)
x_windows, _ = create_sliding_windows(x_vectorized, None)
predictions = model.predict(x_windows)

In [None]:


plt.plot(predictions)
plt.title("Failure Probability over Time")
plt.xlabel("Window Index")
plt.ylabel("Failure Probability")
plt.show()


In [4]:
BASE_PATH = "~"

log_templates = pd.read_csv(os.path.join(BASE_PATH, 'Raw_logs', 'HDFS_v1', 'preprocessed', 'HDFS.log_templates.csv'))
print(log_templates.head())
log_templates['Regex'] = log_templates['EventTemplate'].apply(
    lambda t: re.compile(re.escape(t).replace(r'\[\*\]', '.*')))


def map_log_to_event(log_line):
    for _, row in log_templates.iterrows():
        if row['Regex'].match(log_line):
            return row['EventId']
    return None


log_file_path = os.path.expanduser(os.path.join(BASE_PATH, 'Raw_logs', 'HDFS_v1', 'HDFS.log'))

if not os.path.exists(log_file_path):
    raise FileNotFoundError(f"No such file or directory: '{log_file_path}'")

event_sequence = []
with open(log_file_path, 'r') as file:
    for line in tqdm(file, desc="Processing log lines", unit="line"):
        event_id = map_log_to_event(line.strip())
        if event_id is not None:
            event_sequence.append(event_id)
            

  EventId                           EventTemplate
0      E1  [*]Adding an already existing block[*]
1      E2        [*]Verification succeeded for[*]
2      E3                 [*]Served block[*]to[*]
3      E4  [*]Got exception while serving[*]to[*]
4      E5    [*]Receiving block[*]src:[*]dest:[*]


Processing log lines: 855118line [03:21, 4236.09line/s] 


KeyboardInterrupt: 