In [None]:
import json
import os

import requests

# Read the DeepSeek key from the environment so no secret is stored in the notebook.
# Set DEEPSEEK_API_KEY before running; when it is unset the key is an empty string.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
# Chat-completions endpoint that DeepSeekLLM.complete posts to.
API_URL = "https://api.deepseek.com/v1/chat/completions"

class DeepSeekLLM:
    """Minimal client for the DeepSeek chat-completions HTTP endpoint.

    ``complete(prompt)`` returns an object exposing the generated text as
    ``.text``, so callers can treat it like other LLM wrappers used in this
    notebook.
    """

    class ResponseWrapper:
        """Carries the generated text under the ``.text`` attribute."""

        def __init__(self, text):
            self.text = text  # ✅ mimic old behavior

    def __init__(self, model_name="deepseek-chat", timeout=60):
        """
        Args:
            model_name: Model identifier sent in the request payload.
            timeout: Seconds to wait for the HTTP response before giving up.
                Without a timeout a stalled connection would block a batch
                run indefinitely.
        """
        self.model_name = model_name
        self.timeout = timeout

    def complete(self, prompt: str):
        """Send ``prompt`` as the user message and return the model's reply.

        Args:
            prompt: User message content.

        Returns:
            ResponseWrapper whose ``.text`` is the first choice's message content.

        Raises:
            requests.HTTPError: if the API answers with an error status.
            requests.Timeout: if no response arrives within ``self.timeout``.
            KeyError / IndexError: if the response body lacks the expected
                ``choices[0].message.content`` structure.
        """
        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You are a senior software quality assistant. "
                        "Analyze the provided metrics and retrieved examples for code smells. "
                        "At the end, state:\n"
                        "Detected Code Smell: Yes or No\n"
                        "Smell Type: Long Method / Large Class / No Smell"
                    )
                },
                {"role": "user", "content": prompt}
            ]
        }

        # `json=` lets requests serialize the payload; `timeout` bounds the wait.
        response = requests.post(
            API_URL, headers=headers, json=payload, timeout=self.timeout
        )
        response.raise_for_status()
        text = response.json()["choices"][0]["message"]["content"]
        return self.ResponseWrapper(text)


In [None]:
!pip install pymongo datasets
!pip install llama-index
!pip install llama-index-retrievers-bm25

In [None]:
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    StorageContext,
    SimpleKeywordTableIndex,
)

In [None]:
import pandas as pd
from llama_index.core.schema import Document

def load_dataset_for_prediction(path):
    """Load a smell-example CSV and turn every row into a ``Document``.

    Args:
        path: Path to a CSV file. Each row must provide ``metrics_summary``
            and ``smell_explanation``; ``smell_type``, ``language``,
            ``effort`` and ``volume`` are copied into the metadata when
            present (missing ones fall back to ``None`` / ``""``).

    Returns:
        list of ``Document`` objects, one per CSV row, in file order.

    Raises:
        KeyError: if ``metrics_summary`` or ``smell_explanation`` is missing.
    """
    df = pd.read_csv(path)
    docs = []

    for _, row in df.iterrows():
        # Section label spelled correctly ("Explanation") so the indexed text
        # does not carry a typo into every document.
        text = f"Summary of metrics:\n{row['metrics_summary']}\n Explanation:\n{row['smell_explanation']}"

        metadata = {
            "smell_type": row.get("smell_type", None),  # use for eval or filtering
            "language": row.get("language", ""),
            "effort": row.get("effort", None),
            "volume": row.get("volume", None)
        }

        docs.append(Document(text=text, metadata=metadata))

    return docs
# Build one document list per smell type from the example CSVs under /content.
long_method_docs = load_dataset_for_prediction("/content/Dataset_Long_Method_RAG_Index.csv")
large_class_docs = load_dataset_for_prediction("/content/Dataset_Large Classes_RAG_Index.csv")


