In [64]:
import pandas as pd
import re

# Matches one log line. The six capture groups are, in order, the fields that
# later become the DataFrame columns date, time, process_id, log_level,
# component and message: two 6-digit numbers, an integer, a word, a dotted
# name (may contain '$'), then everything after ": ".
log_pattern = re.compile(r'(\d{6}) (\d{6}) (\d+) (\w+) ([\w.$]+): (.+)')
# Matches a block identifier: the literal "blk_" followed by digits and/or hyphens.
block_id_pattern = re.compile(r'blk_[\d-]+')

### Helper Functions

In [65]:
def load_log_file(file_path, encoding=None):
    """Read a text file and return its lines with surrounding whitespace removed.

    Args:
        file_path: Path of the log file to read.
        encoding: Optional text encoding forwarded to ``open``. The default
            ``None`` keeps the platform default encoding.

    Returns:
        list[str]: One entry per line of the file, each stripped of leading
        and trailing whitespace (including the trailing newline).
    """
    # Iterate over the file handle rather than calling readlines(), so that
    # only one list of lines is ever held in memory for large log files.
    with open(file_path, 'r', encoding=encoding) as file:
        return [line.strip() for line in file]

def parse_log(log):
    """Split one raw log line into its fields.

    Returns the tuple of groups captured by ``log_pattern`` when the line
    matches from its start, otherwise ``None``.
    """
    matched = log_pattern.match(log)
    return matched.groups() if matched else None

def extract_block_id(log):
    """Return the first block id found anywhere in ``log``.

    The id is the text matched by ``block_id_pattern``; ``None`` is returned
    when the text contains no match.
    """
    found = block_id_pattern.search(log)
    return found.group() if found else None

def parse_logs(logs):
    """Parse every line in ``logs`` and keep only those that parsed.

    Each line is passed to ``parse_log``; lines for which it returns ``None``
    are dropped. The order of the remaining results follows the input order.
    """
    parsed = []
    for line in logs:
        fields = parse_log(line)
        if fields is not None:
            parsed.append(fields)
    return parsed


In [66]:
# Load every line of the raw log file
raw_logs = load_log_file('HDFS.log')

# Keep only the lines that match the expected log format
parsed_logs = parse_logs(raw_logs)

# The message is field index 5 of each parsed tuple; pull the block id out of
# it and append that id as a seventh field
block_ids = [extract_block_id(fields[5]) for fields in parsed_logs]
parsed_logs = [fields + (blk,) for fields, blk in zip(parsed_logs, block_ids)]

# Assemble the DataFrame, one column per parsed field plus the block id
columns = [
    'date',
    'time',
    'process_id',
    'log_level',
    'component',
    'message',
    'block_id',
]
df = pd.DataFrame(parsed_logs, columns=columns)
df.head()

Unnamed: 0,date,time,process_id,log_level,component,message,block_id
0,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906
1,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,blk_-1608999687919862906
2,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906
3,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906
4,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,blk_-1608999687919862906


In [67]:
# Read ground truth file; it provides the BlockId and Label columns that are
# used below to label each log line
ground_truth = pd.read_csv('anomaly_label.csv')
ground_truth.head()

Unnamed: 0,BlockId,Label
0,blk_-1608999687919862906,Normal
1,blk_7503483334202473044,Normal
2,blk_-3544583377289625738,Anomaly
3,blk_-9073992586687739851,Normal
4,blk_7854771516489510256,Normal


In [68]:
# Build a BlockId -> Label lookup from the ground truth, then label every log
# line with the label of the block it mentions
block_id_to_label = {
    blk: label
    for blk, label in zip(ground_truth['BlockId'], ground_truth['Label'])
}
df['target'] = df['block_id'].map(block_id_to_label)
df.head()

Unnamed: 0,date,time,process_id,log_level,component,message,block_id,target
0,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,Normal
1,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,blk_-1608999687919862906,Normal
2,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,Normal
3,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,Normal
4,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,blk_-1608999687919862906,Normal


In [69]:
# Count log lines per label to see how balanced the classes are
df['target'].value_counts()

target
Normal     10887379
Anomaly      288250
Name: count, dtype: int64

In [70]:
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer

# Make sure the NLTK resources used by the preprocessing helpers are available
# NOTE(review): newer NLTK releases may also require 'punkt_tab' for word_tokenize — confirm
for _nltk_resource in ('punkt', 'stopwords', 'wordnet', 'omw-1.4'):
    nltk.download(_nltk_resource)

# Step 1: Remove the substring "blk_<block-id>"
def remove_block_id(text):
    """Remove every "blk_<block-id>" substring from ``text``.

    Block ids may be negative (e.g. ``blk_-1608999687919862906``), so an
    optional minus sign directly after ``blk_`` is accepted. Without it the
    pattern would skip such ids because ``-`` is not a ``\\w`` character.

    Args:
        text: The log message.

    Returns:
        str: ``text`` with all block ids replaced by the empty string.
    """
    return re.sub(r'blk_-?\w+', '', text)

