In [None]:
!pip install openai;
!pip install anthropic;
!pip install chromadb;
!pip install transformers;
!pip install huggingface_hub;
!pip install --upgrade huggingface_hub;

In [None]:
import numpy as np
import pandas as pd
import os
import re
import anthropic
import openai
import sklearn
import torch
import chromadb
import json
from collections import defaultdict

## All LLM Setup

In [None]:
from huggingface_hub import notebook_login

# Authenticate with the Hugging Face Hub from inside the notebook.
notebook_login()
# Read the token from the environment rather than pasting it into the notebook
# (the original placeholder assignment had no right-hand side and did not parse).
# `token` is None when HF_TOKEN is not set.
token = os.environ.get("HF_TOKEN")

In [None]:
import getpass
import openai
import os
from openai import OpenAI

# Take the key from the environment, or prompt for it without echoing, so the
# secret is never stored in the notebook source or its outputs.
openai_key = os.environ.get("OPENAI_API_KEY") or getpass.getpass("OpenAI API key: ")
openai.api_key = openai_key
# run_llm() reads the key back from this environment variable.
os.environ["OPENAI_API_KEY"] = openai_key


def query_gpt_chat(api_key, messages, model="gpt-3.5-turbo-0125", temperature=0.2,max_tokens =1024):
    """Send one chat-completion request to OpenAI and return the reply text.

    Args:
        api_key: OpenAI API key used for this request.
        messages: Chat messages in the `[{"role": ..., "content": ...}, ...]` form.
        model: Name of the chat model to query.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on generated tokens forwarded to the API.

    Returns:
        The `content` of the first choice's message.
    """
    # The module-level key is updated as well, in addition to the per-client key.
    openai.api_key = api_key

    request = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    completion = OpenAI(api_key=api_key).chat.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message.content

In [None]:
import getpass
import re

# Key for the DeepSeek backend. Taken from the environment, or prompted for
# without echoing; leave the prompt blank if this backend is not used.
deepseek_key = os.environ.get("deepseek_key") or getpass.getpass("DeepSeek API key (blank to skip): ")
os.environ["deepseek_key"] = deepseek_key
def query_deepseek(api_key, messages, model="deepseek-ai/deepseek-r1",temperature=0.2,max_tokens =1024):
    """Query DeepSeek-R1 through the OpenAI-compatible NVIDIA endpoint.

    Args:
        api_key: API key for `https://integrate.api.nvidia.com/v1`.
        messages: Chat messages in the `[{"role": ..., "content": ...}, ...]` form.
        model: Model identifier to query.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on generated tokens forwarded to the API.

    Returns:
        The reply text with every complete `<think>...</think>` section
        removed and surrounding whitespace stripped. An empty string is
        returned when the reply carries no text content.
    """
    client = OpenAI(
        api_key=api_key,
        base_url="https://integrate.api.nvidia.com/v1",
    )
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
        stream=False,
    )
    # `content` may be None; re.sub() would raise TypeError on it.
    response_text = response.choices[0].message.content or ""
    # NOTE(review): the pattern needs a closing </think>; if the reply is cut off
    # mid-reasoning (e.g. by max_tokens) the reasoning text is returned unstripped.
    return re.sub(r"<think>.*?</think>\n*", "", response_text, flags=re.DOTALL).strip()

In [None]:
import getpass

# Key for the Claude backend. Taken from the environment, or prompted for
# without echoing; leave the prompt blank if this backend is not used.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY") or getpass.getpass("Anthropic API key (blank to skip): ")
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY

def query_claude(api_key, messages_system, messages_user, model="claude-3-5-sonnet-20241022",temperature=0.2,max_tokens =1024):
    """Send one Messages request to Anthropic and return the reply text.

    Args:
        api_key: Anthropic API key used for this request.
        messages_system: System prompt, passed as the `system` argument.
        messages_user: Conversation messages, passed as the `messages` argument.
        model: Name of the Claude model to query.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on generated tokens forwarded to the API.

    Returns:
        The `text` of the first content block of the reply.
    """
    request = dict(
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
        system=messages_system,
        messages=messages_user,
    )
    reply = anthropic.Anthropic(api_key=api_key).messages.create(**request)
    first_block = reply.content[0]
    return first_block.text