In [None]:
# This is for Long method and we do the same for Large class
class LoggingVectorRetriever:
    """Wraps a retriever: announces each query and labels its hits as "Vector"."""

    def __init__(self, retriever):
        self.retriever = retriever

    def retrieve(self, query):
        """Delegate to the wrapped retriever and stamp ``metadata["retriever"]``.

        Returns the wrapped retriever's result object unchanged (hits are
        tagged in place).
        """
        print(f"🤖 [VectorRetriever] Activated for query: {query}")
        hits = self.retriever.retrieve(query)

        for hit in hits:
            # Prefer the metadata of the wrapped node; otherwise use the hit's own.
            if hasattr(hit, "node") and hasattr(hit.node, "metadata"):
                holder = hit.node
            elif hasattr(hit, "metadata"):
                holder = hit
            else:
                print(f"⚠️ Could not tag node of type: {type(hit)}")
                continue
            holder.metadata["retriever"] = "Vector"

        return hits


In [None]:
# Build a vector index over the Long Method example documents.
vector_index = VectorStoreIndex.from_documents(long_method_docs)
# Retriever returning the 5 most similar entries per query.
raw_vector = vector_index.as_retriever(similarity_top_k=5)
# Wrap it so every query is logged and hits are tagged as "Vector".
vector_retriever = LoggingVectorRetriever(raw_vector)


In [None]:
class LoggingBM25Retriever:
    """Wraps a retriever: announces each query and labels its hits as "BM25"."""

    def __init__(self, retriever):
        self.retriever = retriever

    def retrieve(self, query):
        """Delegate to the wrapped retriever and stamp ``metadata["retriever"]``.

        Returns the wrapped retriever's result object unchanged (hits are
        tagged in place).
        """
        print(f"🔍 [BM25Retriever] Activated for query: {query}")
        hits = self.retriever.retrieve(query)

        for hit in hits:
            if hasattr(hit, "node") and hasattr(hit.node, "metadata"):
                # Scored wrapper: the metadata lives on the inner node.
                holder = hit.node
            elif hasattr(hit, "metadata"):
                # Plain node/document carrying its own metadata.
                holder = hit
            else:
                print(f"⚠️ Cannot tag node of type: {type(hit)}")
                continue
            holder.metadata["retriever"] = "BM25"

        return hits


In [None]:
from llama_index.retrievers.bm25 import BM25Retriever
import Stemmer

# Keyword (BM25) retriever over the Long Method nodes, wrapped so queries are
# logged and hits are tagged as "BM25".
# NOTE(review): `long_method_nodes` is not defined in the cells shown here —
# confirm it is created (e.g. from `long_method_docs`) before this cell runs.
long_method_bm25_retriever = LoggingBM25Retriever(
    BM25Retriever.from_defaults(
        nodes=long_method_nodes,
        similarity_top_k=5,
        stemmer=Stemmer.Stemmer("english"),
        language="english"
    )
)


# CustomHybridRetriever (Reusable)

In [None]:
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.base.query_pipeline.query import QueryBundle
from llama_index.core.schema import NodeWithScore
from typing import List

