In [3]:
import os
import csv
import time
import re
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel
import torch
import ollama
from difflib import SequenceMatcher

# Checkpoint of the pre-trained encoder used for semantic similarity.
_EMBEDDING_CHECKPOINT = "sentence-transformers/all-distilroberta-v1"

# Tokenizer and encoder are loaded once, when this cell/module is executed.
tokenizer = AutoTokenizer.from_pretrained(_EMBEDDING_CHECKPOINT)
model = AutoModel.from_pretrained(_EMBEDDING_CHECKPOINT)

# Function to compute embeddings
def compute_embeddings(texts):
    """Encode texts into one embedding vector per text.

    Args:
        texts: List of strings to embed. Inputs longer than 512 tokens are
            truncated by the tokenizer.

    Returns:
        A tensor with one row per input text: the average of the model's
        last-layer token embeddings, taken over real tokens only.
    """
    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        token_embeddings = model(**inputs).last_hidden_state

    # Padding positions must not take part in the average; otherwise the
    # embedding of a short text would depend on the longest text in the batch.
    mask = inputs['attention_mask'].unsqueeze(-1).to(token_embeddings.dtype)
    summed = (token_embeddings * mask).sum(dim=1)
    # Guard against division by zero for a (theoretical) all-padding row.
    token_counts = mask.sum(dim=1).clamp(min=1e-9)
    return summed / token_counts

# Function to compute cosine similarity between two embeddings
def compute_cosine_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embeddings as a Python float.

    The comparison uses the default dimension of
    ``torch.nn.functional.cosine_similarity``; the result must contain exactly
    one value for ``.item()`` to succeed.
    """
    cosine = torch.nn.functional.cosine_similarity
    return cosine(embedding1, embedding2).item()

# Function to split text into chunks of approximately 100 words
def split_into_chunks(text, chunk_size=100):
    """Split ``text`` on whitespace and regroup it into chunks of words.

    Every chunk holds ``chunk_size`` words joined by single spaces, except the
    last one, which holds whatever remains. Text without words gives ``[]``.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(' '.join(window))
    return pieces

# Function to chunk the tags list to avoid exceeding the token limit
def chunk_tags(tag_list, max_tags_per_chunk=100):
    """Slice ``tag_list`` into consecutive pieces of at most ``max_tags_per_chunk`` items."""
    batches = []
    for start in range(0, len(tag_list), max_tags_per_chunk):
        stop = start + max_tags_per_chunk
        batches.append(tag_list[start:stop])
    return batches

# Helper to get the current timestamp
def get_timestamp():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    now = time.localtime()
    return time.strftime("%Y%m%d_%H%M%S", now)

# Function to create a folder based on the model name and timestamp
def create_output_folder(model_name):
    """
    Create and return a per-run output folder for ``model_name``.

    The folder lives under ``voiceapp/output/combined_tags_folder`` (relative
    to the current working directory) and is named ``<model>_<timestamp>``.
    Characters that are not allowed in folder names (for example ``:`` or
    ``/``) are replaced by ``_`` in the model name.
    """
    output_root = os.path.join("voiceapp", "output")
    tags_root = os.path.join(output_root, "combined_tags_folder")
    for directory in (output_root, tags_root):
        os.makedirs(directory, exist_ok=True)

    # Model names such as "mistral:7b" contain characters that are invalid in paths.
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', model_name)

    run_folder = os.path.join(tags_root, f"{sanitized}_{get_timestamp()}")
    os.makedirs(run_folder, exist_ok=True)
    return run_folder