In [None]:
import time

def run_llm(text,system=None,temperature=0.2):
    """Run one prompt through the configured LLM backend and return its reply.

    Args:
        text: User prompt.
        system: Optional system prompt. When None, no system message is sent
            (previously a system message with `content=None` was built).
        temperature: Sampling temperature forwarded to the backend.

    Returns:
        The reply text returned by `query_gpt_chat`.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    msg = []
    if system is not None:
        msg.append({"role": "system", "content": system})
    msg.append({"role": "user", "content": text})

    # To switch backend, replace the return below with one of the following.
    #
    # deepseek
    # api_key = os.getenv("deepseek_key")
    # return query_deepseek(api_key, msg)
    #
    # CLAUD
    # api_key_antropic = os.getenv("ANTHROPIC_API_KEY")
    # return query_claude( api_key=api_key_antropic, messages_system=system, messages_user=[{"role": "user", "content": text}])
    return query_gpt_chat(api_key, msg,temperature=temperature)

## Embedding (encoder) setup

In [None]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, AutoModel
import torch
from chromadb.config import Settings
import chromadb


MODEL_NAME = "microsoft/codebert-base"

# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pretrained tokenizer/encoder pair, move the encoder to the chosen
# device and switch it to evaluation mode.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)
model.eval()


# Special-token strings that prepare_tokenizer() registers on the tokenizer.
MASK_TOKEN = "<mask>"
SEPARATOR_TOKEN = "<sep>"
PAD_TOKEN = "<pad>"
CLS_TOKEN = "<cls>"

def prepare_tokenizer(tokenizer):
    """Register the pad, sep, cls and mask special tokens on `tokenizer`.

    Each role is registered through its own `add_special_tokens` call, in the
    order pad, sep, cls, mask.

    Args:
        tokenizer: Object exposing `add_special_tokens(dict)`.

    Returns:
        The same tokenizer object that was passed in.
    """
    # NOTE(review): if any of these strings is new to the tokenizer vocabulary,
    # the encoder's embedding matrix may need resizing to match — confirm.
    role_to_token = (
        ("pad_token", PAD_TOKEN),
        ("sep_token", SEPARATOR_TOKEN),
        ("cls_token", CLS_TOKEN),
        ("mask_token", MASK_TOKEN),
    )
    for role, token_text in role_to_token:
        tokenizer.add_special_tokens({role: token_text})
    return tokenizer

tokenizer = prepare_tokenizer(tokenizer)
def get_embeddings(texts, batch_size=32):
    """Embed texts with the encoder using attention-masked mean pooling.

    Padding positions are excluded from the mean, so a text gets the same
    embedding whether it is encoded alone or inside a padded batch. (A plain
    mean over the sequence axis mixed padding positions into every text
    shorter than the longest one in its batch.)

    Args:
        texts: A pandas Series / array-like exposing `.tolist()`, any other
            iterable of strings, or a single string.
        batch_size: Number of texts encoded per forward pass, so that a large
            corpus is not pushed through the model in one batch.

    Returns:
        A numpy array with one row per input text.
    """
    if isinstance(texts, str):
        text_list = [texts]
    elif hasattr(texts, "tolist"):
        text_list = texts.tolist()
    else:
        text_list = list(texts)

    if not text_list:
        # Nothing to encode: return an empty matrix with the encoder's width.
        return np.empty((0, model.config.hidden_size), dtype=np.float32)

    pooled_batches = []
    for start in range(0, len(text_list), batch_size):
        batch = text_list[start:start + batch_size]
        inputs = tokenizer(batch, padding=True, truncation=True, return_tensors="pt").to(device)
        with torch.no_grad():
            hidden = model(**inputs).last_hidden_state
        # 1 for real tokens, 0 for padding; broadcast over the hidden dimension.
        mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
        token_sum = (hidden * mask).sum(dim=1)
        token_count = mask.sum(dim=1).clamp(min=1)
        pooled_batches.append((token_sum / token_count).cpu().numpy())
    return np.concatenate(pooled_batches, axis=0)

In [None]:
def encode_text(text):
    """Run `text` through the encoder and return the raw model outputs.

    The text is truncated to at most 512 tokens. The tokenized inputs are moved
    to `device`, the same device the model was moved to; without this the
    forward pass fails when the model is on the GPU.

    Args:
        text: Text (or list of texts) accepted by the tokenizer.

    Returns:
        The output object returned by `model(**inputs)`.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs

