<a href="https://colab.research.google.com/github/akashblsbrmnm/rdk-bug-analyser/blob/main/bug_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the notebook's dependencies in one quiet (-q) pip call, bypassing pip's
# download cache (--no-cache-dir). openpyxl is the Excel engine used by the analyzer.
!pip install --no-cache-dir numpy pandas faiss-cpu sentence-transformers transformers torch tqdm packaging openpyxl accelerate -q

In [None]:
from google.colab import files

# Ask the user for a workbook through Colab's upload widget. `uploaded` maps each
# uploaded file name to its raw bytes.
print("Please upload your Excel file with bug reports:")
uploaded = files.upload()

# The analyzer reads the workbook with pandas' openpyxl engine, which only handles
# the modern OOXML formats. Legacy .xls files would pass this cell and then fail
# when the analyzer is constructed, so they are rejected here instead.
excel_files = [name for name in uploaded if name.lower().endswith(('.xlsx', '.xlsm'))]
if not excel_files:
    raise ValueError("No valid Excel file found (.xlsx or .xlsm required)")

# Only the first matching workbook is analysed; say so when others are ignored.
file_path = excel_files[0]
if len(excel_files) > 1:
    print(f"\nMultiple Excel files uploaded; using {file_path} and ignoring the rest.")
print(f"\nSuccessfully uploaded: {file_path} ({len(uploaded[file_path])} bytes)")

In [None]:
%%writefile bug_analyzer.py
import numpy as np
import pandas as pd
import faiss
import logging
import re
from typing import Dict, List, Any
from sentence_transformers import SentenceTransformer
from packaging import version
from transformers import pipeline
import torch
from tqdm import tqdm
from collections import defaultdict

# Configure the root logger at import time: INFO level, with each record formatted
# as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by BugAnalysis for device, LLM and error messages.
logger = logging.getLogger(__name__)