class CustomHybridRetriever(BaseRetriever):
    """Combines a BM25 retriever and a vector retriever into one result list.

    Both retrievers receive the raw query string; the first ``top_n_each``
    hits of each are concatenated (BM25 hits first).
    """

    def __init__(self, bm25, vector, top_n_each=3):
        """
        Args:
            bm25: Object with ``retrieve(query_str)`` used for keyword search.
            vector: Object with ``retrieve(query_str)`` used for semantic search.
            top_n_each: How many leading hits to keep from each retriever
                (defaults to 3, the previously hard-coded value).
        """
        super().__init__()
        self.bm25 = bm25
        self.vector = vector
        self.top_n_each = top_n_each

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Run both retrievers on ``query_bundle.query_str`` and merge the hits."""
        print(f"\n📘 [HybridRetriever] Activated for query: {query_bundle.query_str}")

        # BM25 Retrieval (already logs and tags)
        bm25_results = self.bm25.retrieve(query_bundle.query_str)

        # Vector Retrieval (already logs and tags)
        vector_results = self.vector.retrieve(query_bundle.query_str)

        print(f"✅ [HybridRetriever] BM25 returned {len(bm25_results)} | Vector returned {len(vector_results)}\n")

        # Plain concatenation: a node returned by both retrievers appears twice.
        return bm25_results[: self.top_n_each] + vector_results[: self.top_n_each]


In [None]:
from llama_index.core.tools import RetrieverTool

# Wrap each retriever in a RetrieverTool
# The description is the text attached to each tool for the router's selector.

# Keyword-only retrieval.
long_method_bm25_tool = RetrieverTool.from_defaults(
    retriever=long_method_bm25_retriever,
    description="BM25 for long method smells (keyword-based matching)"
)

# Embedding-based retrieval.
long_method_vector_tool = RetrieverTool.from_defaults(
    retriever=vector_retriever,
    description="Vector retriever for semantic queries about long method smells"
)

# Both retrievers combined through CustomHybridRetriever.
long_method_hybrid_tool = RetrieverTool.from_defaults(
    retriever=CustomHybridRetriever(
        bm25=long_method_bm25_retriever,
        vector=vector_retriever
    ),
    description="Hybrid (BM25 + Vector) for long method smell explanation"
)


In [None]:
from llama_index.core.tools import RetrieverTool


In [None]:
from llama_index.core.retrievers import RouterRetriever
from llama_index.core.selectors import PydanticSingleSelector
from llama_index.core.tools import RetrieverTool

In [None]:
from llama_index.llms.openai import OpenAI

In [None]:
# Attach a short label to each Long Method tool.
# NOTE(review): llama-index tools normally expose their name via
# `tool.metadata.name`; confirm that setting `.name` on the tool object has the
# intended effect.
long_method_bm25_tool.name = "BM25"
long_method_vector_tool.name = "Vector"
long_method_hybrid_tool.name = "Hybrid"


In [None]:
from llama_index.core.selectors import PydanticSingleSelector
from llama_index.core.retrievers import RouterRetriever

# Selector the router uses to choose among the tools listed below.
# NOTE(review): `llm` is not assigned in any earlier cell shown here — confirm it
# is created (presumably the OpenAI LLM imported above) before this cell runs.
selector = PydanticSingleSelector.from_defaults(llm=llm)

# Router over the three Long Method retriever tools (BM25, vector, hybrid).
router = RouterRetriever(
    retriever_tools=[
        long_method_bm25_tool,
        long_method_vector_tool,
        long_method_hybrid_tool
    ],
    selector=selector
)


# RouterAgent with Logging

In [None]:
class RouterAgent:
    """Asks an LLM which smell type a sample resembles and retrieves matching context.

    The LLM must expose ``complete(prompt)`` returning an object with ``.text``;
    each router must expose ``retrieve(query)``.
    """

    def __init__(self, long_method_router, large_class_router, llm):
        self.long_method_router = long_method_router
        self.large_class_router = large_class_router
        self.llm = llm

    def classify_smell_type(self, code_str, metrics_text, query):
        """Return the LLM's smell-type answer, stripped and lower-cased."""
        prompt = f"""
You are a code analysis expert. Based on the provided code, metrics, and the user's question, determine the most relevant code smell type.

---
Code:
{code_str}

---
Metrics:
{metrics_text}

---
Question:
{query}

Which type does it represent more clearly?
- Long Method
- Large Class

Only answer with one: Long Method or Large Class.
"""
        print("🧠 Invoking LLM to classify smell type...")
        response = self.llm.complete(prompt).text.strip()
        print(f"📌 LLM Response for Classification: {response}")
        return response.lower()

    def retrieve_context(self, code_str, metrics_text, query):
        """Classify the sample, then query the router for that smell type.

        Returns:
            ``(nodes, smell_type)`` where ``smell_type`` is the lower-cased LLM
            answer and ``nodes`` is the chosen router's result, or an empty
            list when the answer mentions neither "long" nor "class".
        """
        smell_type = self.classify_smell_type(code_str, metrics_text, query)

        if "long" in smell_type:
            print("🔍 Routing to Long Method Retriever...")
            nodes = self.long_method_router.retrieve(query)
        elif "class" in smell_type:
            print("🔍 Routing to Large Class Retriever...")
            nodes = self.large_class_router.retrieve(query)
        else:
            print("⚠️ Smell Type not recognized")
            # Without this the summary below raised UnboundLocalError.
            nodes = []

        print(f"✅ Retrieved {len(nodes)} nodes.")
        for i, node in enumerate(nodes):
            # Scored wrappers keep text/metadata on `.node`; plain nodes carry them directly.
            inner = node.node if hasattr(node, "node") else node
            source = inner.metadata.get("retriever", "Unknown")
            text_snippet = inner.text
            print(f"   🔗 Node {i+1}: Source={source}, Text Snippet={text_snippet[:60]}...")
        return nodes, smell_type


