In [None]:
import pandas as pd

# Training split: a semicolon-separated CSV file.
file_path = 'train_file.csv'
train_df = pd.read_csv(file_path, sep=';')

# Test split: same format. `file_path` is deliberately re-bound, so after this
# cell it refers to the test file.
file_path = 'test_file.csv'
test_df = pd.read_csv(file_path, sep=';')

In [None]:
import transformers
from transformers import AutoTokenizer
import torch

import os

# Llama 2 chat template with placeholders for the system and user parts.
base_prompt = "<s>[INST]\n<<SYS>>\n{system_prompt}\n<</SYS>>\n\n{user_prompt}[/INST]"

# SECURITY: never commit a Hugging Face access token to a notebook. The token
# is read from the HF_TOKEN environment variable instead; when it is unset,
# `None` is passed and the libraries fall back to any locally cached login.
# NOTE(review): the token that used to be hard-coded here has been exposed and
# should be revoked in the Hugging Face account settings.
access_token = os.environ.get("HF_TOKEN")

# Placeholders so both names exist even if loading below fails part-way.
tokenizer = None
pipeline = None

# Downloads (on first run) and loads the gated Llama 2 chat checkpoint.
model = "meta-llama/Llama-2-7b-chat-hf"
tokenizer = AutoTokenizer.from_pretrained(model, token=access_token)
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    torch_dtype=torch.float32,
    device_map="auto",
    token=access_token,
)

In [None]:
def Format(input):
    """Map free-form model output onto one of the three sentiment labels.

    The labels are probed in the fixed order negative, neutral, positive and
    the first one that occurs as a substring of ``input`` is returned. The
    match is case-sensitive. ``"Error"`` is returned when none occurs.
    """
    for candidate in ("negative", "neutral", "positive"):
        if candidate in input:
            return candidate
    return "Error"

def Generate_Few_Shot_llama (train_df, test_df, shot_num, retrieval_technique):
    """Classify every test sentence with Llama 2 using retrieved few-shot examples.

    Parameters
    ----------
    train_df : DataFrame with ``'Text'`` and ``'Polarity'`` columns; the pool
        the few-shot examples are taken from.
    test_df : DataFrame whose column at position 1 holds the gold label and
        whose column at position 2 holds the sentence to classify.
    shot_num : number of examples requested from ``retrieval_technique``.
    retrieval_technique : callable ``(sentence, shot_num) -> iterable`` of row
        positions into ``train_df``.

    Returns
    -------
    (true_labels, predicted_labels, misclassifications) where
    ``misclassifications`` is a list of ``[sentence, true_label, predicted_label]``.

    Notes
    -----
    Generation uses ``do_sample=True``, so repeated runs can give different
    predictions.
    """
    # Loop-invariant prompt pieces.
    event = "A chat between a user and an artificial intelligence assistant. The assistant gives one word reply to the user's question."
    task = "Given the sentence from GitHub, perform sentiment classification task. Only return 'negative', 'neutral' or 'positive'"

    true_labels = []
    predicted_labels = []
    misclassifications = []

    for idx, testrow in test_df.iterrows():

        # Positional access must go through .iloc: integer keys passed to
        # Series.__getitem__ on a label-indexed row are deprecated.
        guess = testrow.iloc[2]
        true_label = testrow.iloc[1]

        retrieved_examples = retrieval_technique(guess, shot_num)

        # One example per pair of lines. Without the newlines the sentence, its
        # label and the next example all run together in the prompt.
        user_prompt = "".join(
            "Example Sentence: " + train_df['Text'].iloc[i] + "\nLabel: " + train_df['Polarity'].iloc[i] + "\n"
            for i in retrieved_examples
        )

        prompt = f"<s>[INST] <<SYS>>\n{event}\n<</SYS>>\n{task}\n{user_prompt}[/INST]\nSure!\n</s><s>[INST]\nSentence: {guess}\nLabel:\n[/INST]\n"

        # Ask Llama 2 for the label of the test sentence.
        generated_text = pipeline(prompt, do_sample=True, max_new_tokens=20, num_return_sequences=1)[0]['generated_text']

        if generated_text.startswith(prompt):
            # The pipeline echoes the prompt; keep only the newly generated part.
            completion = generated_text[len(prompt):]
        else:
            # Fallback heuristic: keep whatever follows the last closing
            # bracket, i.e. the final "[/INST]" when the answer has no "]".
            stripped = generated_text.strip()
            completion = stripped[stripped.rfind("]") + 1:]

        predicted_label = Format(completion.strip().lower())

        if predicted_label != true_label:
            misclassifications.append([guess, true_label, predicted_label])

        predicted_labels.append(predicted_label)
        true_labels.append(true_label)

        # Lightweight progress indicator.
        if idx % 50 == 0:
            print(idx)

    return true_labels, predicted_labels, misclassifications

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, f1_score

