In [24]:
import os
import time
import csv
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel
import torch
import ollama
import re

def get_timestamp():
    """
    Build a compact, sortable stamp of the current local time.

    Returns:
        str: The local time formatted as ``YYYYMMDD_HHMMSS``.
    """
    stamp_format = "%Y%m%d_%H%M%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)

def create_output_folder(model_name):
    """
    Create (if needed) and return a timestamped results folder for a model.

    The folder lives under ``voiceapp/output/tagging_folder`` and is named
    ``<sanitized model name>_<timestamp>``. Characters that are not allowed
    in folder names (``< > : " / \\ | ? *``) are replaced with ``_``.

    Args:
        model_name: Name of the model the folder is created for.

    Returns:
        The path of the model-specific folder.
    """
    output_root = os.path.join("voiceapp", "output")
    tagging_root = os.path.join(output_root, "tagging_folder")

    # Make sure the shared parent folders exist before anything else.
    for parent in (output_root, tagging_root):
        os.makedirs(parent, exist_ok=True)

    # Swap out characters that are forbidden in folder names.
    forbidden = re.compile(r'[<>:"/\\|?*]')
    cleaned_name = forbidden.sub('_', model_name)

    # <model>_<timestamp> names the folder for this run.
    run_folder = os.path.join(tagging_root, f"{cleaned_name}_{get_timestamp()}")
    os.makedirs(run_folder, exist_ok=True)
    return run_folder

def initialize_environment(models_list):
    """
    Prepare everything the tagging run needs.

    One output folder is created per entry of ``models_list`` (via
    ``create_output_folder``), then the tokenizer and the embedding model
    are loaded from the ``sentence-transformers/all-distilroberta-v1``
    checkpoint.

    Args:
        models_list: Iterable of model names to create folders for.

    Returns:
        A tuple ``(output_folders, tokenizer, model)`` where
        ``output_folders`` maps each model name to its folder path.
    """
    checkpoint = "sentence-transformers/all-distilroberta-v1"

    # Folders are created first, in the order the models are listed.
    output_folders = {name: create_output_folder(name) for name in models_list}

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModel.from_pretrained(checkpoint)
    return output_folders, tokenizer, model


def read_file_paths(input_file):
    """
    Reads transcript file paths from the input list file.

    Each non-blank line of ``input_file`` is treated as one path. Surrounding
    whitespace is stripped, and a trailing ``.mp3`` extension (matched
    case-insensitively) is replaced with ``.txt``. Lines without an ``.mp3``
    extension are returned unchanged.

    Args:
        input_file: Path of the text file listing one path per line.

    Returns:
        list[str]: The resulting paths, in file order.
    """
    file_paths = []
    with open(input_file, 'r') as f:
        for line in f:
            path = line.strip()
            # Blank lines (e.g. a trailing newline) are not paths; skip them
            # instead of yielding an empty string.
            if not path:
                continue
            # Only swap the extension: a '.mp3' that appears elsewhere in the
            # path (such as in a directory name) must be left untouched.
            if path.lower().endswith('.mp3'):
                path = path[:-len('.mp3')] + '.txt'
            file_paths.append(path)
    return file_paths

