In [2]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from dotenv import load_dotenv
from sklearn.metrics import normalized_mutual_info_score
from huggingface_hub import login
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import asyncio
import tensorflow as tf
import torch
# Load variables from a local .env file (python-dotenv) so the access token
# does not have to be written into the notebook.
load_dotenv()
# Access token read from the LLAMA environment variable; os.getenv returns
# None when the variable is not set.
LLAMA = os.getenv("LLAMA")
# Log in to the Hugging Face Hub with that token.
# NOTE(review): presumably required to download gated LLaMA weights - confirm.
login(token=LLAMA)

In [3]:
# Root folder of the BBC news corpus (one sub-folder per category).
# Set the BBC_DATA_DIR environment variable (for example in the .env file
# loaded above) to use the corpus on another machine without editing this
# cell; when it is unset or empty the desktop path below is used.
#Laptop Path
#source_path_for_bbc = r"C:\Users\elias\Documents\NLP and speech processing"
#Desktop Path
source_path_for_bbc = (
    os.environ.get("BBC_DATA_DIR")
    or r"C:\Users\super161\Documents\NLP and speech processing"
)


def process_folders(folder_path, test_size=0.2, random_state=None):
    """Read the corpus from disk and split it into a test set and few-shot examples.

    ``folder_path`` is expected to contain one sub-folder per category; every
    file inside a sub-folder is read as one UTF-8 encoded document. Entries
    that are not directories at the top level (or not regular files inside a
    category folder) are skipped instead of raising.

    Parameters
    ----------
    folder_path : str
        Root directory of the corpus.
    test_size : float, default 0.2
        Forwarded to ``train_test_split``.
    random_state : int or None, default None
        Forwarded to ``train_test_split``. Pass an integer to obtain the same
        split on every run; ``None`` keeps the original unseeded behaviour.

    Returns
    -------
    test : pandas.DataFrame
        Test split with the columns ``category``, ``content``, ``name`` and
        ``div`` (always ``'test'``).
    train_first_doc : pandas.DataFrame
        One row per category found in the training split (the first row of
        each group after sorting by category), with ``div`` set to ``'train'``.
    """
    document_list = []
    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore a seeded split) identical across machines.
    for folder_name in sorted(os.listdir(folder_path)):
        folder_full_path = os.path.join(folder_path, folder_name)
        if not os.path.isdir(folder_full_path):
            # e.g. a README file lying next to the category folders
            continue

        for document_name in sorted(os.listdir(folder_full_path)):
            document_full_path = os.path.join(folder_full_path, document_name)
            if not os.path.isfile(document_full_path):
                continue

            # Open and read the content of each document
            with open(document_full_path, 'r', encoding='utf-8') as file:
                document_content = file.read()
            document_list.append((folder_name, document_content, f"bbc_{folder_name}_{document_name}"))

    bbc_news = pd.DataFrame(document_list, columns=['category', 'content', 'name'])

    train, test = train_test_split(bbc_news, test_size=test_size, random_state=random_state)

    train = train.sort_values(by='category')
    train_first_doc = train.groupby('category').first().reset_index()
    train_first_doc['div'] = 'train'

    # assign() returns a new frame, so nothing is written into a slice of
    # bbc_news (avoids pandas' SettingWithCopyWarning).
    test = test.assign(div='test')

    return test, train_first_doc

# Data structure: test_df has the columns category (topic), content (article
# text), name and div; instruction_df holds one training article per category.

test_df, instruction_df = process_folders(source_path_for_bbc)
# Preview the first rows of the test split.
print(test_df.head())


      category                                            content  \
1509     sport  FA probes crowd trouble\n\nThe FA is to take a...   
1801     sport  Safin cool on Wimbledon\n\nNewly-crowned Austr...   
136   business  Bank set to leave rates on hold\n\nUK interest...   
1378     sport  Britain boosted by Holmes double\n\nAthletics ...   
2183      tech  Confusion over high-definition TV\n\nNow that ...   

                      name   div  
1509     bbc_sport_197.txt  test  
1801     bbc_sport_489.txt  test  
136   bbc_business_137.txt  test  
1378     bbc_sport_066.txt  test  
2183      bbc_tech_360.txt  test  


In [4]:
def make_prompts(bbc_instructions, bbc_data):
    """Build one few-shot classification prompt per document in ``bbc_data``.

    Every prompt consists of the general instruction, the list of topics, one
    worked example per row of ``bbc_instructions`` and finally the text that
    has to be classified.

    Parameters
    ----------
    bbc_instructions : pandas.DataFrame
        Example documents; the columns ``category`` and ``content`` are read.
    bbc_data : pandas.DataFrame
        Documents to classify; the columns ``name`` and ``content`` are read.

    Returns
    -------
    list of dict
        One single-entry dict ``{name: prompt}`` per row of ``bbc_data``, in
        row order.
    """
    # General instructions and fixed texts
    general_instruction = (
        "You are a perfect topic modeling machine. Given a text and the different topics, "
        "you will classify the texts to the correct topic. First you will receive the topics, "
        "afterwards an example and finally the text you have to assign one of the before mentioned topics to."
    )
    topics = "The topics are business, entertainment, politics, sport and tech. Please make sure, you know the topics and their meaning."
    transition_to_examples = "Now an example for each of the categories will follow."
    transition_to_text_to_classify = (
        "Now the text, you have to classify will follow. Please assess its topic and answer only the topic of it."
    )

    # The header and the few-shot examples are identical for every document,
    # so that prefix is assembled once instead of once per test row.
    example_section = "".join(
        f"For the following text: \n{example_text}\nThe correct answer would be: {category}\n"
        for category, example_text in zip(bbc_instructions['category'], bbc_instructions['content'])
    )
    prompt_prefix = (
        general_instruction + "\n" + topics + "\n" + transition_to_examples + "\n"
        + example_section
        + transition_to_text_to_classify + "\n"
    )

    # Append the actual text to classify; each prompt is keyed by document name.
    return [
        {f"{name}": prompt_prefix + text_to_classify + "\n"}
        for name, text_to_classify in zip(bbc_data['name'], bbc_data['content'])
    ]

# Build one prompt per test document, using instruction_df as the examples.
prompts = make_prompts(instruction_df, test_df)

# Sanity check: number of prompts created.
print(len(prompts))

445


In [None]:
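#PLACEHOLDER for prompting LLaMA: this cell must define `results`, a list of {document name: raw model answer} dicts, which the cells below consume.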

In [None]:
def add_correct_anser(results, test_df):
    """Pair every model answer with the true category of its document.

    Parameters
    ----------
    results : list of dict
        Each dict maps a document name to the raw answer string of the model.
    test_df : pandas.DataFrame
        Frame with the columns ``name`` and ``category``.

    Returns
    -------
    list of dict
        One ``{name: (answer, category)}`` dict per entry of ``results``. The
        answer has all whitespace removed and is lower-cased; the category is
        lower-cased, or ``None`` when ``name`` does not occur in ``test_df``
        (a message is printed in that case).
    """
    # Build the name -> category lookup once instead of filtering the frame
    # for every result. setdefault keeps the first row of a duplicated name.
    category_by_name = {}
    for name, category in zip(test_df['name'], test_df['category']):
        category_by_name.setdefault(name, category)

    result_with_correct_answer = []
    for result in results:
        for key, value in result.items():
            # Drop every whitespace character (spaces, newlines, tabs, ...)
            # so that e.g. " Sport\n" compares equal to "sport".
            value = "".join(value.split()).lower()

            if key in category_by_name:
                category_value = category_by_name[key].lower()
                result_with_correct_answer.append({key: (value, category_value)})
            else:
                print(f"No matching category found for key: {key}")
                result_with_correct_answer.append({key: (value, None)})

    return result_with_correct_answer

# `results` is not defined in any cell shown here; it has to come from the
# LLaMA prompting cell (a list of {document name: model answer} dicts).
results_with_correct_answer = add_correct_anser(results, test_df)
# Print every (answer, category) pair and the number of pairs.
print(results_with_correct_answer)
print(len(results_with_correct_answer))

In [None]:
def extract_ground_truth_and_predictions(results_with_correct_answer):
    """Split the paired results into two parallel label lists.

    Each element of ``results_with_correct_answer`` is a dict whose values
    are ``(prediction, ground_truth)`` tuples. Returns
    ``(ground_truth, predictions)``, both in input order.
    """
    pairs = [
        pair
        for entry in results_with_correct_answer
        for pair in entry.values()
    ]
    ground_truth = [pair[1] for pair in pairs]
    predictions = [pair[0] for pair in pairs]
    return ground_truth, predictions
# Parallel lists of true categories and model answers, in result order.
ground_truth, predictions = extract_ground_truth_and_predictions(results_with_correct_answer)

In [None]:
def calculate_NMI(ground_truth, predictions):
    """Compute, print and return the normalized mutual information score.

    Both arguments are passed unchanged to sklearn's
    ``normalized_mutual_info_score`` (true labels first, predictions second).
    """
    score = normalized_mutual_info_score(ground_truth, predictions)
    print(f"Normalized Mutual Information Score: {score}")
    return score


In [None]:
import numpy as np
from collections import Counter

def calculate_purity(predicted_labels, true_labels):
    """Compute, print and return the purity of the predicted grouping.

    Every distinct predicted label is treated as one cluster. For each
    cluster the count of its most frequent true label is taken; purity is the
    sum of these counts divided by the total number of instances.

    Parameters
    ----------
    predicted_labels : iterable
        Predicted label per instance (hashable values, ``None`` allowed).
    true_labels : iterable
        True label per instance, aligned with ``predicted_labels``.

    Returns
    -------
    float
        Purity in the range (0, 1].

    Raises
    ------
    ValueError
        If the two inputs differ in length.
    ZeroDivisionError
        If the inputs are empty.
    """
    predicted_labels = list(predicted_labels)
    true_labels = list(true_labels)

    # Same guard as calculate_accuracy: misaligned inputs would otherwise
    # give a silently wrong score.
    if len(predicted_labels) != len(true_labels):
        raise ValueError("The length of predicted and true labels must be the same.")

    # Count the true labels inside each predicted cluster in a single pass.
    # Plain dict/Counter grouping also works for labels that cannot be sorted
    # against each other (e.g. None next to strings).
    true_counts_by_cluster = {}
    for cluster, label in zip(predicted_labels, true_labels):
        true_counts_by_cluster.setdefault(cluster, Counter())[label] += 1

    # Size of the majority true label in every cluster
    correctly_classified = sum(
        max(counts.values()) for counts in true_counts_by_cluster.values()
    )

    # Calculate purity
    purity = correctly_classified / len(true_labels)
    print(f"Purity: {purity}")
    return purity





In [None]:
def calculate_accuracy(predicted_labels, true_labels):
    """Compute, print and return the share of predictions equal to the true label.

    Raises ``ValueError`` when the two inputs differ in length.
    """
    total = len(true_labels)
    # Both label sequences have to be aligned one-to-one
    if len(predicted_labels) != total:
        raise ValueError("The length of predicted and true labels must be the same.")

    # Tally the positions where prediction and truth agree
    hits = 0
    for guess, truth in zip(predicted_labels, true_labels):
        if guess == truth:
            hits += 1

    accuracy = hits / total
    print(f"Accuracy: {accuracy}")
    return accuracy





In [None]:
from sklearn.metrics import f1_score
def calculate_f1_score(ground_truth, predictions, average='micro'):
    """Compute, print and return the F1 score of the predictions.

    Parameters
    ----------
    ground_truth : sequence
        True label per instance.
    predictions : sequence
        Predicted label per instance.
    average : str or None, default 'micro'
        Averaging mode forwarded to sklearn's ``f1_score``. The default keeps
        the original behaviour. Note that for single-label multiclass data
        micro-averaged F1 equals accuracy; pass ``'macro'`` to weight every
        class equally when classes are over- or under-represented.

    Returns
    -------
    The value returned by ``f1_score`` for the chosen ``average``.
    """
    f1 = f1_score(ground_truth, predictions, average=average)
    print(f"F1 Score: {f1}")
    return f1



In [None]:
# Evaluate the answers. Each function prints its score and returns it.
# Argument order differs: NMI and F1 take (ground_truth, predictions),
# purity and accuracy take (predictions, ground_truth).
calculate_NMI(ground_truth, predictions)
calculate_purity(predictions, ground_truth)
calculate_accuracy(predictions, ground_truth)
calculate_f1_score(ground_truth, predictions)