In [None]:
# Agent that classifies the smell type with the LLM and then queries the
# router for that type.
# NOTE(review): `router_lc` is not defined in the cells shown here — confirm the
# Large Class router is built (analogously to `router`) before this cell runs.
router_agent = RouterAgent(
    long_method_router=router,       # Your Long Method router
    large_class_router=router_lc,    # Your Large Class router
    llm=llm                          # Your OpenAI or other LLM instance
)


In [None]:
import pandas as pd
import re
import time


# Ablation run: every row is sent to the LLM WITHOUT retrieved examples, and the
# parsed answer is stored next to the ground-truth label.
# NOTE(review): `csv_file` is an empty string here — set it to the evaluation CSV
# path before running, otherwise `pd.read_csv` cannot load anything.
csv_file = ""
df = pd.read_csv(csv_file).reset_index(drop=True)

results = []
user_query = "Does the code present any code smell and if so what is it Long method or Large Class?"

start_time = time.time()
batch_start_time = start_time  # for tracking time per batch

# Relative path: the file is written to the current working directory.
output_file = "DeepSeek_Full_Ablation_Results.csv"

print(f"✅ Starting Ablation Study on {len(df)} rows...\n")

for idx, row in df.iterrows():
    row_start = time.time()

    # Per-row transcript: `log` prints a message and also records it.
    console_log = []
    log = lambda msg: (print(msg), console_log.append(msg))

    log(f"🖥️ Processing Row {idx+1}/{len(df)} | Unique ID: {row['unique_id']}")
    log("🧠 Invoking LLM to classify smell type...")

    # The metrics summary text is what gets analyzed (passed as `code_str`).
    code_str = row['metrics_summary']

    # ✅ No retrieval for ablation
    retrieved_nodes = []
    log("\n✅ Retrieved Nodes: None (Ablation Study)")
    retrieved_docs_log = []

    # NOTE(review): `detect_code_smell_with_llm` is defined in a later cell of
    # this notebook; that cell must have been executed before this one.
    llm = DeepSeekLLM(model_name="deepseek-chat")
    final_answer = detect_code_smell_with_llm(
        code_str=code_str,
        retrieved_nodes=retrieved_nodes,
        llm=llm
    )

    log("\n🧪 Final LLM Answer:")
    log(final_answer)

    # ✅ Extract smell classification
    # Takes the rest of the line after the first "Smell Type:" in the answer.
    detected_smell_match = re.search(r"Smell Type:\s*(.*)", final_answer)
    detected_smell = detected_smell_match.group(1).strip() if detected_smell_match else "Not Parsed"

    # No router is involved in this run; `routing` is only a label derived
    # from the parsed answer.
    classification = detected_smell
    routing = (
        "Long Method Retriever" if "Long Method" in classification
        else "Large Class Retriever" if "Large Class" in classification
        else "No Retriever (No Smell)" if "No Smell" in classification
        else "Unknown"
    )

    log(f"📌 LLM Response for Classification: {classification}")
    log(f"🔍 Routing to {routing}...")

    row_time = time.time() - row_start
    log(f"⏳ Time taken for this row: {row_time:.2f} seconds")

    # ✅ Append to results
    results.append({
        "Unique ID": row['unique_id'],
        "User Query": user_query,
        "Console Log": "\n".join(console_log),
        "LLM Response for Classification": classification,
        "Routing to Retriever": routing,
        "Retrieved Docs": "\n".join(retrieved_docs_log),
        "Full LLM Response": final_answer,
        "Detected Code Smell by LLM": detected_smell,
        "Ground Truth": row['smell_type'],
        "Row Processing Time (s)": round(row_time, 2)
    })

    # ✅ Save progress every 100 rows
    # The whole accumulated result list is rewritten each time, so an
    # interrupted run keeps everything up to the last checkpoint.
    if (idx + 1) % 100 == 0:
        pd.DataFrame(results).to_csv(output_file, index=False)
        batch_time = time.time() - batch_start_time
        print(f"\n✅ Saved {idx+1} rows to {output_file} (Batch Time: {batch_time/60:.2f} minutes)")
        batch_start_time = time.time()