# Step 1: Combine all tags into combine_tags.csv
def combine_tags_from_files(lista_path, output_folder):
    """Collect the unique tag names of all listed files into combine_tags.csv.

    Every line of ``lista_path`` is treated as a path. ``.mp3`` in that path is
    replaced by ``_best_tags.csv`` to locate the tag file; tag files that do
    not exist are skipped. Blank or missing ``tag_name`` cells are ignored.

    Args:
        lista_path: Text file with one path per line.
        output_folder: Folder that receives ``combine_tags.csv``.

    Returns:
        Path of the written ``combine_tags.csv``.
    """
    # NOTE(review): opened with the platform default encoding, as before;
    # confirm the list file's encoding before changing this.
    with open(lista_path, 'r') as f:
        file_paths = [line.strip() for line in f]

    combined_tags = set()
    pbar = tqdm(total=len(file_paths), desc="Combining tags from files")
    try:
        for file_path in file_paths:
            file_path = file_path.replace('.mp3', '_best_tags.csv')
            if os.path.exists(file_path):
                with open(file_path, 'r', encoding='utf-8') as csvfile:
                    for row in csv.DictReader(csvfile):
                        # DictReader fills columns missing from a short row
                        # with None, which has no .strip().
                        tag = (row.get('tag_name') or '').strip()
                        if tag:
                            combined_tags.add(tag)
            pbar.update(1)
    finally:
        # Release the progress bar even when a CSV cannot be read.
        pbar.close()

    # Write to combine_tags.csv
    output_path = os.path.join(output_folder, 'combine_tags.csv')
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['tag_name'])
        # Sort for consistency between runs.
        writer.writerows([tag] for tag in sorted(combined_tags))
    return output_path

# Step 2: Clean tags
def clean_tags(input_path, output_folder):
    """Filter the tags in ``input_path`` and write them to cleaned_tags.csv.

    A tag is kept when it is at most 50 characters long, consists only of
    letters, digits, ``_`` and ``-``, and is neither equal nor too similar
    (SequenceMatcher ratio above 0.8) to a tag that was kept earlier.
    Returns the path of the written file.
    """
    similarity_cutoff = 0.8

    def too_close(candidate, kept):
        return SequenceMatcher(None, candidate, kept).ratio() > similarity_cutoff

    raw_tags = []
    with open(input_path, 'r', encoding='utf-8') as source:
        for row in csv.DictReader(source):
            if 'tag_name' in row:
                raw_tags.append(row['tag_name'].strip())

    # Tags are examined in file order; earlier tags win over later look-alikes.
    kept_tags = []
    for raw in raw_tags:
        candidate = raw.strip()
        if candidate in kept_tags:
            continue
        if len(candidate) > 50:
            continue
        if not re.match("^[a-zA-Z0-9_-]+$", candidate):
            continue
        if any(too_close(candidate, kept) for kept in kept_tags):
            continue
        kept_tags.append(candidate)

    output_path = os.path.join(output_folder, 'cleaned_tags.csv')
    with open(output_path, 'w', newline='', encoding='utf-8') as target:
        out = csv.writer(target)
        out.writerow(['tag_name'])
        # Sorted so the file content is stable between runs.
        out.writerows([tag] for tag in sorted(kept_tags))
    return output_path

# Step 3: Generate relevant tags using cleaned tags for each file
def _extract_llm_tags(response):
    """Return the comma-separated tags contained in an ``ollama.chat`` result.

    Accepts a list of message mappings, a plain dict, or any response object
    that supports ``response['message']['content']``. Returns an empty list
    when no text content is found, so placeholder words are never reported
    as tags.
    """
    content = None
    if isinstance(response, list):
        for message in response:
            if 'content' in message:
                content = message['content']
                break
    else:
        try:
            content = response['message']['content']
        except (KeyError, TypeError, IndexError):
            content = None

    if not isinstance(content, str):
        return []
    # Drop empty fragments caused by trailing or doubled commas.
    return [tag.strip() for tag in content.split(',') if tag.strip()]


