# Unified Log Analyzer with Classification, FAISS Retrieval, and Feedback Loop

In [2]:
import json
import os
import pandas as pd
from pathlib import Path
import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
from datetime import datetime


# --- Config Paths ---
# Inputs: the base failure dataset and the user feedback log (JSON Lines).
original_data_path = Path("structured_failures.json")
feedback_data_path = Path("feedback_log.jsonl")

# Persisted artifacts (classifier, vectorizer, FAISS index) share one directory.
model_dir = Path("model")
model_path = model_dir / "latest_classifier.pkl"
vectorizer_path = model_dir / "latest_vectorizer.pkl"
faiss_index_path = model_dir / "latest_faiss.index"

2025-04-16 15:41:06.270201: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-16 15:41:06.272717: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-16 15:41:06.304849: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-04-16 15:41:06.304875: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-04-16 15:41:06.305822: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to

## Step 1: Load original and feedback data

In [None]:
# Base dataset: a JSON document that is iterated later as a list of failure records.
# The encoding is given explicitly so the result does not depend on the platform's
# default locale encoding.
print("Loading data from Robot Framework test report...")
with open(original_data_path, "r", encoding="utf-8") as f:
    base_data = json.load(f)

# Feedback log: JSON Lines, one feedback entry per non-blank line.
# The file is optional; without it the feedback list simply stays empty.
print("Loading data from User Feedback...")
feedback_entries = []
if feedback_data_path.exists():
    with open(feedback_data_path, "r", encoding="utf-8") as f:
        feedback_entries = [json.loads(line) for line in f if line.strip()]

Loading data from Robot Framework test report...


NOTE :

- `original_data_path` points to the test failures that serve as the training dataset.

Eventually, we can take failures from multiple test executions (multiple `output.xml` files) and merge them into a single `.json` file that serves as one large dataset containing all the training data.

- `feedback_data_path` points to a JSON Lines (`.jsonl`) file that contains corrections from the user.

These corrections come from the current test execution (tests that have just been run and whose failures the user is now correcting). When a prediction is made, the user tries to fix the bug, and the user's correction is added to the training data for the very next failure analysis.

## Step 1.5: Label data

In [4]:
def auto_label_fix_category(data):
    """Fill in a heuristic ``fix_category`` on every record that lacks one.

    Records that already carry a non-empty ``fix_category`` are left untouched.
    For the others, the lower-cased ``error`` text is matched against an ordered
    list of keyword rules; the first rule that matches wins and ``"other"`` is
    the fallback. A missing or ``None`` error is treated as an empty string
    instead of raising.

    Args:
        data: Iterable of dict records. Records are modified in place.

    Returns:
        The same ``data`` object that was passed in.
    """
    # Order matters: it mirrors the priority of the checks (e.g. an error that
    # mentions both "timeout" and "connection" is labelled "timeout").
    rules = (
        ("missing_argument", lambda err: "missing" in err and "argument" in err),
        ("invalid_selector", lambda err: "not found" in err or "selector" in err),
        ("assertion_failed", lambda err: "assert" in err or "should be equal" in err),
        ("timeout", lambda err: "timeout" in err),
        ("connection_error", lambda err: "connection" in err),
    )

    for item in data:
        if item.get("fix_category"):
            continue  # already labelled
        error = str(item.get("error") or "").lower()
        item["fix_category"] = next(
            (category for category, matches in rules if matches(error)),
            "other",
        )
    return data

# The labeller fills in "fix_category" on the records themselves and returns the
# same list object, so base_data keeps its identity after this assignment.
print("Labeling data (fail -> fix)...")
base_data = auto_label_fix_category(base_data)

Labeling data (fail -> fix)...


## Step 2: Merge structured + feedback into unified dataset

In [None]:

merged = []