# ✅ Final save
pd.DataFrame(results).to_csv(output_file, index=False)
total_time = time.time() - start_time
print(f"\n✅ All {len(df)} rows processed in {total_time/60:.2f} minutes.")
print(f"✅ Final results saved to {output_file}")


In [None]:
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score

# Reload the persisted ablation results.
df = pd.read_csv("/content/DeepSeek_Full_Ablation_Results.csv")

# Trim surrounding whitespace on both label columns before comparing them.
df = df.assign(**{
    "Detected Code Smell by LLM": df["Detected Code Smell by LLM"].str.strip(),
    "Ground Truth": df["Ground Truth"].str.strip(),
})

# Keep only rows whose answer could be parsed (optional filter).
valid_df = df.loc[df["Detected Code Smell by LLM"].ne("Not Parsed")]

# Ground truth vs. prediction for the remaining rows.
y_true, y_pred = valid_df["Ground Truth"], valid_df["Detected Code Smell by LLM"]

print("✅ Accuracy:", accuracy_score(y_true, y_pred))
print("\n✅ Detailed Classification Report:\n")
print(classification_report(y_true, y_pred, zero_division=0))


In [None]:
import pandas as pd
import re
import time

# Full test run: for every row, retrieve comparison examples through the
# router agent, ask DeepSeek for a verdict, and store it with the ground truth.

# ✅ Load the full CSV
csv_file = "/content/Merged_Test_Set_LargeClass_LongMethod.csv"
df = pd.read_csv(csv_file)

results = []
user_query = "Does the code present any code smell and if so what is it Long method or Large Class?"

# Single source of truth for the output path (used for saving and reporting).
results_file = "Full_Size_DeepSeek_Test_Results.csv"

# One client for the whole run; DeepSeekLLM only stores the model name.
llm = DeepSeekLLM(model_name="deepseek-chat")

start_time = time.time()