def save_embeddings(embeddings, file_path):
    """Write `embeddings` to `file_path` in NumPy's binary `.npy` format.

    Args:
        embeddings: Array (or array-like) to store.
        file_path: Destination handed to `np.save` as its `file` argument.
    """
    np.save(file=file_path, arr=embeddings)

## Nearest Sample Search

In [None]:
def find_nearest_samples(test_embedding, collection, n_neighbors=20):
    """Return the closest stored sample of each class for one query embedding.

    The collection is queried for `n_neighbors` results; among those, only the
    hit with the smallest distance is kept for every distinct
    `metadata['bug_type']`.

    Args:
        test_embedding: Embedding of the query sample.
        collection: Object exposing `query(query_embeddings=..., n_results=...)`
            and returning 'documents', 'metadatas' and 'distances' lists.
        n_neighbors: Number of results requested from the collection.

    Returns:
        `(documents, metadatas)`: two parallel lists with one entry per class,
        ordered by the first appearance of each class in the query results.
    """
    hits = collection.query(query_embeddings=[test_embedding], n_results=n_neighbors)

    candidates = zip(hits['documents'][0], hits['metadatas'][0], hits['distances'][0])

    best_per_class = {}
    for document, metadata, dist in candidates:
        label = metadata['bug_type']
        current = best_per_class.get(label)
        # Keep the first hit of a class, or a later one that is strictly closer.
        if current is None or dist < current['distance']:
            best_per_class[label] = {'distance': dist, 'document': document, 'metadata': metadata}

    winners = list(best_per_class.values())
    nearest_documents = [winner['document'] for winner in winners]
    nearest_metadatas = [winner['metadata'] for winner in winners]
    return nearest_documents, nearest_metadatas

In [None]:
# Locations of the labelled splits.
TRAIN_DATA_PATH = "train_data_path.csv"  # Replace with your actual train data path
TEST_DATA_PATH = "test_data_path.csv"

train_df = pd.read_csv(TRAIN_DATA_PATH)
test_df = pd.read_csv(TEST_DATA_PATH)

# use this function for summarizing training posts. Stage 2
def summary_with_cot(post, answer, label, label_description):
    """Summarize a labelled training post together with the reasoning for its label.

    The user prompt wraps each input in a tag. The label tags are written as
    `<Label>` / `<Label Description>`, matching the names the system prompt
    refers to (previously they were misspelled and the opening/closing tags
    did not match).

    Args:
        post: Text of the Stack Overflow post.
        answer: Answer text associated with the post.
        label: Class label of the post.
        label_description: Description of what the label means.

    Returns:
        The reply from `run_llm`, requested at temperature 1.0.
    """
    prompt = f"Here is the stackoverflow post below. \n\n <POST>{post}</POST>. \n\n <Answer> {answer} </Answer> \n and <Label> {label} </Label> and \n <Label Description> {label_description} </Label Description>"
    system_prompt= "You are an expert in summarization and reasoning. You are given a Stack Overflow post containing an <Answer> and its associated <Label> along with the <Label Description> presenting what it means. Your task is to: Concisely summarize the post, preserving all critical information. Explain why the post has been assigned the given label, providing a clear chain of thought and reasoning. Ensure the response is structured as a short paragraph that integrates the summary, the solution, and the reasoning seamlessly. Avoid phrases like `The summary is' and focus on providing a cohesive and informative output."
    return run_llm(prompt, system_prompt,1.0)


cls = 'bug_type'  # Replace with your actual class label column name.

# Training split: raw posts and their labels ...
X_train, y_train = train_df['post'], train_df[cls]
# ... plus the summaries from stage 2. Already saved to save computation
X_train_summary = train_df['summary_with_cot']