class BugAnalysis:
    """Triage a new bug report against a spreadsheet of known bugs.

    Known bugs are embedded with a sentence-transformer model and stored in a
    FAISS inner-product index over L2-normalised vectors, so search scores are
    cosine similarities. A new bug is embedded the same way, its nearest
    neighbours are retrieved, and from them the class derives a duplicate
    flag, a similarity-weighted list of root causes and (when a text-generation
    model could be loaded) a free-text explanation.
    """

    def __init__(self, excel_path: str,
                 model_name: str = "all-MiniLM-L6-v2",
                 llm_model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 duplicate_threshold: float = 0.7,
                 use_gpu: bool = False):
        """Load the bug data, build the similarity index and try to load the LLM.

        Args:
            excel_path: Path to the workbook holding the known bugs.
            model_name: Sentence-transformer model used for embeddings.
            llm_model_name: Text-generation model used for explanations.
            duplicate_threshold: Minimum cosine similarity for a retrieved bug
                to mark the new bug as a duplicate.
            use_gpu: Request CUDA; falls back to CPU when CUDA is unavailable.

        Raises:
            ValueError: If the workbook cannot be read, lacks required columns,
                or contains no rows.
        """
        # Configure device
        self.device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"
        self.duplicate_threshold = duplicate_threshold
        logger.info("Using device: %s", self.device.upper())

        # Initialize embedding model
        self.model = SentenceTransformer(model_name, device=self.device)

        # Load bug data
        self.bugs_df = self.load_excel_data(excel_path)
        if self.bugs_df.empty:
            # Without rows there is nothing to embed or index; fail with a
            # clear message instead of an opaque shape error further down.
            raise ValueError(f"No bug reports found in {excel_path}")
        self.bug_embeddings = self.compute_embeddings()

        # Initialize FAISS index
        self.index = self._initialize_faiss_index()

        # Initialize LLM (optional: analysis still works without it)
        self.generator = None
        self.has_generator = False
        self._initialize_llm(llm_model_name, use_gpu)

    @staticmethod
    def _bug_text(bug) -> str:
        """Return the text that is embedded for a bug (a dict or DataFrame row).

        Indexed bugs and queries must be rendered identically, so both paths
        go through this helper.
        """
        return f"{bug['bug_name']} {bug['bug_type']} {bug['component_affected']}"

    def _initialize_llm(self, model_name: str, use_gpu: bool) -> None:
        """Load the text-generation pipeline, degrading gracefully on failure.

        On success ``self.generator`` holds the pipeline and
        ``self.has_generator`` is True; on any failure the error is logged,
        ``self.generator`` is None and ``self.has_generator`` is False.
        """
        # Honour the same CPU fallback as the embedding model: asking for
        # device 0 / float16 on a machine without CUDA would make the pipeline
        # fail and silently disable explanations.
        on_gpu = use_gpu and torch.cuda.is_available()
        try:
            logger.info("Initializing LLM: %s", model_name)
            self.generator = pipeline(
                "text-generation",
                model=model_name,
                device=0 if on_gpu else -1,
                torch_dtype=torch.float16 if on_gpu else torch.float32,
            )
            self.has_generator = True
        except Exception as e:
            logger.error("LLM initialization failed: %s", e)
            self.generator = None
            self.has_generator = False

    def load_excel_data(self, file_path: str) -> pd.DataFrame:
        """Read the bug workbook and validate its schema.

        Args:
            file_path: Path to an Excel workbook readable by openpyxl.

        Returns:
            The sheet as a DataFrame with missing cells replaced by "".

        Raises:
            ValueError: If the file cannot be read or a required column is
                missing. The message is prefixed with "Excel error: ".
        """
        required_columns = {
            "bug_id", "bug_name", "bug_type",
            "component_affected", "root_cause", "build_number"
        }

        try:
            df = pd.read_excel(file_path, engine="openpyxl")
        except Exception as e:
            # Keep the original cause attached for debugging.
            raise ValueError(f"Excel error: {e}") from e

        missing = required_columns - set(df.columns)
        if missing:
            raise ValueError(f"Excel error: Missing columns: {missing}")

        return df.fillna("")

    def _initialize_faiss_index(self) -> "faiss.Index":
        """Create a FAISS inner-product index over the normalised bug embeddings."""
        index = faiss.IndexFlatIP(self.bug_embeddings.shape[1])
        # astype() copies, so the in-place normalisation below never touches
        # self.bug_embeddings.
        embeddings = self.bug_embeddings.astype('float32')
        faiss.normalize_L2(embeddings)
        index.add(embeddings)
        return index

    def compute_embeddings(self) -> np.ndarray:
        """Embed every known bug and return the L2-normalised float32 matrix."""
        bug_texts = self.bugs_df.apply(self._bug_text, axis=1).tolist()
        embeddings = self.model.encode(bug_texts, convert_to_numpy=True, show_progress_bar=True)
        # normalize_L2 works in place on contiguous float32 data.
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        faiss.normalize_L2(embeddings)
        return embeddings

    def _collect_matches(self, scores: np.ndarray, labels: np.ndarray) -> List[Dict[str, Any]]:
        """Turn one row of FAISS search output into scored bug records.

        Args:
            scores: Similarity scores for one query.
            labels: Row positions in ``self.bugs_df`` matching ``scores``.

        Returns:
            One dict per valid neighbour (the bug's columns plus "score"),
            sorted by descending score.
        """
        matches = []
        for score, idx in zip(scores, labels):
            # FAISS pads missing neighbours with label -1. An upper-bound-only
            # check would let -1 through and iloc[-1] would wrongly return the
            # last bug with a sentinel score.
            if 0 <= idx < len(self.bugs_df):
                bug = self.bugs_df.iloc[int(idx)].to_dict()
                bug["score"] = float(score)
                matches.append(bug)
        matches.sort(key=lambda b: b["score"], reverse=True)
        return matches

    def analyze_bug(self, new_bug: Dict[str, str], top_k: int = 5) -> Dict[str, Any]:
        """Run the full analysis workflow for one new bug.

        Args:
            new_bug: Mapping with "bug_name", "bug_type" and
                "component_affected".
            top_k: Maximum number of similar bugs to retrieve.

        Returns:
            A dict with "similar_bugs" (best match first), "is_duplicate",
            "root_causes" (``(cause, weight)`` pairs) and "explanation".

        Raises:
            ValueError: If ``top_k`` is smaller than 1.
            KeyError: If ``new_bug`` lacks one of the required keys.
        """
        if top_k < 1:
            raise ValueError("top_k must be at least 1")

        query_embedding = np.ascontiguousarray(
            self.model.encode([self._bug_text(new_bug)], convert_to_numpy=True),
            dtype=np.float32,
        )
        faiss.normalize_L2(query_embedding)

        # Never ask for more neighbours than the index holds.
        k = min(top_k, self.index.ntotal)
        if k > 0:
            scores, labels = self.index.search(query_embedding, k)
            similar_bugs = self._collect_matches(scores[0], labels[0])
        else:
            similar_bugs = []

        return {
            "similar_bugs": similar_bugs,
            "is_duplicate": any(b["score"] >= self.duplicate_threshold for b in similar_bugs),
            "root_causes": self._analyze_root_causes(similar_bugs),
            "explanation": self._generate_explanation(similar_bugs, new_bug)
        }

    def _analyze_root_causes(self, similar_bugs: List[Dict[str, Any]]) -> List[tuple]:
        """Weight each root cause by its share of the total similarity score.

        Bugs without a root cause still count towards the total, so the
        returned weights may sum to less than 1.

        Returns:
            ``(root_cause, weight)`` pairs sorted by descending weight; empty
            when the total score is not positive.
        """
        total_score = sum(bug['score'] for bug in similar_bugs)
        if not total_score > 0:
            return []

        causes = defaultdict(float)
        for bug in similar_bugs:
            if bug['root_cause']:
                causes[bug['root_cause']] += bug['score'] / total_score

        return sorted(causes.items(), key=lambda x: x[1], reverse=True)

    def _generate_explanation(self, similar_bugs: List[Dict], new_bug: Dict) -> str:
        """Ask the LLM for a short root-cause analysis of the new bug.

        Returns:
            The generated text, "Explanation unavailable" when no LLM is
            loaded or no similar bugs were found, or "Could not generate
            explanation" when generation fails.
        """
        if not self.has_generator or not similar_bugs:
            return "Explanation unavailable"

        context = "\n".join([f"- {b['score']:.2f}: {b['bug_name']} ({b['root_cause']})" for b in similar_bugs[:3]])
        prompt = f"""Analyze this new bug:
        Name: {new_bug['bug_name']}
        Type: {new_bug['bug_type']}
        Component: {new_bug['component_affected']}

        Similar bugs:
        {context}

        Technical analysis of likely root causes:"""

        try:
            # max_new_tokens budgets only the answer (max_length would also
            # count the prompt). return_full_text=False yields just the
            # continuation, so no fragile splitting on the prompt is needed.
            outputs = self.generator(prompt, max_new_tokens=200, return_full_text=False)
            return outputs[0]['generated_text'].strip()
        except Exception as e:
            logger.error("Explanation failed: %s", e)
            return "Could not generate explanation"