for idx, row in df.iterrows():
    row_start = time.time()

    # ✅ Capture console log for each row
    console_log = []
    log = lambda msg: (print(msg), console_log.append(msg))

    log(f"\n🖥️ Processing Row {idx+1}/{len(df)} | Unique ID: {row['unique_id']}")
    log("🧠 Invoking LLM to classify smell type...")

    code_str = row['metrics_summary']

    # ✅ Retrieve relevant nodes
    # retrieve_context returns (nodes, smell_type); unpack it so that only the
    # nodes are iterated below and passed on to the detector.
    retrieved_nodes, routed_smell_type = router_agent.retrieve_context(
        code_str=code_str,
        metrics_text=row['metrics_summary'],
        query=user_query
    )

    log("\n✅ Retrieved Nodes:")
    retrieved_docs_log = []
    for i, node in enumerate(retrieved_nodes, 1):
        snippet = node.node.text if hasattr(node, "node") else node.text
        log_line = f"🔗 Node {i}: {snippet[:120]}..."
        log(log_line)
        retrieved_docs_log.append(log_line)

    # ✅ Run detection with DeepSeek
    final_answer = detect_code_smell_with_llm(
        code_str=code_str,
        retrieved_nodes=retrieved_nodes,
        llm=llm
    )

    log("\n🧪 Final LLM Answer:")
    log(final_answer)

    # ✅ Extract detected smell
    # Takes the rest of the line after the first "Smell Type:" in the answer.
    detected_smell_match = re.search(r"Smell Type:\s*(.*)", final_answer)
    detected_smell = detected_smell_match.group(1).strip() if detected_smell_match else "Not Parsed"

    classification = detected_smell
    routing = (
        "Long Method Retriever" if "Long Method" in classification
        else "Large Class Retriever" if "Large Class" in classification
        else "No Retriever (No Smell)" if "No Smell" in classification
        else "Unknown"
    )

    log(f"📌 LLM Response for Classification: {classification}")
    log(f"🔍 Routing to {routing}...")

    row_time = time.time() - row_start
    log(f"⏳ Time taken for this row: {row_time:.2f} seconds")

    # ✅ Append to results
    results.append({
        "Unique ID": row['unique_id'],
        "User Query": user_query,
        "Console Log": "\n".join(console_log),
        "LLM Response for Classification": classification,
        "Routing to Retriever": routing,
        "Retrieved Docs": "\n".join(retrieved_docs_log),
        "Full LLM Response": final_answer,
        "Detected Code Smell by LLM": detected_smell,
        "Ground Truth": row['smell_type'],
        "Row Processing Time (s)": round(row_time, 2)
    })

# ✅ Save full results to CSV
results_df = pd.DataFrame(results)
results_df.to_csv(results_file, index=False)

total_time = time.time() - start_time
print(f"\n✅ All {len(df)} rows processed in {total_time/60:.2f} minutes.")
# Report the file that was actually written.
print(f"✅ Results saved to {results_file}")


In [None]:
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score

# Reload the persisted full-test results.
df = pd.read_csv("/content/Full_Size_DeepSeek_Test_Results.csv")

# Trim surrounding whitespace on both label columns before comparing them.
df = df.assign(**{
    "Detected Code Smell by LLM": df["Detected Code Smell by LLM"].str.strip(),
    "Ground Truth": df["Ground Truth"].str.strip(),
})

# Keep only rows whose answer could be parsed (optional filter).
valid_df = df.loc[df["Detected Code Smell by LLM"].ne("Not Parsed")]

# Ground truth vs. prediction for the remaining rows.
y_true, y_pred = valid_df["Ground Truth"], valid_df["Detected Code Smell by LLM"]

print("✅ Accuracy:", accuracy_score(y_true, y_pred))
print("\n✅ Detailed Classification Report:\n")
print(classification_report(y_true, y_pred, zero_division=0))


In [None]:
!pip install radon


In [None]:
!pip install lizard


In [None]:
import lizard
from radon.raw import analyze
from radon.metrics import h_visit
from textwrap import dedent
import tempfile