def build_log_text(item):
    """Flatten one failure record into the newline-separated text used as model input.

    The text starts with the test name, doc and error, followed by one group of
    lines per step (keyword, args, status, and optionally doc and messages).
    Every line, including the last one, ends with a newline.

    Args:
        item: Dict with required keys ``test_name`` and ``error``; optional
            ``doc`` and ``steps``. Each step needs ``keyword`` and ``status``;
            ``args``, ``doc`` and ``messages`` are optional.

    Returns:
        The assembled log text as a single string.

    Raises:
        KeyError: If a required key is missing from ``item`` or from a step.
    """
    lines = [
        f"Test name: {item['test_name']}",
        f"Doc: {item.get('doc', '')}",
        f"Error: {item['error']}",
    ]
    # "steps" may be absent or explicitly null in the JSON; both mean "no steps".
    for step in item.get("steps") or []:
        # Coerce to str so numeric or other non-string arguments do not break join().
        args_text = " ".join(str(arg) for arg in step.get("args") or [])
        lines.append(f"Step: {step['keyword']}")
        lines.append(f"Args: {args_text}")
        lines.append(f"Status: {step['status']}")
        if step.get("doc"):
            lines.append(f"Doc: {step['doc']}")
        if step.get("messages"):
            messages_text = " | ".join(str(message) for message in step["messages"])
            lines.append(f"Messages: {messages_text}")
    return "".join(f"{line}\n" for line in lines)

print("preparing training data (x=fail, y=fix_category)...")
# Base records: build the text from the structured record, keep its label.
for item in base_data:
    merged.append({
        "log_text": build_log_text(item),
        "fix_category": item["fix_category"]
    })

# For tests that have positive feedback OR correction/actual fix informed by the user
# -> add the test to training data
for fb in feedback_entries:
    # Saved feedback entries always contain the "actual_category" key, set to
    # None when the prediction was confirmed. dict.get(key, default) would
    # therefore return None instead of the default, so fall back with `or`.
    actual_category = fb.get("actual_category")
    if fb.get("feedback") == "correct" or actual_category:
        merged.append({
            "log_text": fb["log_text"],
            "fix_category": actual_category or fb["predicted_category"]
        })

preparing training data (x=fail, y=fix category)...


## Step 3: Train or reload classifier

In [None]:
# Parallel lists: texts[i] is the model input, labels[i] its fix category.
texts = [r["log_text"] for r in merged]
labels = [r["fix_category"] for r in merged]

if model_path.exists() and vectorizer_path.exists():
    # NOTE(review): once a model is saved it is always reused, so feedback
    # collected afterwards does not reach the classifier until the saved files
    # are deleted. Confirm whether retraining on new feedback is intended.
    # NOTE: joblib.load unpickles arbitrary objects; only load trusted files.
    print("Existing model found, loading model...")
    clf = joblib.load(model_path)
    vectorizer = joblib.load(vectorizer_path)
else:
    print("No existing model found, training model...")
    vectorizer = TfidfVectorizer(max_features=500, stop_words="english")
    X = vectorizer.fit_transform(texts)
    X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.2, random_state=42)
    # Seed the forest as well so a fresh run reproduces the same model and report.
    clf = RandomForestClassifier(random_state=42)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    print("\nClassification Report:\n", classification_report(y_test, y_pred))
    # The output directory does not exist on a fresh checkout; create it before dumping.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    vectorizer_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(clf, model_path)
    joblib.dump(vectorizer, vectorizer_path)

## Step 4: Rebuild FAISS index

In [None]:
print("Building FAISS index (similarity retrieval)...")
model = SentenceTransformer("all-MiniLM-L6-v2")
# Embeddings are requested normalized and indexed with an inner-product index,
# so the search scores behave like cosine similarities.
embeddings = model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
# FAISS expects a C-contiguous float32 matrix; make that explicit.
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
faiss_index = faiss.IndexFlatIP(embeddings.shape[1])
faiss_index.add(embeddings)
# The index path is configured independently, so make sure its directory exists.
faiss_index_path.parent.mkdir(parents=True, exist_ok=True)
faiss.write_index(faiss_index, str(faiss_index_path))



## Step 5: Predict on new fail
You can replace this sample with live input or a loop over several failures.

In [None]:
# Sample failure record with the fields build_log_text reads:
# test name, error, doc and the executed steps.
new_data = {
    "test_name": "Connect without API key",
    "error": "TypeError: TestObject.__init__() missing 1 required positional argument: 'api_key'",
    "doc": "Attempts to connect to the server without providing API key.",
    "steps": [
        {
            "keyword": "Connect",
            "args": ["http://localhost"],
            "status": "FAIL",
            "doc": "Connects to backend server using TestObject",
            "messages": ["Connecting to http://localhost", "Exception raised: missing 'api_key'"]
        }
    ]
}

