In [13]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-22.0.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (3.2 kB)
Downloading pyarrow-22.0.0-cp312-cp312-manylinux_2_28_x86_64.whl (47.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.7/47.7 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-22.0.0


In [None]:
import os
import re
import json
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report

import pyarrow


In [2]:
# <<< EDIT THIS >>>
DATA_DIR = "/home/andy/6309GP/HDFS/HDFS_v1/preprocessed"  # e.g., "D:/datasets/HDFS_v1/preprocessed"

# Expected files (as you said you have):
# - anomaly_label.csv
# - Event_occurrence_matrix.csv           (count matrix per session x event)
# - Event_traces.csv                      (session -> sequence of event IDs)
# - HDFS.log_templates.csv                (EventId -> EventTemplate)
# - HDFS.npz                              (optional prebuilt sparse matrices)


In [3]:
import ast, re
from pathlib import Path
import pandas as pd
import numpy as np

DATA_DIR = Path(DATA_DIR)  # reuse your existing variable

# ---------- 1) Load labels (prefer anomaly_label.csv if present) ----------
labels_path = DATA_DIR / "anomaly_label.csv"
if labels_path.exists():
    y_df = pd.read_csv(labels_path, engine="python")
    # Find id + label columns
    sid_col = next((c for c in ["SessionId","BlockId","BlockID","Id","ID"] if c in y_df.columns), None)
    if sid_col is None:
        raise ValueError("anomaly_label.csv: could not find a session/block id column.")
    if "Label" not in y_df.columns and "Anomaly" in y_df.columns:
        y_df["Label"] = y_df["Anomaly"]
    if "Label" not in y_df.columns:
        raise ValueError("anomaly_label.csv: expected 'Label' or 'Anomaly' column.")
    # Normalize labels to {0,1}
    if y_df["Label"].dtype == object:
        y_df["Label"] = (
            y_df["Label"].astype(str).str.strip().str.lower()
            .map({"success":0, "normal":0, "fail":1, "failure":1, "anomaly":1, "1":1, "0":0})
            .fillna(0).astype(int)
        )
    else:
        y_df["Label"] = y_df["Label"].astype(int)
    y_df = y_df[[sid_col,"Label"]].rename(columns={sid_col:"SessionId"})
else:
    y_df = None  # we'll fallback to labels inside Event_traces.csv if present


# ---------- 2) Load traces; auto-detect the EventSequence column ----------
traces_path = DATA_DIR / "Event_traces.csv"
df = pd.read_csv(traces_path, engine="python")

# helper: detect column that contains [E5,E22,...] lists
def looks_like_event_list(s: pd.Series) -> bool:
    s = s.astype(str).head(50)
    # at least 60% of sampled rows match [E123(,E456)*]
    frac = s.str.contains(r'^\s*\[\s*E\d+(?:\s*,\s*E\d+)*\s*\]\s*$', regex=True).mean()
    return frac >= 0.6

event_col = None
for c in df.columns:
    try:
        if looks_like_event_list(df[c]):
            event_col = c
            break
    except Exception:
        pass
if event_col is None:
    raise ValueError("Could not find a column with event ID lists like [E5,E22,...] in Event_traces.csv.")

# session/block id
sid_col = next((c for c in ["SessionId","BlockId","BlockID","Id","ID"] if c in df.columns), None)
if sid_col is None:
    raise ValueError("Event_traces.csv: could not find a session/block id column (e.g., BlockId/SessionId).")

# label (optional inside Event_traces.csv)
label_in_traces = "Label" in df.columns

# parse the [E..] list to a space-separated string for TF-IDF(ID)
def event_list_to_space(seq_str: str) -> str:
    s = str(seq_str)
    # try safe literal parse first
    try:
        lst = ast.literal_eval(s)
        # ensure strings like 'E5'
        return " ".join(str(x) for x in lst)
    except Exception:
        # fallback: strip brackets/spaces and split by comma
        s2 = re.sub(r'[\[\]\s]', '', s)
        toks = [t for t in s2.split(",") if t]
        return " ".join(toks)

traces = df[[sid_col, event_col]].rename(columns={sid_col:"SessionId", event_col:"EventSequence"}).copy()
traces["EventSequence"] = traces["EventSequence"].astype(str).apply(event_list_to_space)

# bring labels: prefer anomaly_label.csv if available, else derive from Event_traces.csv
if y_df is not None:
    data_df = traces.merge(y_df, on="SessionId", how="inner")
else:
    if not label_in_traces:
        raise ValueError("No anomaly_label.csv and no 'Label' column in Event_traces.csv; cannot build labels.")
    tmp = df[[sid_col,"Label"]].rename(columns={sid_col:"SessionId"})
    # normalize textual labels
    tmp["Label"] = (
        tmp["Label"].astype(str).str.strip().str.lower()
        .map({"success":0, "normal":0, "fail":1, "failure":1, "anomaly":1, "1":1, "0":0})
        .fillna(0).astype(int)
    )
    data_df = traces.merge(tmp, on="SessionId", how="inner")

print("Data shape:", data_df.shape)
print(data_df.head(2))
print("Label distribution:", data_df["Label"].value_counts().to_dict())


Data shape: (575061, 3)
                  SessionId  \
0  blk_-1608999687919862906   
1   blk_7503483334202473044   

                                       EventSequence  Label  
0  E5 E22 E5 E5 E11 E11 E9 E9 E11 E9 E26 E26 E26 ...      0  
1  E5 E5 E22 E5 E11 E9 E11 E9 E11 E9 E26 E26 E26 ...      0  
Label distribution: {0: 558223, 1: 16838}


In [4]:
# Load templates (EventId -> EventTemplate)
templates_path = Path(DATA_DIR) / "HDFS.log_templates.csv"
tpl_df = pd.read_csv(templates_path)

# Normalize column names
eid_col = None
tpl_col = None
for c in tpl_df.columns:
    if c.lower() in ("eventid", "eid", "id", "event_id"):
        eid_col = c
    if c.lower() in ("eventtemplate", "template", "logtemplate", "log_template"):
        tpl_col = c
if eid_col is None or tpl_col is None:
    raise ValueError("HDFS.log_templates.csv must have columns for EventId and EventTemplate.")
tpl_df = tpl_df[[eid_col, tpl_col]].rename(columns={eid_col: "EventId", tpl_col: "EventTemplate"})

# Make EventId strings like 'E1' consistent if needed
tpl_df["EventId"] = tpl_df["EventId"].astype(str)
print(tpl_df.head(), tpl_df.shape)


  EventId                           EventTemplate
0      E1  [*]Adding an already existing block[*]
1      E2        [*]Verification succeeded for[*]
2      E3                 [*]Served block[*]to[*]
3      E4  [*]Got exception while serving[*]to[*]
4      E5    [*]Receiving block[*]src:[*]dest:[*] (29, 2)


In [15]:
import time
import pandas as pd
import numpy as np

# Merge step
data_df = traces.merge(y_df, on="SessionId", how="inner")
print(data_df.head(), data_df.shape, data_df["Label"].value_counts(dropna=False))

# Generate timestamped filename
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"sequence_hdfs_{timestamp}.bin"

# Save as binary (structured NumPy array)
data_df.to_feather(filename)

print(f" DataFrame saved as {filename}")


                  SessionId  \
0  blk_-1608999687919862906   
1   blk_7503483334202473044   
2  blk_-3544583377289625738   
3  blk_-9073992586687739851   
4   blk_7854771516489510256   

                                       EventSequence  Label  
0  E5 E22 E5 E5 E11 E11 E9 E9 E11 E9 E26 E26 E26 ...      0  
1  E5 E5 E22 E5 E11 E9 E11 E9 E11 E9 E26 E26 E26 ...      0  
2  E5 E22 E5 E5 E11 E9 E11 E9 E11 E9 E3 E26 E26 E...      1  
3  E5 E22 E5 E5 E11 E9 E11 E9 E11 E9 E26 E26 E26 ...      0  
4  E5 E5 E22 E5 E11 E9 E11 E9 E11 E9 E26 E26 E26 ...      0   (575061, 3) Label
0    558223
1     16838
Name: count, dtype: int64
 DataFrame saved as sequence_hdfs_20251024_183457.bin


In [9]:
# Keep only tokens that look like event IDs (E#, if you prefer), but simplest is just split on space
def identity_tokenizer(s: str):
    # For pre-tokenized input (space-separated)
    return s.split()

tfidf_id = TfidfVectorizer(tokenizer=identity_tokenizer, lowercase=False, preprocessor=None)
X_id = tfidf_id.fit_transform(data_df["EventSequence"].astype(str))

feature_names_id = np.array(tfidf_id.get_feature_names_out())
X_id.shape, len(feature_names_id)




((575061, 29), 29)

In [None]:
############ TODO : redo for 10 fold cross validation ###########################
# Train/test split (same 70/30 as paper)
Xtr_id, Xte_id, ytr, yte = train_test_split(X_id, data_df["Label"].values, test_size=0.30, random_state=42, stratify=data_df["Label"].values)

# Quick baseline model to verify features work (Logistic Regression)
clf_id = LogisticRegression(max_iter=200, class_weight="balanced", n_jobs=None)
clf_id.fit(Xtr_id, ytr)
yp_id = clf_id.predict(Xte_id)
print(classification_report(yte, yp_id, digits=4))


              precision    recall  f1-score   support

           0     1.0000    0.9986    0.9993    167468
           1     0.9544    0.9996    0.9765      5051

    accuracy                         0.9986    172519
   macro avg     0.9772    0.9991    0.9879    172519
weighted avg     0.9987    0.9986    0.9986    172519



In [11]:
# Save TF-IDF(ID) artifacts for reuse
out_dir = Path(DATA_DIR) / "tfidf_outputs"
out_dir.mkdir(exist_ok=True, parents=True)

np.savez_compressed(out_dir / "tfidf_id_splits.npz",
                    Xtr=Xtr_id, Xte=Xte_id, ytr=ytr, yte=yte)
with open(out_dir / "tfidf_id_feature_names.json", "w", encoding="utf-8") as f:
    json.dump(feature_names_id.tolist(), f, ensure_ascii=False, indent=2)

print(f"Saved: {out_dir/'tfidf_id_splits.npz'} and feature names JSON")


Saved: /home/andy/6309GP/HDFS/HDFS_v1/preprocessed/tfidf_outputs/tfidf_id_splits.npz and feature names JSON


In [9]:
# Build mapping EventId -> Template text
eid2tpl = dict(zip(tpl_df["EventId"].astype(str), tpl_df["EventTemplate"].astype(str)))

def session_text_from_ids(seq: str) -> str:
    # seq is "E1 E2 E1 ...": map each to template text and concatenate
    words = []
    for eid in seq.split():
        tpl = eid2tpl.get(str(eid))
        if tpl:
            words.append(tpl)
        else:
            # fallback: if EventId isn't in template table (shouldn't happen), include raw ID
            words.append(str(eid))
    # repeating templates naturally weights them by frequency
    return " . ".join(words)

data_df["SessionTemplateText"] = data_df["EventSequence"].astype(str).apply(session_text_from_ids)
print(data_df[["SessionId", "SessionTemplateText"]].head(3))


                  SessionId                                SessionTemplateText
0  blk_-1608999687919862906  [*]Receiving block[*]src:[*]dest:[*] . [*]BLOC...
1   blk_7503483334202473044  [*]Receiving block[*]src:[*]dest:[*] . [*]Rece...
2  blk_-3544583377289625738  [*]Receiving block[*]src:[*]dest:[*] . [*]BLOC...


In [10]:
# TF-IDF over template text tokens
# Strip numbers by token pattern if you want to mimic common preprocessing:
# token_pattern=r"(?u)\b[a-zA-Z][a-zA-Z_]+\b"  -> keeps alphabetic tokens
tfidf_text = TfidfVectorizer(
    max_features=None,
    ngram_range=(1, 1),
    token_pattern=r"(?u)\b[a-zA-Z][a-zA-Z_]+\b"  # ignore pure numbers and punctuation
)

X_txt = tfidf_text.fit_transform(data_df["SessionTemplateText"])
feature_names_txt = np.array(tfidf_text.get_feature_names_out())
X_txt.shape, len(feature_names_txt)


((575061, 74), 74)

In [11]:
# Train/test split (70/30)
Xtr_txt, Xte_txt, ytr, yte = train_test_split(X_txt, data_df["Label"].values, test_size=0.30, random_state=42, stratify=data_df["Label"].values)

# Quick baseline classifier
clf_txt = LogisticRegression(max_iter=200, class_weight="balanced", n_jobs=None)
clf_txt.fit(Xtr_txt, ytr)
yp_txt = clf_txt.predict(Xte_txt)
print(classification_report(yte, yp_txt, digits=4))


              precision    recall  f1-score   support

           0     1.0000    0.9965    0.9982    167468
           1     0.8948    1.0000    0.9445      5051

    accuracy                         0.9966    172519
   macro avg     0.9474    0.9982    0.9713    172519
weighted avg     0.9969    0.9966    0.9966    172519



In [12]:
# Save TF-IDF(Text) artifacts
np.savez_compressed(out_dir / "tfidf_text_splits.npz",
                    Xtr=Xtr_txt, Xte=Xte_txt, ytr=ytr, yte=yte)
with open(out_dir / "tfidf_text_feature_names.json", "w", encoding="utf-8") as f:
    json.dump(feature_names_txt.tolist(), f, ensure_ascii=False, indent=2)

print(f"Saved: {out_dir/'tfidf_text_splits.npz'} and feature names JSON")


Saved: HDFS_v1\preprocessed\tfidf_outputs\tfidf_text_splits.npz and feature names JSON


In [14]:
# Show a couple of anomalous vs normal sessions’ most-weighted TF-IDF(ID) features
def top_k_nonzero_features(row_vec, feature_names, k=10):
    row = row_vec.tocsr()
    start, end = row.indptr[0], row.indptr[1]
    cols = row.indices[start:end]
    vals = row.data[start:end]
    if vals.size == 0:
        return []
    top = np.argsort(vals)[::-1][:k]
    return [(feature_names[cols[i]], float(vals[i])) for i in top]


sample_idx = np.random.choice(Xte_id.shape[0], 3, replace=False)
for i in sample_idx:
    feats = top_k_nonzero_features(Xte_id[i], feature_names_id, k=10)
    print(f"Sample {i}, label={yte[i]}, top TF-IDF(ID) terms:", feats[:10])


Sample 143870, label=0, top TF-IDF(ID) terms: [('E23', 0.4268472363412375), ('E21', 0.42561380306813124), ('E9', 0.3558869640127482), ('E26', 0.3558869640127482), ('E11', 0.3558708728946056), ('E5', 0.35206623822982014), ('E2', 0.34492935215939297), ('E22', 0.11735541274327338)]
Sample 71818, label=0, top TF-IDF(ID) terms: [('E4', 0.5121531324880011), ('E23', 0.37370254829949945), ('E21', 0.3726226838466611), ('E9', 0.311577196793221), ('E26', 0.311577196793221), ('E11', 0.3115631091024902), ('E5', 0.3082321711262576), ('E3', 0.2498043776813618), ('E22', 0.10274405704208588)]
Sample 11032, label=0, top TF-IDF(ID) terms: [('E23', 0.4547562409076315), ('E21', 0.4534421607615443), ('E9', 0.3791562979995132), ('E26', 0.3791562979995132), ('E11', 0.37913915477876464), ('E5', 0.37508575765942254), ('E22', 0.12502858588647417)]


In [15]:
# Peek template-text features for another random sample
sample_idx = np.random.choice(Xte_txt.shape[0], 3, replace=False)
for i in sample_idx:
    feats = top_k_nonzero_features(Xte_txt[i], feature_names_txt, k=10)
    print(f"Sample {i}, label={yte[i]}, top TF-IDF(Text) terms:", feats[:10])


Sample 145574, label=0, top TF-IDF(Text) terms: [('block', 0.6447014105224141), ('to', 0.2932640544507517), ('namesystem', 0.22564549368284495), ('is', 0.19550936963383445), ('of', 0.19550936963383445), ('size', 0.19550936963383445), ('added', 0.19550936963383445), ('serving', 0.16068367184645865), ('while', 0.16068367184645865), ('got', 0.1606816773796902)]
Sample 6804, label=0, top TF-IDF(Text) terms: [('block', 0.685125583603246), ('size', 0.3196434308349457), ('namesystem', 0.21080787187792185), ('to', 0.15982171541747284), ('blockmap', 0.15982171541747284), ('added', 0.15982171541747284), ('is', 0.15982171541747284), ('updated', 0.15982171541747284), ('of', 0.15982171541747284), ('from', 0.15982171541747284)]
Sample 116817, label=0, top TF-IDF(Text) terms: [('block', 0.685125583603246), ('size', 0.3196434308349457), ('namesystem', 0.21080787187792185), ('to', 0.15982171541747284), ('blockmap', 0.15982171541747284), ('added', 0.15982171541747284), ('is', 0.15982171541747284), ('upd