def code_to_metrics(code_str):
    """Compute raw, Halstead and per-function metrics for a Python snippet.

    Args:
        code_str: Python source text; common leading indentation is removed
            before analysis.

    Returns:
        dict with three entries:
          * ``"raw"``: line counts from radon plus comment ratios in percent
            (0 when the denominator is 0),
          * ``"halstead"``: Halstead values read from radon's report, all 0
            when the Halstead analysis fails,
          * ``"lizard"``: one dict per function found by lizard.

    Raises:
        Whatever ``radon.raw.analyze`` raises for source it cannot process;
        only the Halstead step is best-effort.
    """
    code_str = dedent(code_str)

    def _percent(part, whole):
        # Ratio in percent, rounded to 2 places; guards against empty input.
        return round((part / whole) * 100, 2) if whole else 0

    # --- 1. Raw Metrics ---
    raw = analyze(code_str)
    loc = raw.loc
    comments = raw.comments
    multi = raw.multi
    blank = raw.blank
    sloc = raw.sloc
    lloc = raw.lloc

    # Derived comment ratios
    c_per_l = _percent(comments, loc)
    c_per_s = _percent(comments, sloc)
    c_plus_m_per_l = _percent(comments + multi, loc)

    # --- 2. Halstead Metrics (wrap in function) ---
    wrapped_code = "def _wrapped_fn():\n" + "\n".join("    " + line for line in code_str.splitlines())
    try:
        h_result = h_visit(wrapped_code)
        h = h_result[0] if h_result else None
    except Exception:
        # Best-effort: a snippet radon cannot process yields all-zero Halstead values.
        h = None

    halstead_metrics = {
        "volume": getattr(h, "volume", 0),
        "effort": getattr(h, "effort", 0),
        "time": getattr(h, "time", 0),
        "bugs": getattr(h, "bugs", 0),
        "η1": getattr(h, "h1", 0),  # distinct operators
        "η2": getattr(h, "h2", 0),  # distinct operands
        "N1": getattr(h, "N1", 0),  # total operators
        "N2": getattr(h, "N2", 0)   # total operands
    }

    # --- 3. Lizard Metrics ---
    # Analyze the source in memory instead of writing a temp file that was
    # never removed; the ".py" name makes lizard treat the text as Python.
    lizard_result = lizard.analyze_file.analyze_source_code("snippet.py", code_str)
    lizard_metrics = [
        {
            "function_name": func.name,
            "nloc": func.nloc,
            "cyclomatic_complexity": func.cyclomatic_complexity,
            "token_count": func.token_count,
            "parameter_count": func.parameter_count,
            "length": func.length,
        }
        for func in lizard_result.function_list
    ]

    # Return final result
    return {
        "raw": {
            "loc": loc,
            "sloc": sloc,
            "lloc": lloc,
            "comments": comments,
            "multi": multi,
            "blank": blank,
            "comment_ratio_LOC": c_per_l,
            "comment_ratio_SLOC": c_per_s,
            "comment+multi_ratio_LOC": c_plus_m_per_l,
        },
        "halstead": halstead_metrics,
        "lizard": lizard_metrics
    }


In [None]:

def detect_code_smell_with_llm(code_str, retrieved_nodes, llm, smell_type=" "):
    """Ask the LLM whether the given metrics indicate a code smell.

    Args:
        code_str: Metrics text to analyze; inserted verbatim into the prompt.
        retrieved_nodes: Retrieved examples; each item exposes its text as
            ``.node.text`` or ``.text``. May be empty.
        llm: Object with ``complete(prompt)`` returning an object with ``.text``.
        smell_type: ``"Large Class"`` or ``"Long Method"`` selects examples
            from the matching knowledge base via ``filter_retrieved_examples``;
            any other value (the default) uses ``retrieved_nodes`` directly.

    Returns:
        The LLM's answer text.
    """
    # The metrics are only parsed when a knowledge-base filter is requested,
    # so the default path does not depend on the filtering helpers.
    filtered_examples = None
    if smell_type == "Large Class":
        user_metrics = extract_metrics_from_summary(code_str)
        filtered_examples = filter_retrieved_examples(user_metrics, "Large Class", kb_large)
    elif smell_type == "Long Method":
        user_metrics = extract_metrics_from_summary(code_str)
        filtered_examples = filter_retrieved_examples(user_metrics, "Long Method", kb_long)

    if filtered_examples is not None and not filtered_examples.empty:
        context = "\n\n".join(filtered_examples["metrics_summary"].tolist())
    else:
        # No (or empty) filtered examples: fall back to the retrieved nodes.
        context = "\n\n".join(
            node.node.text if hasattr(node, "node") else node.text
            for node in retrieved_nodes
        )

    # ✅ Prompt
    prompt = f"""
You are a senior software quality expert.

IMPORTANT:
1. You MUST analyze the given metrics FIRST.
2. Retrieved examples are ONLY for comparison.
3. DO NOT reinterpret or modify the given metrics.

---
### Code Metrics to Analyze:
{code_str}

---
### Retrieved Examples for Comparison:
{context}

---
At the end of your response, write clearly:
Detected Code Smell: Yes or No
Smell Type: Long Method / Large Class / No Smell
"""

    response = llm.complete(prompt)
    return response.text