# Test split: raw posts and their labels.
X_test, y_test = test_df['post'], test_df[cls]

In [None]:
def concept_distillation(post, label):
    """Ask the LLM for a very short characteristic of one labelled post.

    The label is wrapped in a well-formed `<Label> ... </Label>` pair, the tag
    name the system prompt refers to (previously `<Lable> ... </label>`).

    Args:
        post: Text of the Stack Overflow post.
        label: Class label of the post.

    Returns:
        The reply from `run_llm`, requested at temperature 1.0.
    """
    prompt = f"Here is the stackoverflow post below. \n\n <POST>{post}</POST>. \n\n <Label> {label} </Label>"
    concept_distillation_prompt= "You are an expert in summarization. You are given a Stack Overflow post with the <Label> of the <Post>. Now return the characteristic of the post in very short (1/2 sentences) by observing the label of the post. The return output may look like: 'ValueError: class_weight in Keras fails for multi-label binary classification'"
    return run_llm(prompt, concept_distillation_prompt,1.0)



def consolidation(text):
    """Condense a class's collected characteristics into one summary.

    Args:
        text: The characteristics to consolidate, as a single string.

    Returns:
        The reply from `run_llm`, requested at temperature 1.0.
    """
    instructions = "You are an expert in summarization. You are given some characteristics of a particular class. You have to observe the characteristics and return a summary of them by removing redundancy and preserving the uniqueness. It should not be very large nor small. Just return the summary. Do not start with `the summary is' or `summary:' type words."
    user_message = f"Here is the characteristics below. \n\n <Text>{text}</Text>."
    return run_llm(text=user_message, system=instructions, temperature=1.0)




In [None]:
# Concept distillation and consolidation for training data (stage 1)

# One short LLM-written characteristic per training post, grouped by class.
# NOTE(review): this reads the 'text' column, while the rest of the notebook
# reads posts from 'post' — confirm which column holds the post body.
result_dict = {
    bug_type: [concept_distillation(row_text, bug_type) for row_text in group['text']]
    for bug_type, group in train_df.groupby(cls)
}

# One consolidated description per class. consolidation() takes only the text;
# passing the class name as a second argument raised a TypeError.
result_dict_summary = {
    class_: consolidation("\n".join(characteristics))
    for class_, characteristics in result_dict.items()
}
result_dict_summary

In [None]:
# Embed every training post (X_train) with get_embeddings().
embeddings = get_embeddings(X_train)

## Storing data in Chromadb

In [None]:
client_chroma = chromadb.Client()
collection_name = "bug_db"
collection = client_chroma.get_or_create_collection(name=collection_name)
import math


# Store each training post with its embedding, class label and stage-2 summary.
training_rows = zip(embeddings, X_train, y_train, X_train_summary)
for idx, (embedding, text, label, summary) in enumerate(training_rows):
    # The metadata key is spelled 'summay'; classify_with_llm reads the same key.
    record_metadata = {'bug_type': label, 'summay': summary}
    collection.add(
        ids=[f"doc_{idx}"],
        embeddings=[embedding.tolist()],
        documents=text,
        metadatas=[record_metadata],
    )


In [None]:
cls_list = list(y_train.unique())
cls_description = result_dict_summary #from stage 1

# First line: the task instructions; then one "<class>: <description>" line per class.
prompt_lines = [
    f"""You are a one-shot classifier that categorizes Stack Overflow posts. You have to return one value from this list only: {cls_list}. Based on the bug type of the test sample, return only ONE VALUE FROM THE LIST. The characteristics of each class are provided below:"""
]
for class_name in cls_list:
    class_line = class_name + ": " + cls_description[class_name]
    prompt_lines.append(class_line)
    # Report the length of each class description.
    print(len(cls_description[class_name]))

system_prompt = "\n".join(prompt_lines)
print(system_prompt)

## Classification