def Result(true_labels_pre, predicted_labels_pre):
    """Print confusion matrix, classification report and macro/micro F1.

    Predictions equal to ``"Error"`` (unparseable model output) are excluded
    together with their gold labels before scoring. The input lists are not
    modified.

    Parameters
    ----------
    true_labels_pre : sequence of gold labels.
    predicted_labels_pre : sequence of predicted labels, aligned with
        ``true_labels_pre``.
    """
    # Build filtered copies. Deleting from a list while enumerating it skips
    # the element that follows each deletion, and would also alter the
    # caller's lists.
    kept_pairs = [
        (gold, predicted)
        for gold, predicted in zip(true_labels_pre, predicted_labels_pre)
        if predicted != "Error"
    ]

    dropped = len(predicted_labels_pre) - len(kept_pairs)
    if dropped:
        print(f"Excluded {dropped} prediction(s) labelled 'Error'.")

    if not kept_pairs:
        print("No valid predictions to evaluate.")
        return

    true_labels = [gold for gold, _ in kept_pairs]
    predicted_labels = [predicted for _, predicted in kept_pairs]

    conf_matrix = confusion_matrix(true_labels, predicted_labels)

    print("Confusion Matrix:")
    print(conf_matrix)

    class_rep = classification_report(true_labels, predicted_labels, zero_division=0)

    print("\nClassification Report:")
    print(class_rep)

    # F1 averaged over classes (macro) and over samples (micro).
    f1_macro = f1_score(true_labels, predicted_labels, average='macro')

    print(f"F1-score macro: {f1_macro:.2f}")

    f1_micro = f1_score(true_labels, predicted_labels, average='micro')

    print(f"F1-score micro: {f1_micro:.2f}")

In [None]:
from transformers import DPRQuestionEncoder, DPRQuestionEncoderTokenizer, DPRContextEncoder, DPRContextEncoderTokenizer
from sentence_transformers import SentenceTransformer, util
import faiss

# Load the DPR question and context encoders with their tokenizers.
question_encoder = DPRQuestionEncoder.from_pretrained('facebook/dpr-question_encoder-single-nq-base')
question_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained('facebook/dpr-question_encoder-single-nq-base')
context_encoder = DPRContextEncoder.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base')
context_tokenizer = DPRContextEncoderTokenizer.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base')