# Step 2: Convert text to lowercase
def to_lowercase(text):
    """Return ``text`` converted to lowercase."""
    lowered = text.lower()
    return lowered

# Step 3: Tokenize the text
def tokenize(text):
    """Split ``text`` into tokens using NLTK's ``word_tokenize``."""
    tokens = word_tokenize(text)
    return tokens

# Step 4: Remove punctuation
def remove_punctuation(tokens):
    """Keep only the tokens made up entirely of letters and digits.

    Any token for which ``str.isalnum`` is false (punctuation, tokens with
    underscores, hyphens, dots, or the empty string) is dropped.
    """
    kept = []
    for token in tokens:
        if token.isalnum():
            kept.append(token)
    return kept

# Step 5: Remove stop words
# Cache for the stop-word set, filled on first use so the corpus is read only
# once instead of on every call.
_STOP_WORDS_CACHE = {}

def remove_stopwords(tokens):
    """Drop English stop words from ``tokens``.

    The stop-word list comes from ``stopwords.words('english')`` and is looked
    up once, then reused for all later calls.

    Args:
        tokens: Iterable of token strings.

    Returns:
        list: The tokens that are not in the stop-word set, in input order.
    """
    if 'english' not in _STOP_WORDS_CACHE:
        _STOP_WORDS_CACHE['english'] = frozenset(stopwords.words('english'))
    stop_words = _STOP_WORDS_CACHE['english']
    return [word for word in tokens if word not in stop_words]

# Step 6: Perform stemming
def stem_words(tokens):
    """Return the Porter stem of every token, in input order."""
    stem = PorterStemmer().stem
    stemmed = []
    for token in tokens:
        stemmed.append(stem(token))
    return stemmed

# Step 7: Perform lemmatization
def lemmatize_words(tokens):
    """Return the WordNet lemma of every token, in input order."""
    lemmatize = WordNetLemmatizer().lemmatize
    lemmas = []
    for token in tokens:
        lemmas.append(lemmatize(token))
    return lemmas

# Complete preprocessing pipeline
def preprocess_log(log):
    """Run one raw log message through the full text-cleaning pipeline.

    Steps, in order: remove block ids, lowercase, tokenize, drop
    non-alphanumeric tokens, drop stop words, stem, lemmatize.

    Returns:
        list: The resulting tokens (a list, not a joined string).
    """
    # String-level steps: operate on the message text
    text = to_lowercase(remove_block_id(log))

    # Token-level steps: each takes a list of tokens and returns a new list
    tokens = tokenize(text)
    for step in (remove_punctuation, remove_stopwords, stem_words, lemmatize_words):
        tokens = step(tokens)

    return tokens


[nltk_data] Downloading package punkt to
[nltk_data]     /Users/dhairyaparikh/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/dhairyaparikh/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/dhairyaparikh/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/dhairyaparikh/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [71]:
# Apply preprocessing to log messages; each message becomes a list of tokens
df['message_processed'] = df['message'].apply(preprocess_log)

In [72]:
# Preview the first few token lists produced by preprocessing
df['message_processed'].head()

0           [receiv, block, src, dest]
1                              [block]
2           [receiv, block, src, dest]
3           [receiv, block, src, dest]
4    [packetrespond, 1, block, termin]
Name: message_processed, dtype: object

In [78]:
# Split the data into training and testing sets using stratified sampling
from sklearn.model_selection import train_test_split

X = df['message_processed']
y = df['target']

# NOTE(review): the split is made per log line while labels are assigned per
# block id, so lines of the same block can land in both sets — confirm this
# is intended.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert text data to numerical data using TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer

# The vectorizer expects strings, so each token list is joined back into a
# single space-separated document first.
vectorizer = TfidfVectorizer()
X_train_tfidf = vectorizer.fit_transform(X_train.map(' '.join))
X_test_tfidf = vectorizer.transform(X_test.map(' '.join))

# Train a Logistic Regression model
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(max_iter=1000)
model.fit(X_train_tfidf, y_train)

# Evaluate the model
from sklearn.metrics import accuracy_score, classification_report

# Predict once and reuse the predictions for both metrics; model.score()
# would run a second full prediction pass over the test set.
y_pred = model.predict(X_test_tfidf)

# Print accuracy, then the per-class precision/recall/F1 report
print('Accuracy:', accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred))



Accuracy: 0.9762630831550436
              precision    recall  f1-score   support

     Anomaly       0.77      0.11      0.20     57650
      Normal       0.98      1.00      0.99   2177476

    accuracy                           0.98   2235126
   macro avg       0.87      0.56      0.59   2235126
weighted avg       0.97      0.98      0.97   2235126