In [None]:
from bug_analyzer import BugAnalysis

# Build the analyzer from the uploaded workbook (`file_path` comes from the upload
# cell). Construction loads the spreadsheet, embeds every bug, builds the FAISS
# index and then tries to load the LLM.
# use_gpu=True only requests CUDA: the embedding model is placed on "cpu" when
# torch.cuda.is_available() is False.
analyzer = BugAnalysis(
    excel_path=file_path,
    llm_model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    use_gpu=True
)

In [None]:

# Example query. analyze_bug() reads exactly these three keys to build the text
# that is embedded and compared against the known bugs.
new_bug = {
    "bug_name": "device failing to start",
    "bug_type": "crash",
    "component_affected": "core"
}

# `results` holds "similar_bugs", "is_duplicate", "root_causes" and "explanation".
results = analyzer.analyze_bug(new_bug)

print("\nAnalysis Results:")
# is_duplicate is True when any retrieved bug scores at or above the analyzer's
# duplicate_threshold.
print(f"Duplicate: {'Yes' if results['is_duplicate'] else 'No'}")
print("\nTop Similar Bugs:")
# Each entry is a row of the spreadsheet plus its similarity "score".
for bug in results['similar_bugs']:
    print(f"- {bug['bug_id']} ({bug['score']:.2f}): {bug['root_cause']}")

print("\nRoot Cause Probabilities:")
# (cause, weight) pairs, highest weight first; weights are shown as percentages.
for cause, prob in results['root_causes']:
    print(f"- {cause}: {prob*100:.1f}%")

print("\nExplanation:", results['explanation'])

In [None]:
# from google.colab import drive
# import json
# from datetime import datetime

# drive.mount('/content/drive')
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# save_path = f"/content/drive/MyDrive/bug_analysis_{timestamp}.json"
# with open(save_path, 'w') as f:
#     json.dump(results, f, indent=2)

# print(f"✅ Results saved to: {save_path}")