def generate_tags_with_llm(lista_path, cleaned_tags_path, output_folder, model_name):
    """Let an Ollama model pick tags for each transcript and score them.

    For every line of ``lista_path`` (with ``.mp3`` replaced by ``.txt``) the
    transcript is split into 100-word chunks. For each chunk the model is asked
    to pick 3 tags from the not-yet-used predefined tags, and every returned
    tag is scored by cosine similarity against the chunk. One
    ``<transcript>_tag_final.csv`` per transcript is written to
    ``output_folder`` with one row per distinct tag (its best score), sorted
    by descending similarity. Missing transcripts are skipped.

    Args:
        lista_path: Text file with one path per line.
        cleaned_tags_path: CSV with a ``tag_name`` column of allowed tags.
        output_folder: Folder that receives the per-transcript CSV files.
        model_name: Name of the Ollama model to query.
    """
    # Load predefined tags
    with open(cleaned_tags_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        predefined_tags = [row['tag_name'].strip() for row in reader if 'tag_name' in row]

    # Only the first 50 tags are offered to the model (presumably to bound
    # the prompt size).
    max_tags = 50
    predefined_tags = predefined_tags[:max_tags]

    with open(lista_path, 'r') as f:
        file_paths = [line.strip().replace('.mp3', '.txt') for line in f]

    # A tag's embedding does not depend on the chunk, so compute it once.
    tag_embedding_cache = {}

    pbar = tqdm(total=len(file_paths), desc=f"Generating tags with LLM ({model_name})")
    try:
        for file_path in file_paths:
            if not os.path.exists(file_path):
                pbar.update(1)
                continue

            with open(file_path, 'r', encoding='utf-8') as f:
                original_text = f.read()

            best_similarity = {}  # tag -> highest similarity seen in this file
            used_tags = set()  # Track tags that have already been used for this file

            for chunk in split_into_chunks(original_text, chunk_size=100):
                # Filter predefined_tags to exclude used tags
                available_tags = [tag for tag in predefined_tags if tag not in used_tags]

                # If no tags are left, stop processing this file's chunks
                if not available_tags:
                    break

                # The prompt text (including its whitespace) is sent to the
                # model as is, so it is kept verbatim.
                response = ollama.chat(
                    model=model_name,
                    messages=[{
                        "role": "user",
                        "content": f"""
                        You are a professional tagger. Your task is to analyze the given text and select exactly 3 relevant tags from the provided list of tags.

                        Instructions:
                        1. Do not summarize the text.
                        2. Only select 3 tags from the provided list.
                        3. Do not provide any extra text or explanations, just return the 3 tags.
                        4. Separate the tags with commas (without spaces or other punctuation).

                        Select 3 tags from the following list:
                        {', '.join(available_tags)}

                        Text:
                        "{chunk}"
                        """
                    }]
                )

                tags = _extract_llm_tags(response)
                if not tags:
                    # Report the problem instead of writing placeholder tags.
                    tqdm.write(f"No tags returned by {model_name} for a chunk of {file_path}")
                    continue

                # Update used tags to ensure no duplicates are generated for this file
                used_tags.update(tag for tag in tags if tag in available_tags)

                # Compute cosine similarity for each tag
                chunk_embedding = compute_embeddings([chunk])
                for tag in tags:
                    if tag not in tag_embedding_cache:
                        tag_embedding_cache[tag] = compute_embeddings([tag])
                    similarity = compute_cosine_similarity(tag_embedding_cache[tag], chunk_embedding)
                    # Keep one row per tag: the best score across chunks.
                    if similarity > best_similarity.get(tag, float('-inf')):
                        best_similarity[tag] = similarity

            output_csv_path = os.path.join(output_folder, os.path.basename(file_path).replace('.txt', '_tag_final.csv'))
            with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['tag_name', 'cosine_similarity'])
                ranked = sorted(best_similarity.items(), key=lambda item: item[1], reverse=True)
                for tag, similarity in ranked:
                    writer.writerow([tag, f"{similarity:.4f}"])

            pbar.update(1)
    finally:
        # Release the progress bar even when the model call or file I/O fails.
        pbar.close()




In [4]:
if __name__ == "__main__":
    # Driver: runs the three-step tagging pipeline once per Ollama model.
    # List of model names
    model_names = [        
        'mistral:7b',
        'mistral-small',
         'llama3.2:3b',
        'qwen2:7b',
        'yi',
        'glm4:9b',
        'qwen2.5:7b',
        'qwen2.5:72b'
    ]

    # NOTE(review): absolute, machine-specific Windows path to the list file
    # that the pipeline functions read line by line.
    lista_path = 'D:\\Ai\\Audio-Classifier\\voiceapp\\lista-1.txt'

    for model_name in model_names:
        print(f"Processing model: {model_name}")
        
        # Create output folder for each model
        output_folder = create_output_folder(model_name)

        # NOTE(review): steps 1 and 2 do not receive model_name, so they repeat
        # the same work for every model; only the output folder differs.
        # Step 1: Combine tags
        combined_tags_path = combine_tags_from_files(lista_path, output_folder)

        # Step 2: Clean tags
        cleaned_tags_path = clean_tags(combined_tags_path, output_folder)

        # Step 3: Generate relevant tags using cleaned tags for each file
        generate_tags_with_llm(lista_path, cleaned_tags_path, output_folder, model_name)
        
        print(f"Finished processing model: {model_name}\n")

Processing model: mistral:7b


NameError: name 'tagging_folder_path' is not defined