
# 🧠 LogLens: GenAI-Powered Root Cause Analysis Assistant for Logs

## 🎯 Use Case

In modern distributed systems, debugging failures across Spark, Kafka, and Airflow can be extremely time-consuming. Engineers often search logs, check documentation, and look for community solutions to resolve recurring issues. **LogLens** streamlines this by acting as a **GenAI-powered RCA (Root Cause Analysis) assistant**: it aims to reduce resolution time by offering clear, actionable suggestions, each accompanied by a confidence score.

---

## 🛠️ GenAI Capabilities Used

This notebook integrates **four GenAI capabilities**:

1. **Retrieval-Augmented Generation (RAG)** using `ChromaDB` for retrieving similar historical logs.
2. **LLM Reasoning** using `Gemini 2.0 Flash` to generate human-readable error explanations and root causes.
3. **Web Search + Summarization** via `SerpAPI` + `Gemini` to enhance LLM output with live online content.
4. **Confidence Scoring and Audit Agent**: compares Gemini's answer with the web-derived answer to improve trust and quality.
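To make the RAG step concrete, here is a toy sketch of what retrieval does: rank stored logs by similarity to the query, then format the top matches as prompt context. It swaps ChromaDB's learned embeddings for simple word-count vectors compared with cosine similarity (an assumption for illustration only; ChromaDB uses an embedding model and its own configured distance metric). `embed`, `top_k_similar`, and the sample logs are hypothetical.

```python
import math
import re
from collections import Counter

def embed(text):
    """Toy 'embedding': lowercase word counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def top_k_similar(query, docs, k=3):
    """Return the k stored (log, fix) pairs most similar to the query."""
    q = embed(query)
    ranked = sorted(docs, key=lambda d: cosine(q, embed(d[0])), reverse=True)
    return ranked[:k]

# Hypothetical stored logs in the same "Component | Type: message" format as the notebook
stored = [
    ("Spark | OutOfMemoryError: java.lang.OutOfMemoryError: Java heap space", "Increase executor memory"),
    ("Kafka | TopicNotFound: No such topic 'user_events'", "Create topic before consuming"),
    ("Spark | FetchFailedException: org.apache.spark.shuffle.FetchFailedException", "Investigate shuffle configuration"),
]

matches = top_k_similar("FetchFailedException during shuffle", stored, k=2)
# Same "- Log: ... / Fix: ..." layout the notebook's RCA prompt uses
context = "\n".join(f"- Log: {log}\n  Fix: {fix}" for log, fix in matches)
print(context)
```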

---

## 📈 Objective

- Simplify debugging for distributed systems.
- Provide clean, human-readable fixes with supporting explanations.
- Automate log analysis using advanced GenAI flows.



## 🔄 Solution Flow

```mermaid
flowchart TD
    A[User Inputs Error Log] --> B[Retrieve Similar Logs from ChromaDB]
    B --> C[Gemini Agent: Generate RCA]
    C --> D[Web Agent: Search StackOverflow/GitHub]
    D --> E[Gemini: Summarize External Solutions]
    C --> F[Audit Agent: Compare Gemini vs Web Fix]
    E --> F
    F --> G[Return Final Fix with Confidence Score]
    G --> H[Suggest Follow-Up Questions & Learning Resources]
```
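The flow above is essentially a straight pipeline, so it can be sketched as plain function composition. In this hypothetical `run_pipeline`, each agent is passed in as a callable, which would let the flow be exercised offline with stubs before wiring in ChromaDB, Gemini, and SerpAPI. The stub agents and their outputs are placeholders, not the notebook's real responses.

```python
from typing import Callable, Dict, List

def run_pipeline(
    log_text: str,
    retrieve: Callable[[str], List[str]],
    rca: Callable[[str, List[str]], str],
    web: Callable[[str], str],
    audit: Callable[[str, str, str], str],
) -> Dict[str, str]:
    """Run the RCA flow: retrieve -> RCA, web search -> summary, then audit both."""
    similar = retrieve(log_text)                   # B: similar logs from the vector store
    rca_text = rca(log_text, similar)              # C: LLM root-cause analysis
    web_text = web(log_text)                       # D + E: web search and summary
    verdict = audit(log_text, rca_text, web_text)  # F: compare both answers
    return {"rca": rca_text, "web": web_text, "verdict": verdict}

# Offline stubs standing in for ChromaDB, Gemini, and SerpAPI (hypothetical outputs)
result = run_pipeline(
    "FetchFailedException",
    retrieve=lambda q: ["Spark | FetchFailedException: shuffle fetch failed"],
    rca=lambda q, docs: f"RCA based on {len(docs)} similar log(s)",
    web=lambda q: "Increase executor memory",
    audit=lambda q, a, b: "Both answers agree",
)
print(result["rca"])
```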



## 🔍 Step-by-Step Walkthrough

1. **Log Simulation & Storage**:
    - Simulated 30 logs (10 each for Spark, Kafka, and Airflow).
    - Stored them in ChromaDB, which embeds each log with its default embedding model (all-MiniLM-L6-v2).

2. **User Error Input**:
    - User enters an error like: `org.apache.spark.shuffle.FetchFailedException...`

3. **LLM RCA Agent**:
    - The top 3 most similar logs are retrieved from ChromaDB and passed to Gemini, which summarizes the root cause and fixes in plain English.

4. **Web Fix Agent**:
    - Uses SerpAPI to find relevant StackOverflow or GitHub results.
    - Gemini summarizes external findings into readable fixes.

5. **Audit Agent**:
    - Compares the Gemini RCA with the web fix, then gives a verdict and a confidence score.

6. **Chatbot Interface**:
    - Runs in a loop so the user can keep submitting errors or follow-up questions; each query is analyzed independently, with no memory of earlier turns.

---

## 📌 Output Format Example

```
Spark failed to fetch shuffle data. This usually happens when workers crash or run out of memory.
Fix: Try increasing memory, disabling dynamic executor removal, and reviewing executor logs.
Confidence: 0.85
```
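Because the confidence score comes back as free text, any downstream logic (for example, flagging low-confidence answers) first has to extract it. A minimal sketch, assuming the response contains a line like `Confidence: 0.85` as in the example above; `parse_confidence` is a hypothetical helper, not part of the notebook.

```python
import re
from typing import Optional

# Matches "Confidence: 0.85", "confidence score: 1", "4. Confidence: 0.8", etc.
# The trailing \b rejects values such as "10" instead of reading them as "1".
_CONFIDENCE_RE = re.compile(r"confidence(?:\s+score)?\s*[:=]\s*([01](?:\.\d+)?)\b", re.IGNORECASE)

def parse_confidence(text: str) -> Optional[float]:
    """Return the last confidence value in [0, 1] found in the text, or None."""
    matches = _CONFIDENCE_RE.findall(text)
    if not matches:
        return None
    value = float(matches[-1])
    return value if 0.0 <= value <= 1.0 else None

reply = """Spark failed to fetch shuffle data.
Fix: Try increasing memory.
Confidence: 0.85"""
print(parse_confidence(reply))
```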

---

## 📚 Learning Recommendations

For each error, the assistant is meant to point the user toward:
- Key Spark/Kafka/Airflow documentation links
- Concepts like dynamic allocation, GC tuning, etc.
- Tools like Spark UI or monitoring systems

---

## ⚖️ Limitations & Future Work

- Currently works on simulated logs; can be extended to live ingestion from production systems.
- Search depends on SerpAPI rate limits.
- The confidence score is self-reported by the LLM and therefore heuristic; it could be improved using LLM evaluation frameworks.
- Future plans: Streamlit UI, Slack bot integration, CI/CD DevOps hooks.
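One simple way to make the score less dependent on the model's self-reported number is to blend it with how closely the Gemini and web answers agree. A sketch under stated assumptions: agreement is measured as Jaccard overlap of content words (a crude lexical proxy, not semantic similarity), and the 0.5 weight and stopword list are arbitrary placeholders. `blended_confidence` is hypothetical.

```python
import re

# Small illustrative stopword list; a real system would use a fuller one
STOPWORDS = {"the", "a", "an", "or", "and", "to", "of", "in", "is", "it", "for", "your"}

def content_words(text):
    """Lowercased word set with stopwords removed."""
    return {w for w in re.findall(r"[a-z]+", text.lower()) if w not in STOPWORDS}

def jaccard(a, b):
    """Jaccard similarity of two sets (0.0 when both are empty)."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0

def blended_confidence(self_reported, gemini_fix, web_fix, weight=0.5):
    """Weighted mix of the LLM's own score and lexical agreement between the two fixes."""
    agreement = jaccard(content_words(gemini_fix), content_words(web_fix))
    return weight * self_reported + (1 - weight) * agreement

score = blended_confidence(0.8, "Increase executor memory", "increase executor memory overhead")
print(round(score, 3))
```

When the web agent returns nothing useful, agreement is low and the blended score drops, which is arguably the desired behavior for an unverified answer.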

---

## ✅ Conclusion

LogLens is a GenAI-first RCA assistant that saves time, boosts engineering productivity, and makes debugging more accessible to all engineers. The combination of RAG, LLM reasoning, and web search provides a powerful triage workflow.

👉 Try entering your own error at the bottom of this notebook!


In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
# Install necessary packages
!pip uninstall -y google google-cloud-aiplatform google-genai -q
!pip install -q google-generativeai chromadb serpapi


In [3]:
import os
import json
import uuid
import random
import requests
from datetime import datetime, timedelta
from kaggle_secrets import UserSecretsClient
import chromadb
import google.generativeai as genai

# Get API keys from Kaggle secrets
user_secrets = UserSecretsClient()
genai.configure(api_key=user_secrets.get_secret("google_api_key"))
serpapi_key = user_secrets.get_secret("serpapi_key")

# Load Gemini model
model = genai.GenerativeModel("gemini-2.0-flash")
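Both the Gemini and SerpAPI calls can fail transiently (rate limits, timeouts), as the limitations section notes for SerpAPI. A minimal, library-agnostic retry helper with exponential backoff is sketched below. `with_retries` is hypothetical, and catching every `Exception` is a simplification; in practice you would catch only the transient error types your client raises.

```python
import time

def with_retries(fn, *args, attempts=3, base_delay=1.0, sleep=time.sleep, **kwargs):
    """Call fn(*args, **kwargs), retrying with exponential backoff on failure.

    Waits base_delay, 2*base_delay, ... between attempts and re-raises the
    last error if every attempt fails.
    """
    for attempt in range(attempts):
        try:
            return fn(*args, **kwargs)
        except Exception:  # simplification: narrow this to transient errors
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))

# Example: a flaky function that succeeds on the third call
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

print(with_retries(flaky, base_delay=0.0))
```

In the notebook this could wrap a call such as `with_retries(model.generate_content, prompt)`.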


**Generate Fake Logs + Store in ChromaDB**

In [4]:
# Sample log errors for simulation
error_catalog = {
    "Spark": [
        ("OutOfMemoryError", "Job aborted: java.lang.OutOfMemoryError: Java heap space", "Increase executor memory"),
        ("NotSerializableException", "Task not serializable: java.io.NotSerializableException", "Make UDF class Serializable"),
        ("ClassNotFoundException", "Caused by: java.lang.ClassNotFoundException", "Add missing JAR dependency"),
        ("NullPointerException", "Exception: java.lang.NullPointerException", "Add null checks in code"),
        ("AnalysisException", "cannot resolve column in input schema", "Fix column name or select statement"),
        ("DiskFullError", "Executor lost: No space left on device", "Clean up disk or use bigger disk"),
        ("StageRetryLimit", "Stage failed 4 times, aborting job", "Fix data skew or memory issues"),
        ("FetchFailedException", "org.apache.spark.shuffle.FetchFailedException", "Investigate shuffle configuration"),
        ("FileNotFoundException", "Input path does not exist: s3://...", "Verify file path in job config"),
        ("SparkSubmitError", "spark-submit failed with exit code 1", "Validate spark-submit command and configs")
    ],
    "Airflow": [
        ("BrokenDAG", "Broken DAG: No module named 'plugin'", "Ensure plugin exists in airflow/plugins"),
        ("TriggerRuleError", "Invalid trigger rule: ALL_WRONG", "Use valid trigger like all_success"),
        ("FileNotFoundError", "FileNotFoundError: 'data.csv' not found", "Check file path or upstream task"),
        ("TaskSkipped", "Task skipped due to dependency", "Ensure upstream tasks are healthy"),
        ("NoneTypeError", "'NoneType' object has no attribute 'write'", "Initialize object before usage"),
        ("TaskTimeout", "Task timed out after 300s", "Increase timeout or optimize task"),
        ("ImportError", "ImportError: cannot import airflow.providers...", "Check module install or DAG syntax"),
        ("InvalidCron", "Invalid cron expression: */99 * * * *", "Fix cron syntax"),
        ("SQLAlchemyError", "sqlalchemy.exc.OperationalError", "Check DB connectivity and credentials"),
        ("DeadlockError", "Scheduler deadlock: no heartbeat from workers", "Scale out workers or debug DAGs")
    ],
    "Kafka": [
        ("ConsumerLag", "[WARN] Consumer lag high: 50000", "Scale up consumer instances"),
        ("SSLHandshakeError", "SSL handshake failed with broker", "Check SSL cert and config"),
        ("TopicNotFound", "No such topic 'user_events'", "Create topic before consuming"),
        ("KafkaTimeout", "Timeout expired while committing offsets", "Check broker latency or partition load"),
        ("StuckConsumer", "Consumer stuck for 10+ mins", "Restart or debug consumer group"),
        ("OffsetOutOfRange", "OffsetOutOfRangeException", "Reset offset to earliest/latest"),
        ("LeaderNotAvailable", "No leader for partition 0", "Restart broker or check cluster state"),
        ("BufferExhaustedException", "Buffer full, producer failed", "Increase buffer or reduce message rate"),
        ("UnknownTopicOrPartition", "Unknown topic or partition", "Check spelling and Kafka setup"),
        ("RebalanceInProgress", "RebalanceInProgressException", "Wait or tune rebalance configs")
    ]
}

# Generate logs
def generate_logs():
    logs = []
    now = datetime.utcnow()
    for system, errors in error_catalog.items():
        for idx, (etype, msg, fix) in enumerate(errors):
            logs.append({
                "log_id": f"{system.lower()}-{idx:03}",
                "component": system,
                "timestamp": (now - timedelta(minutes=random.randint(1, 5000))).isoformat() + "Z",
                "error_type": etype,
                "content": msg,
                "expected_fix": fix,
                "is_resolved": False
            })
    return logs

logs = generate_logs()

# Store in ChromaDB
chroma_client = chromadb.PersistentClient(path="/kaggle/working/chroma_db")
collection = chroma_client.get_or_create_collection("logs")

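# Use the stable log_id as the document ID and upsert() instead of add() with a
# random uuid4, so re-running this cell updates the existing documents instead
# of inserting duplicates into the persistent collection.
collection.upsert(
    ids=[log["log_id"] for log in logs],
    documents=[f"{log['component']} | {log['error_type']}: {log['content']}" for log in logs],
    metadatas=[
        {
            "log_id": log["log_id"],
            "component": log["component"],
            "error_type": log["error_type"],
            "expected_fix": log["expected_fix"],
        }
        for log in logs
    ],
)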




**Similar Log Retrieval + RCA Agent (Text Response)**

In [10]:
def retrieve_similar_logs(query_text, top_k=3):
    results = collection.query(query_texts=[query_text], n_results=top_k)
    return results["documents"][0], results["metadatas"][0]

def gemini_rca_summary(log_entry):
    docs, metas = retrieve_similar_logs(log_entry["content"])
    context = "\n".join(
        f"- Log: {doc}\n  Fix: {meta['expected_fix']}"
        for doc, meta in zip(docs, metas)
    )

    prompt = f"""
You're a helpful assistant for debugging system logs.

Here are similar logs:
{context}

Now, for the following issue:
"{log_entry['content']}"

Give a very short and simple explanation:
1. What the issue is (in 1–2 lines)
2. What caused it (in simple words)
3. What can fix it (2–3 quick suggestions)
4. End with a confidence score between 0 and 1

Be brief. Use everyday language. No jargon. No code. No JSON. Just clear plain text.
"""

    return model.generate_content(prompt).text.strip()


**Web Search + Summarizer Agent**

In [11]:
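def search_and_summarize_web(log_text):
    params = {
        "engine": "google",
        "q": f"{log_text} site:stackoverflow.com OR site:github.com",
        "api_key": serpapi_key,
        "num": "5"
    }
    # Time out instead of hanging, and report HTTP/network errors (e.g. rate limits)
    # as text so the chat loop keeps running
    try:
        resp = requests.get("https://serpapi.com/search", params=params, timeout=30)
        resp.raise_for_status()
        res = resp.json()
    except requests.RequestException as exc:
        return f"Web search failed: {exc}"

    # Use the snippets of the top 3 organic results, skipping empty ones
    snippets = [r.get("snippet", "") for r in res.get("organic_results", [])][:3]
    context = "\n".join(s for s in snippets if s)

    if not context:
        return "No relevant info found online."

    # Include the original error so the model knows what "this issue" refers to
    prompt = f"""
You are a GenAI assistant. A user reported this error:

{log_text}

Given these web snippets:

{context}

Summarize the most effective fix or strategy for this issue.
"""
    return model.generate_content(prompt).text.strip()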

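In the sample session at the end of this notebook, the search for `Lost executor 1 on host: Executor heartbeat timed out after 128083 ms` found nothing online, plausibly because exact numbers make the query too specific (an assumption, not verified). A hypothetical `normalize_for_search` helper could strip run-specific tokens such as timestamps, hex IDs, and bare numbers before the query is built; the patterns below are illustrative, not exhaustive.

```python
import re

# Illustrative patterns for run-specific tokens that rarely appear in forum posts.
# Order matters: timestamps are removed before bare numbers can split them apart.
_PATTERNS = [
    r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?",  # ISO-like timestamps
    r"\b0x[0-9a-fA-F]+\b",                                  # hex addresses / IDs
    r"\b\d+(?:\.\d+)?\b",                                   # bare numbers (durations, IDs)
]

def normalize_for_search(log_text, max_words=12):
    """Drop run-specific tokens and keep the first max_words words for a web query."""
    text = log_text
    for pattern in _PATTERNS:
        text = re.sub(pattern, " ", text)
    words = text.split()
    return " ".join(words[:max_words])

query = normalize_for_search("Lost executor 1 on host: Executor heartbeat timed out after 128083 ms")
print(query)
```

It would be applied as `search_and_summarize_web(normalize_for_search(user_input))`.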

**Compare LLM vs Web – Audit Agent**

In [12]:
def audit_agent(log_text, gemini_fix, web_fix):
    prompt = f"""
You are an audit agent comparing two solutions:

Log: {log_text}

--- Gemini's RCA ---
{gemini_fix}

--- External Web Fix ---
{web_fix}

Evaluate which is more accurate, what's missing, and give a confidence score (0–1).
Return plain text (no JSON).
"""
    return model.generate_content(prompt).text.strip()


In [13]:
def run_rca_chat():
    print("👋 Welcome to LogLens AI Assistant!")
    while True:
        user_input = input("\nEnter the error you faced (or type 'exit' to quit):\n> ").strip()
        if not user_input:
            continue  # ignore empty input instead of querying with a blank log
        if user_input.lower() in ["exit", "quit"]:
            print("👋 Goodbye! Stay bug-free.")
            break
        
        # Create log structure
        log = {
            "log_id": "user-log",
            "component": "Unknown",
            "error_type": "UserInput",
            "content": user_input,
            "is_resolved": False,
            "expected_fix": None
        }

        print("\n🤖 Analyzing with Gemini + ChromaDB...")
        gemini_response = gemini_rca_summary(log)
        print("\n🧠 Gemini RCA Suggestion:")
        print(gemini_response)

        print("\n🔍 Searching web for external solutions...")
        web_fix = search_and_summarize_web(user_input)
        print("\n🌐 Web-Based Fix Summary:")
        print(web_fix)

        print("\n📊 Comparing both responses...")
        audit_result = audit_agent(user_input, gemini_response, web_fix)
        print("\n📢 Final Verdict:")
        print(audit_result)

        print("\n💡 You can now enter another error or question (each one is analyzed independently).")

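Each turn in `run_rca_chat` is analyzed in isolation, which is why the follow-up in the sample session ("Is this what you suggested in the previous response") gets no reference to the earlier answer. A minimal sketch of short-term memory: keep the last few exchanges in a bounded `collections.deque` and prepend them to the next prompt. `ChatHistory`, its prompt layout, and the sample turns are hypothetical.

```python
from collections import deque

class ChatHistory:
    """Bounded memory of recent (user, assistant) turns for prompt building."""

    def __init__(self, max_turns=3):
        # deque(maxlen=...) silently drops the oldest turn once full
        self.turns = deque(maxlen=max_turns)

    def add(self, user_text, assistant_text):
        self.turns.append((user_text, assistant_text))

    def as_context(self):
        """Format past turns as a plain-text block to prepend to the next prompt."""
        return "\n".join(f"User: {u}\nAssistant: {a}" for u, a in self.turns)

# Illustrative turns (not real model output)
history = ChatHistory(max_turns=2)
history.add("Executor heartbeat timed out", "Increase executor resources or the timeout.")
history.add("Would spark.task.maxFailures help?", "Only as a band-aid.")
history.add("What about GC tuning?", "Check executor GC logs.")

prompt = f"Previous conversation:\n{history.as_context()}\n\nNew question: ..."
print(len(history.turns))
```

In the loop, `history.as_context()` could be passed into the RCA prompt and `history.add(user_input, gemini_response)` called after each turn; both would need small signature changes to the notebook's functions.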

In [14]:
run_rca_chat()

👋 Welcome to LogLens AI Assistant!



Enter the error you faced (or type 'exit' to quit):
>  Lost executor 1 on host: Executor heartbeat timed out after 128083 ms



🤖 Analyzing with Gemini + ChromaDB...

🧠 Gemini RCA Suggestion:
Log: Spark | ExecutorTimeout: Executor heartbeat timed out

1. Issue: A Spark executor stopped talking to the main process.
2. Cause: The executor took too long to respond, probably due to being overloaded or slow network.
3. Fix: Give executors more resources, check network, or increase timeout.
4. Confidence: 0.8

🔍 Searching web for external solutions...

🌐 Web-Based Fix Summary:
No relevant info found online.

📊 Comparing both responses...

📢 Final Verdict:
Gemini's RCA is a reasonable and generally correct assessment. The log message clearly indicates an executor timeout, suggesting the executor became unresponsive within the specified time limit (128083 ms). The proposed causes (overloaded executor, slow network) are the most likely culprits. The suggested fixes (more resources, network check, increased timeout) are also standard troubleshooting steps.

What's missing:

*   More specific debugging steps. While the s


Enter the error you faced (or type 'exit' to quit):
>  spark.conf.set("spark.task.maxFailures", "4") wouldnt this help?Is this what you suggested in the previous response



🤖 Analyzing with Gemini + ChromaDB...

🧠 Gemini RCA Suggestion:
1. It looks like you're trying to increase the number of times a task can fail before the whole Spark job fails.
2. This usually happens when tasks fail due to flaky data or temporary issues.
3. Setting `spark.task.maxFailures` can help, but it's a band-aid. Dig into *why* the tasks are failing in the first place. Check for bad data or resource problems.
4. Confidence: 0.6

🔍 Searching web for external solutions...

🌐 Web-Based Fix Summary:
The provided snippets touch on issues related to Apache Spark job failures, specifically:

*   A job being killed with a "SparkDeploySchedulerBackend Error: Application has been killed" message.
*   The need to set timeouts for Spark tasks or map operations to skip long-running tasks.
*   Limiting the number of retries on Spark job failure.

While these snippets don't offer a single "most effective fix", they highlight strategies to *mitigate* issues that lead to job failures and impro


Enter the error you faced (or type 'exit' to quit):
>  exit


👋 Goodbye! Stay bug-free.