def split_into_chunks(text, chunk_size=100):
    """
    Break a text into consecutive pieces of at most ``chunk_size`` words.

    Words are whatever ``str.split()`` yields (whitespace-separated); each
    piece is re-joined with single spaces. The last piece may be shorter.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per piece (default 100).

    Returns:
        list[str]: The pieces, in order; empty when ``text`` has no words.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(' '.join(window))
    return pieces

def compute_embeddings(texts, tokenizer, model):
    """
    Computes one embedding per text using the given tokenizer and model.

    The token embeddings of the model's last hidden state are mean-pooled
    using the attention mask, so padding tokens added when batching texts of
    different lengths do not contribute to the average. For a single text
    (no padding) this equals a plain mean over the tokens.

    Args:
        texts: List of strings to embed.
        tokenizer: Callable tokenizer returning a mapping that contains an
            ``attention_mask`` tensor and can be unpacked into ``model``.
        model: Callable model whose output exposes ``last_hidden_state``.

    Returns:
        torch.Tensor: One pooled embedding row per input text.
    """
    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        token_embeddings = model(**inputs).last_hidden_state
        # Expand the mask to the hidden dimension so padded positions are
        # zeroed out before summing.
        mask = inputs["attention_mask"].unsqueeze(-1).to(token_embeddings.dtype)
        summed = (token_embeddings * mask).sum(dim=1)
        # Guard against division by zero for a fully masked row.
        token_counts = mask.sum(dim=1).clamp(min=1e-9)
        embeddings = summed / token_counts
    return embeddings

def compute_cosine_similarity(embedding1, embedding2):
    """
    Return the cosine similarity of two embeddings as a Python float.

    Args:
        embedding1: First embedding tensor.
        embedding2: Second embedding tensor.

    Returns:
        float: The similarity extracted with ``.item()``, which requires the
        result to hold exactly one value.
    """
    functional = torch.nn.functional
    score = functional.cosine_similarity(embedding1, embedding2)
    return score.item()

def _tags_from_response(response):
    """
    Pulls the raw, comma-split tag list out of an ``ollama.chat`` response.

    Returns ``None`` when no text content can be found in the response.
    """
    if isinstance(response, list):
        return [msg['content'] for msg in response if 'content' in msg]

    # Plain dicts and subscriptable response objects are both read with
    # ``response['message']['content']``; fall back to attribute access for
    # objects that do not support subscripting.
    try:
        content = response['message']['content']
    except (KeyError, TypeError, IndexError):
        content = getattr(getattr(response, 'message', None), 'content', None)

    if not isinstance(content, str):
        return None
    return content.split(',')


def tag_text(chunk, model_name):
    """
    Generates tags for a given text chunk using the specified LLM model.

    Args:
        chunk: The text to tag; it is embedded verbatim in the prompt.
        model_name: Name of the Ollama model to query.

    Returns:
        list[str]: The tags with surrounding whitespace stripped and empty
        entries removed. If the response carries no usable tags, the
        placeholder list ``["error", "generating", "tags"]`` is returned.
    """
    response = ollama.chat(
        model=model_name,
        messages=[{
            "role": "user",
            "content": f"""
            You are a professional tagger. Your task is to analyze a given text and return 3 highly relevant tags to the main topics and themes of the text. 

            Guidelines:
            1. Only provide the tags, nothing else.
            2. Each tag must be a single word, not a phrase.
            3. Separate the tags with commas, without spaces or additional formatting.

            Example Input:
            "Artificial intelligence is transforming industries like healthcare, finance, and transportation."

            Example Output:
            ai,technology,automation

            Now, generate tags for the following text:
            "{chunk}"
            """
        }]
    )

    fallback_tags = ["error", "generating", "tags"]

    # Extract and clean tags
    raw_tags = _tags_from_response(response)
    if raw_tags is None:
        return fallback_tags

    # Drop empty entries (e.g. from a trailing comma) so that blank strings
    # are never treated as tags downstream.
    cleaned = [tag.strip() for tag in raw_tags]
    cleaned = [tag for tag in cleaned if tag]
    return cleaned or fallback_tags

def select_best_iteration(all_tags_per_iteration, avg_similarities_per_iteration):
    """
    Pick the tags of the iteration with the highest average similarity.

    Args:
        all_tags_per_iteration: Tag lists, one per iteration, where position
            ``k - 1`` holds the tags of iteration number ``k``.
        avg_similarities_per_iteration: Sequence of
            ``(iteration_number, average_similarity)`` entries.

    Returns:
        The tag list of the winning iteration. Only the tags are returned,
        not the similarity. On ties the earliest entry wins.
    """
    # Pull out the scores, then locate the first entry holding the top score.
    scores = [entry[1] for entry in avg_similarities_per_iteration]
    top_position = scores.index(max(scores))
    winning_number = avg_similarities_per_iteration[top_position][0]

    # Iteration numbers are 1-based, list positions are 0-based.
    return all_tags_per_iteration[winning_number - 1]

def process_file(file_path, model_name, output_folder, tokenizer, model, iterations=5):
    """
    Processes a single file: splits it into chunks, tags each chunk, and saves results.

    For every iteration, each chunk is tagged with ``model_name`` and every
    tag is scored by cosine similarity against its chunk's embedding. The
    scored tags of each iteration are written to
    ``<name>_tags_<iteration>.csv``; the iteration with the highest average
    similarity is additionally written to ``<name>_best_tags.csv``.

    Args:
        file_path: Path of the UTF-8 text file to process.
        model_name: Name of the LLM used for tagging.
        output_folder: Folder the CSV files are written to.
        tokenizer: Tokenizer passed through to ``compute_embeddings``.
        model: Embedding model passed through to ``compute_embeddings``.
        iterations: Number of tagging rounds to run (default 5).

    Returns:
        None. Nothing is written when the file contains no words or when
        ``iterations`` is less than 1.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        original_text = f.read()

    chunks = split_into_chunks(original_text)
    # An empty transcript yields no chunks and therefore no tags; bail out
    # instead of dividing by zero when averaging similarities below.
    if not chunks or iterations < 1:
        return

    # Chunk embeddings do not depend on the iteration, so compute them once.
    chunk_embeddings = [compute_embeddings([chunk], tokenizer, model) for chunk in chunks]

    # Strip only a trailing '.txt' so other parts of the name stay intact.
    file_name = os.path.basename(file_path)
    base_name = file_name[:-len('.txt')] if file_name.endswith('.txt') else file_name

    all_tags_per_iteration = []  # To store tags per iteration
    avg_similarities_per_iteration = []  # To store average similarity per iteration

    for iteration in range(1, iterations + 1):
        iteration_tags = []

        for chunk, chunk_embedding in zip(chunks, chunk_embeddings):
            for tag in tag_text(chunk, model_name):
                tag_embedding = compute_embeddings([tag], tokenizer, model)
                similarity = compute_cosine_similarity(tag_embedding, chunk_embedding)
                iteration_tags.append((tag, similarity))

        # Sort tags by similarity
        sorted_tags = sorted(iteration_tags, key=lambda x: x[1], reverse=True)
        all_tags_per_iteration.append(sorted_tags)

        # Save results to CSV for this iteration
        csv_output_path = os.path.join(output_folder, f"{base_name}_tags_{iteration}.csv")
        save_tags_to_csv(csv_output_path, sorted_tags)

        # Calculate average similarity for this iteration; an iteration that
        # produced no tags at all scores 0.0 rather than raising.
        if iteration_tags:
            avg_similarity = sum(similarity for _, similarity in iteration_tags) / len(iteration_tags)
        else:
            avg_similarity = 0.0
        avg_similarities_per_iteration.append((iteration, avg_similarity))

    # Use the select_best_iteration function to get the best tags
    best_iteration_tags = select_best_iteration(all_tags_per_iteration, avg_similarities_per_iteration)

    # Save the best tags to a CSV file
    best_csv_output_path = os.path.join(output_folder, f"{base_name}_best_tags.csv")
    save_tags_to_csv(best_csv_output_path, best_iteration_tags)
        