# Flatten the record with the same helper used for the training texts.
log_text = build_log_text(new_data)

print("Predicting fix for new fail...")
# Only transform (never fit) here: the vectorizer was fitted or loaded earlier.
new_vec = vectorizer.transform([log_text])
# One input row was passed, so pred[0] is the predicted label for this failure.
pred = clf.predict(new_vec)
print("\n Suggested Fix Category:", pred[0])


🧠 Suggested Fix Category: missing_argument


## Step 6: Retrieve similar logs via FAISS

In [None]:
print("Looking for similar fails...")
# Encode the query the same way as the indexed texts (normalized embeddings).
query_embedding = model.encode([log_text], normalize_embeddings=True)
# D holds the similarity scores and I the row indices, one row per query.
D, I = faiss_index.search(np.array(query_embedding), k=3)

print("\n Top 3 similar failures:")
for rank, idx in enumerate(I[0]):
    # When the index holds fewer than k vectors FAISS pads the result with -1.
    # Without this guard texts[-1] would silently show the last record as a match.
    if idx < 0:
        continue
    print(f"\n#{rank+1} (Score: {D[0][rank]:.2f})")
    print("Log:", texts[idx][:300].replace("\n", " ") + "...")
    print("Fix Category:", labels[idx])


🔍 Top 3 similar failures:

#1 (Score: 0.66)
Log: Test name: Update User with Admin Rights Doc: Changes Password of an existing user. Error: Parent suite setup failed: TypeError: TestObject.__init__() missing 1 required positional argument: 'api_key' ...
Fix Category: missing_argument

#2 (Score: 0.65)
Log: Test name: Access Other Users Details With User Rights Doc: Tests does fail, due to insufficiant rights... Error: Parent suite setup failed: TypeError: TestObject.__init__() missing 1 required positional argument: 'api_key' ...
Fix Category: missing_argument

#3 (Score: 0.65)
Log: Test name: Access All Users With Admin Rights Doc: Tests if all users can be accessed with Admin User. Error: Parent suite setup failed: TypeError: TestObject.__init__() missing 1 required positional argument: 'api_key' ...
Fix Category: missing_argument


## Step 7: Ask for feedback

In [None]:
# Ask until the answer is a clear yes or no. Previously anything other than an
# exact "y"/"n" (e.g. "Y ", "yes", or just Enter) was treated as "incorrect"
# with no category. The result is stored as the single letter "y" or "n".
feedback = ""
while feedback not in ("y", "n"):
    answer = input("\nWas the suggested fix correct? (y/n): ").strip().lower()
    feedback = {"yes": "y", "no": "n"}.get(answer, answer)

actual = None
if feedback == "n":  # If the feedback is 'no', ask for the correct fix category
    # An empty answer is kept as None so no empty label is recorded.
    actual = input("What is the correct fix category? (type and press Enter): ").strip() or None

# Output user feedback
if feedback == "y":
    print("User confirmed the fix was correct.")
elif actual:
    print(f"User said the fix was incorrect. The correct fix category is: {actual}")
else:
    print("User said the fix was incorrect but did not provide the correct fix category.")

Widget Javascript not detected.  It may not be installed or enabled properly. Reconnecting the current kernel may help.


Widget Javascript not detected.  It may not be installed or enabled properly. Reconnecting the current kernel may help.


## Step 8: Save feedback

In [None]:
# One feedback record per run; it is appended as a single JSON line.
entry = {
    "timestamp": datetime.now().isoformat(),
    "log_text": log_text,
    # The prediction is a NumPy scalar; store it as a plain str for JSON.
    "predicted_category": str(pred[0]),
    "feedback": "correct" if feedback.strip().lower() == "y" else "wrong",
    # Kept as null when the user gave no corrected category.
    "actual_category": actual or None
}

print("Saving feedback from user for future predictions...")
# Append mode keeps earlier feedback; the encoding is explicit so the log does
# not depend on the platform default.
with open(feedback_data_path, "a", encoding="utf-8") as f:
    f.write(json.dumps(entry) + "\n")

print("\n✅ Feedback saved.")