In [None]:
def classify_with_llm(test_sample, nearest_samples,system_prompt):
    """Classify a test sample with the LLM, using retrieved neighbours as examples.

    Args:
        test_sample: Text of the sample to classify.
        nearest_samples: Pair `[documents, metadatas]`; each metadata dict
            provides the example's 'summay' text and its 'bug_type' label.
        system_prompt: System prompt forwarded to `run_llm`.

    Returns:
        The reply from `run_llm`.
    """
    neighbor_docs, neighbor_metas = nearest_samples[0], nearest_samples[1]

    # One labelled example line per retrieved neighbour (its stored summary is
    # shown, not the raw document).
    example_lines = [
        f"Example Post: {meta['summay']}. <class>: {meta['bug_type']}\n"
        for _doc, meta in zip(neighbor_docs, neighbor_metas)
    ]
    question = f"\nTest Sample: {test_sample}. <class>?:"

    return run_llm("".join(example_lines) + question, system_prompt)

In [None]:
def simple_summary(post):
    """Ask the LLM for a short, solution-free summary of a post.

    Args:
        post: Text of the Stack Overflow post.

    Returns:
        The reply from `run_llm` (called with its default temperature).
    """
    instructions = "You are given a Stack Overflow post. You have to summarize the post. You may illustrate the error in detail. Do not provide any solution to this problem. The overall output should be short. Just return the summary without extra words."
    return run_llm(text=f"<Post> {post} </Post>", system=instructions)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, f1_score, matthews_corrcoef, confusion_matrix
import numpy as np

# true_labels = y_test.tolist()
predictions_bt = []   # one entry per test sample: matched class name, or the raw reply
not_found_count = 0   # number of samples whose reply matched no class name
not_founds = []       # positions (into X_test) of those samples

# Per-sample summaries, kept so that a later retry can reuse them via .iloc[i].
X_test_summary = pd.Series(index=range(len(X_test)), dtype=object)

# Try longer class names first so that a class whose name is contained in
# another class name cannot shadow the longer one during substring matching.
class_names_by_length = sorted(y_train.unique(), key=len, reverse=True)

for i in range(len(X_test)):
    post = X_test.iloc[i]
    test_embedding = get_embeddings(pd.Series([post]))[0]
    nearest_documents, nearest_metadatas = find_nearest_samples(test_embedding, collection)
    summary = simple_summary(post)
    X_test_summary.iloc[i] = summary
    # A reply without text is treated as an empty string.
    classification = classify_with_llm(summary, [nearest_documents, nearest_metadatas], system_prompt) or ""

    reply = classification.lower().strip()
    matched_class = next(
        (name for name in class_names_by_length if name.lower().strip() in reply),
        None,
    )

    if matched_class is not None:
        predictions_bt.append(matched_class)
        print(f"{i}, pred: {matched_class}")
    else:
        print(f"not found: {classification}")
        # Keep the raw reply as a placeholder so positions stay aligned with X_test.
        predictions_bt.append(classification)
        not_founds.append(i)
        not_found_count += 1

In [None]:
#execute this code only if LLM fails to make a prediction from the class list

not_founds_2 = []  # positions that still match no class name after the retry

# Match against the training classes (the ones listed in the system prompt),
# longer names first so a name contained in another cannot shadow it.
retry_class_names = sorted(y_train.unique(), key=len, reverse=True)

for i in not_founds:
    post = X_test.iloc[i]
    test_embedding = get_embeddings(pd.Series([post]))[0]
    nearest_documents, nearest_metadatas = find_nearest_samples(test_embedding, collection)

    # X_test_summary can only be indexed per sample when it is a Series; if the
    # classification loop left a single summary string in it, summarize again.
    if isinstance(X_test_summary, pd.Series):
        summary = X_test_summary.iloc[i]
    else:
        summary = simple_summary(post)

    # A reply without text is treated as an empty string.
    classification = classify_with_llm(summary, [nearest_documents, nearest_metadatas], system_prompt) or ""

    reply = classification.strip().lower()
    matched_class = next(
        (name for name in retry_class_names if name.strip().lower() in reply),
        None,
    )

    if matched_class is not None:
        predictions_bt[i] = matched_class
        print(f"{i}, class: {matched_class}\n\n")
    else:
        print(f"not found: {classification}")
        not_founds_2.append(i)

# These samples were already counted once by the classification loop; report
# the number still unresolved instead of counting them a second time.
not_found_count = len(not_founds_2)