def save_tags_to_csv(output_path, tags):
    """
    Write scored tags to a CSV file, overwriting any existing file.

    The file starts with the header ``tag_name,cosine_similarity``; each
    following row holds a tag and its similarity formatted to four decimals.

    Args:
        output_path: Destination path of the CSV file.
        tags: Iterable of ``(tag, similarity)`` pairs.
    """
    header = ['tag_name', 'cosine_similarity']
    with open(output_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        # Rows are produced lazily, one pair at a time, in input order.
        writer.writerows(
            [name, format(score, ".4f")] for name, score in tags
        )




In [27]:
def main():
    """
    Runs the tagging pipeline over every transcript listed in the input file.

    Output folders and the embedding model are prepared once; then each
    existing transcript is processed with every model in ``models_list``.
    Transcripts that do not exist on disk are reported and skipped.
    """
    # Initialize variables
    input_file = 'D:\\Ai\\Audio-Classifier\\voiceapp\\lista-1.txt'
    models_list = [
        'mistral:7b',
        'mistral-small',
        'llama3.2:3b',
        'qwen2:7b',
        'yi',
        'glm4:9b',
        'qwen2.5:7b',
        'qwen2.5:72b',
    ]
    output_folders, tokenizer, model = initialize_environment(models_list)

    # Read file paths
    file_paths = read_file_paths(input_file)

    # Process each file. The context manager closes the progress bar even if
    # processing raises or the run is interrupted.
    with tqdm(total=len(file_paths), desc="Processing files") as pbar:
        for file_path in file_paths:
            if os.path.exists(file_path):
                for model_name, model_folder in output_folders.items():
                    process_file(file_path, model_name, model_folder, tokenizer, model)
            else:
                # Report the skip without breaking the progress bar display.
                tqdm.write(f"Skipping missing file: {file_path}")
            pbar.update(1)


if __name__ == "__main__":
    main()










Processing files:   0%|                                                                                                                                                                                   | 0/97 [00:00<?, ?it/s]

KeyboardInterrupt: 