# Encode the training sentences one at a time with the DPR context encoder.
passage_embeddings = []
for passage in train_df['Text']:
    inputs = context_tokenizer(passage, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        embeddings = context_encoder(**inputs).pooler_output
    # Keep the tensor as returned; presumably (1, dim) for a single passage.
    passage_embeddings.append(embeddings)

# Concatenate along the batch axis. This avoids building a tensor from a
# Python list of numpy arrays, which PyTorch warns is extremely slow.
corpus_embeddings = torch.cat(passage_embeddings, dim=0)

index = faiss.IndexFlatIP(corpus_embeddings.shape[1])  # Using inner product (dot product) for similarity
index.add(corpus_embeddings.numpy())


In [None]:
def retrieve_relevant_examples(query, k=3):
    """Return up to ``k`` class-balanced training-row positions ranked by DPR similarity.

    The query is embedded with the DPR question encoder and searched in the
    FAISS ``index``. Hits are taken in rank order, with at most ``ceil(k / 3)``
    per polarity (1 each for ``k=3``, 2 each for ``k=5``), until ``k`` are
    collected. The search window is widened until that succeeds or the whole
    index has been searched. In the latter case fewer than ``k`` positions are
    returned.

    Parameters
    ----------
    query : sentence to find examples for.
    k : number of examples wanted; any non-negative integer.

    Returns
    -------
    list of row positions into ``train_df``, best match first.
    """
    # Encode the query using the DPR question encoder.
    query_inputs = question_tokenizer(query, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        query_embedding = question_encoder(**query_inputs).pooler_output
    query_vector = query_embedding.numpy()

    per_class_quota = -(-k // 3)  # ceil(k / 3) using integer arithmetic
    corpus_size = index.ntotal
    # Never request more neighbours than the index holds.
    window = min(k, corpus_size)

    result = []
    while window > 0:
        _, neighbours = index.search(query_vector, window)

        remaining = dict.fromkeys(('positive', 'negative', 'neutral'), per_class_quota)
        result = []

        for idx in neighbours[0]:
            if idx < 0:
                # FAISS pads missing hits with -1; defensive only, since the
                # window is capped at the index size.
                continue
            polarity = train_df['Polarity'][idx]
            if remaining.get(polarity, 0) > 0:
                result.append(idx)
                remaining[polarity] -= 1
                if len(result) == k:
                    return result

        if window >= corpus_size:
            # Whole corpus searched: return the partial, still balanced, set.
            break
        window = min(window * 2, corpus_size)

    return result

# Smoke-test the DPR retriever on a hand-written query.
query = "Wow this code is great"
retrieved_examples = retrieve_relevant_examples(query, 5)
print("Retrieved Examples:")
for i in retrieved_examples:
    # Blank separator line, then the example's polarity and its text.
    print("", train_df['Polarity'][i], train_df['Text'][i], sep="\n")
    

In [None]:
# 3-shot run with DPR retrieval. Returns the gold labels, the parsed
# predictions and the misclassified [sentence, true, predicted] triples.
true_labels_DPR, predicted_labels_DPR, misclassifications_DPR = Generate_Few_Shot_llama(train_df, test_df, 3, retrieve_relevant_examples)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 3-shot DPR run.
Result(true_labels_DPR, predicted_labels_DPR)

In [None]:
# 5-shot run with DPR retrieval. Returns the gold labels, the parsed
# predictions and the misclassified [sentence, true, predicted] triples.
true_labels_DPR_5, predicted_labels_DPR_5, misclassifications_DPR_5 = Generate_Few_Shot_llama(train_df, test_df, 5, retrieve_relevant_examples)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 5-shot DPR run.
Result(true_labels_DPR_5, predicted_labels_DPR_5)

In [None]:
# Load the sentence transformer model for creating embeddings
model_faiss = SentenceTransformer('all-MiniLM-L6-v2')

# Encode the corpus using the sentence transformer model. A plain list of
# strings is passed so the encoder never depends on how the pandas Series
# happens to be indexed.
corpus_embeddings_faiss = model_faiss.encode(train_df['Text'].tolist())

# Use FAISS for exact nearest-neighbour search over the embeddings
index_faiss = faiss.IndexFlatL2(corpus_embeddings_faiss.shape[1])  # Using L2 (Euclidean) distance
index_faiss.add(corpus_embeddings_faiss)

def retrieve_relevant_examples_faiss(query, k=3):
    """Return up to ``k`` class-balanced training-row positions ranked by SBERT/L2 distance.

    The query is embedded with ``model_faiss`` and searched in ``index_faiss``.
    Hits are taken in rank order, with at most ``ceil(k / 3)`` per polarity
    (1 each for ``k=3``, 2 each for ``k=5``), until ``k`` are collected. The
    search window is widened until that succeeds or the whole index has been
    searched. In the latter case fewer than ``k`` positions are returned.

    Parameters
    ----------
    query : sentence to find examples for.
    k : number of examples wanted; any non-negative integer.

    Returns
    -------
    list of row positions into ``train_df``, best match first.
    """
    # Encode the query
    query_embedding = model_faiss.encode([query])

    per_class_quota = -(-k // 3)  # ceil(k / 3) using integer arithmetic
    corpus_size = index_faiss.ntotal
    # Never request more neighbours than the index holds.
    window = min(k, corpus_size)

    result = []
    while window > 0:
        _, neighbours = index_faiss.search(query_embedding, window)

        remaining = dict.fromkeys(('positive', 'negative', 'neutral'), per_class_quota)
        result = []

        for idx in neighbours[0]:
            if idx < 0:
                # FAISS pads missing hits with -1; defensive only, since the
                # window is capped at the index size.
                continue
            polarity = train_df['Polarity'][idx]
            if remaining.get(polarity, 0) > 0:
                result.append(idx)
                remaining[polarity] -= 1
                if len(result) == k:
                    return result

        if window >= corpus_size:
            # Whole corpus searched: return the partial, still balanced, set.
            break
        window = min(window * 2, corpus_size)

    return result

# Smoke-test the SBERT/FAISS retriever on a hand-written query.
query = "You should change the for loop in lin 54"
retrieved_examples_faiss = retrieve_relevant_examples_faiss(query, 5)
print("Retrieved Examples:")
for i in retrieved_examples_faiss:
    # Blank separator line, then the example's polarity and its text.
    print("", train_df['Polarity'][i], train_df['Text'][i], sep="\n")


In [None]:
# 3-shot run with SBERT/FAISS retrieval. Returns the gold labels, the parsed
# predictions and the misclassified [sentence, true, predicted] triples.
true_labels_faiss_3, predicted_labels_faiss_3, misclassifications_faiss_3 = Generate_Few_Shot_llama(train_df, test_df, 3, retrieve_relevant_examples_faiss)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 3-shot SBERT/FAISS run.
Result(true_labels_faiss_3, predicted_labels_faiss_3)

In [None]:
# 5-shot run with SBERT/FAISS retrieval (shot_num=5). Returns the gold labels,
# the parsed predictions and the misclassified [sentence, true, predicted] triples.
true_labels_faiss_5, predicted_labels_faiss_5, misclassifications_faiss_5 = Generate_Few_Shot_llama(train_df, test_df, 5, retrieve_relevant_examples_faiss)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 5-shot SBERT/FAISS run.
Result(true_labels_faiss_5, predicted_labels_faiss_5)

In [None]:
from transformers import AutoModelForSequenceClassification
from sklearn.neighbors import NearestNeighbors

# Load SBERT model for creating embeddings
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')

# Encode the corpus using the sentence transformer model. A plain list of
# strings is passed so the encoder never depends on how the pandas Series
# happens to be indexed.
corpus_embeddings_nn = sbert_model.encode(train_df['Text'].tolist())

# Nearest-neighbour search under the Manhattan (L1) metric. n_neighbors=5 is
# only the default; callers may override it per query.
neigh = NearestNeighbors(n_neighbors=5, metric='manhattan')
neigh.fit(corpus_embeddings_nn)

def retrieve_relevant_examples_nn(query, k=3):
    """Return up to ``k`` class-balanced training-row positions ranked by SBERT/Manhattan distance.

    The query is embedded with ``sbert_model`` and looked up in the fitted
    ``neigh`` model. Hits are taken in rank order, with at most
    ``ceil(k / 3)`` per polarity (1 each for ``k=3``, 2 each for ``k=5``),
    until ``k`` are collected. The search window is widened until that
    succeeds or every fitted sample has been considered. In the latter case
    fewer than ``k`` positions are returned.

    Parameters
    ----------
    query : sentence to find examples for.
    k : number of examples wanted; any non-negative integer.

    Returns
    -------
    list of row positions into ``train_df``, best match first.
    """
    # Encode the query using SBERT model
    query_embedding = sbert_model.encode([query])

    per_class_quota = -(-k // 3)  # ceil(k / 3) using integer arithmetic
    corpus_size = neigh.n_samples_fit_
    # kneighbors raises when asked for more neighbours than fitted samples.
    window = min(k, corpus_size)

    result = []
    while window > 0:
        _, neighbours = neigh.kneighbors(query_embedding, n_neighbors=window)

        remaining = dict.fromkeys(('positive', 'negative', 'neutral'), per_class_quota)
        result = []

        for idx in neighbours[0]:
            polarity = train_df['Polarity'][idx]
            if remaining.get(polarity, 0) > 0:
                result.append(idx)
                remaining[polarity] -= 1
                if len(result) == k:
                    return result

        if window >= corpus_size:
            # Whole corpus searched: return the partial, still balanced, set.
            break
        window = min(window * 2, corpus_size)

    return result

# Smoke-test the SBERT/NearestNeighbors retriever on a hand-written query.
query = "You should change the for loop in lin 54"
retrieved_examples_nn = retrieve_relevant_examples_nn(query, 5)
print("Retrieved Examples:")
for i in retrieved_examples_nn:
    # Blank separator line, then the example's polarity and its text.
    print("", train_df['Polarity'][i], train_df['Text'][i], sep="\n")

In [None]:
# 3-shot run with SBERT/Manhattan nearest-neighbour retrieval. Returns the gold
# labels, the parsed predictions and the misclassified [sentence, true, predicted] triples.
true_labels_nn_3, predicted_labels_nn_3, misclassifications_nn_3 = Generate_Few_Shot_llama(train_df, test_df, 3, retrieve_relevant_examples_nn)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 3-shot nearest-neighbour run.
Result(true_labels_nn_3, predicted_labels_nn_3)

In [None]:
# 5-shot run with SBERT/Manhattan nearest-neighbour retrieval. Returns the gold
# labels, the parsed predictions and the misclassified [sentence, true, predicted] triples.
true_labels_nn_5, predicted_labels_nn_5, misclassifications_nn_5 = Generate_Few_Shot_llama(train_df, test_df, 5, retrieve_relevant_examples_nn)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 5-shot nearest-neighbour run.
Result(true_labels_nn_5, predicted_labels_nn_5)

In [None]:
from rank_bm25 import BM25Okapi

# Tokenise every training sentence by splitting on single spaces.
tokenized_corpus = list(map(lambda text: text.split(" "), train_df['Text']))

# Build the BM25 (Okapi) scorer over the tokenised corpus.
bm25 = BM25Okapi(tokenized_corpus)

def retrieve_relevant_examples_bm25(query, k=3):
    """Return up to ``k`` class-balanced training-row positions ranked by BM25 score.

    The query is split on single spaces and scored against every document by
    ``bm25``. Documents are visited from highest to lowest score, with at most
    ``ceil(k / 3)`` taken per polarity (1 each for ``k=3``, 2 each for
    ``k=5``), until ``k`` are collected. Fewer than ``k`` positions are
    returned when the corpus cannot supply a balanced set of that size.

    Parameters
    ----------
    query : sentence to find examples for.
    k : number of examples wanted; any non-negative integer.

    Returns
    -------
    list of row positions into ``train_df``, best match first.
    """
    result = []
    if k <= 0:
        return result

    # Tokenize the query
    tokenized_query = query.split(" ")

    # Get BM25 scores
    scores = bm25.get_scores(tokenized_query)

    # Sort once, best score first; the full ranking is already in hand, so no
    # incremental widening of a search window is needed.
    ranking = scores.argsort()[::-1]

    per_class_quota = -(-k // 3)  # ceil(k / 3) using integer arithmetic
    remaining = dict.fromkeys(('positive', 'negative', 'neutral'), per_class_quota)

    for idx in ranking:
        polarity = train_df['Polarity'][idx]
        if remaining.get(polarity, 0) > 0:
            result.append(idx)
            remaining[polarity] -= 1
            if len(result) == k:
                break

    return result


# Smoke-test the BM25 retriever on a hand-written query.
query = "You should change the for loop in lin 54"
retrieved_examples_bm25 = retrieve_relevant_examples_bm25(query, 5)
print("Retrieved Examples:")
for i in retrieved_examples_bm25:
    # Blank separator line, then the example's polarity and its text.
    print("", train_df['Polarity'][i], train_df['Text'][i], sep="\n")


In [None]:
# 3-shot run with BM25 retrieval. Returns the gold labels, the parsed
# predictions and the misclassified [sentence, true, predicted] triples.
true_labels_bm25, predicted_labels_bm25, misclassifications_bm25 = Generate_Few_Shot_llama(train_df, test_df, 3, retrieve_relevant_examples_bm25)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 3-shot BM25 run.
Result(true_labels_bm25, predicted_labels_bm25)

In [None]:
# 5-shot run with BM25 retrieval (shot_num=5). Returns the gold labels, the
# parsed predictions and the misclassified [sentence, true, predicted] triples.
true_labels_bm25_5, predicted_labels_bm25_5, misclassifications_bm25_5 = Generate_Few_Shot_llama(train_df, test_df, 5, retrieve_relevant_examples_bm25)

In [None]:
# Confusion matrix, classification report and macro/micro F1 for the 5-shot BM25 run.
Result(true_labels_bm25_5, predicted_labels_bm25_5)