In [1]:
# Point the Hugging Face hub cache and the torch hub cache at the relative
# `models` directory. This cell sits before the import cell, so both variables
# are already set when the libraries are imported.
%env HUGGINGFACE_HUB_CACHE= models
%env TORCH_HOME=models

env: HUGGINGFACE_HUB_CACHE=models
env: TORCH_HOME=models


In [22]:
import numpy as np
from moviepy import VideoFileClip
import os
import timm
import torch
import torchvision.transforms as T
from transformers import pipeline, WhisperProcessor, WhisperForConditionalGeneration, M2M100ForConditionalGeneration, M2M100Tokenizer
import soundfile as sf
from nltk.stem import WordNetLemmatizer
import nltk
from PIL import Image

from sentence_transformers import SentenceTransformer
import faiss
import re
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, precision_score, recall_score, f1_score, normalized_mutual_info_score, \
    adjusted_rand_score
import warnings
from sklearn.metrics.pairwise import cosine_similarity
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer
from pycocoevalcap.cider.cider import Cider
import yt_dlp
warnings.filterwarnings('ignore')

In [23]:
# Choose the compute device once for the whole notebook; half precision is
# only selected when a CUDA device is available.
if torch.cuda.is_available():
    device, torch_dtype = "cuda", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

In [24]:
def download_audio(url, audio_format='mp3', output_folder="extracted_audio"):
    """Download the audio track of an online video with yt-dlp.

    Args:
        url: Address of the video/audio to download.
        audio_format: Codec passed to the ``FFmpegExtractAudio`` post-processor
            (``'mp3'`` by default).
        output_folder: Directory the file is written to. Defaults to
            ``extracted_audio``, the same folder ``extract_audio`` writes to
            and the run configuration reads from.

    Returns:
        None. Success or failure is reported with ``print``; download errors
        are caught so a failed download does not abort the notebook.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': audio_format,
            'preferredquality': '192',
        }],
        # The file name is derived from the video title by yt-dlp's template.
        'outtmpl': os.path.join(output_folder, '%(title)s.%(ext)s'),
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        print(f"Successfully downloaded and saved audio as {audio_format}.")
    except Exception as e:
        print(f"An error occurred: {e}")

In [25]:
def extract_audio(input_file, output_folder="extracted_audio"):
    """Produce an MP3 in ``output_folder`` from an MP4 or MP3 input file.

    MP4 input has its audio track written out with moviepy; MP3 input is
    copied. The output file is named after the input's base name with an
    ``.mp3`` extension.

    Args:
        input_file: Path to the source ``.mp4`` or ``.mp3`` file (the
            extension check is case-insensitive).
        output_folder: Destination directory; created if it does not exist.

    Returns:
        The path of the MP3 in ``output_folder`` on success, or ``None`` when
        the format is unsupported or processing failed (the error is printed).
    """
    import shutil  # local import: shutil is not part of the notebook's import cell

    os.makedirs(output_folder, exist_ok=True)

    base_name = os.path.basename(input_file)
    file_name, file_ext = os.path.splitext(base_name)
    extension = file_ext.lower()

    output_file_path = os.path.join(output_folder, f"{file_name}.mp3")

    if extension == ".mp4":
        print(f"Detected MP4 file. Extracting audio from '{input_file}'...")
        video_clip = None
        audio_clip = None
        try:
            video_clip = VideoFileClip(input_file)
            audio_clip = video_clip.audio
            if audio_clip is None:
                raise ValueError("the video has no audio track")
            audio_clip.write_audiofile(output_file_path)
            print(f"Audio extracted successfully and saved to '{output_file_path}'")
            return output_file_path
        except Exception as e:
            print(f"An error occurred during MP4 processing: {e}")
            return None
        finally:
            # Release the clips even when extraction fails part-way.
            if audio_clip is not None:
                audio_clip.close()
            if video_clip is not None:
                video_clip.close()

    if extension == ".mp3":
        print(f"Detected MP3 file. Copying '{input_file}'...")
        try:
            shutil.copyfile(input_file, output_file_path)
        except shutil.SameFileError:
            # The input already is the output file; opening it for writing
            # would truncate it, so leave it untouched.
            print(f"'{input_file}' is already in '{output_folder}'; nothing to copy.")
        except Exception as e:
            print(f"An error occurred during MP3 processing: {e}")
            return None
        return output_file_path

    print(f"Unsupported file format: {file_ext}. Please provide an MP4 or MP3 file.")
    return None

In [26]:
# Speech-to-text stack: the Whisper "small" checkpoint wrapped in a
# transformers ASR pipeline, using the device and dtype chosen earlier.
# The model is bound to `whisper_model` rather than the generic name `model`:
# `text_to_vector` reads a global called `model` for sentence embeddings, so
# re-running this cell must not rebind that name to the Whisper model.
processor = WhisperProcessor.from_pretrained("openai/whisper-small")
whisper_model = WhisperForConditionalGeneration.from_pretrained(
    "openai/whisper-small", torch_dtype=torch_dtype
).to(device)

pipe = pipeline(
    "automatic-speech-recognition",
    model=whisper_model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
    # Transcribe in the spoken language; translation is handled separately.
    generate_kwargs={"task": "transcribe"}
)


Device set to use cuda


In [27]:
def audio_to_text(audio):
    """Transcribe an audio file with the module-level ASR pipeline ``pipe``.

    The file is read with soundfile; multi-channel audio is averaged down to
    one channel before being handed to the pipeline together with its native
    sampling rate.

    Args:
        audio: Path (or file-like object) accepted by ``sf.read``.

    Returns:
        ``(text, detected_lang)`` where ``text`` is the transcription and
        ``detected_lang`` is the pipeline result's ``"language"`` entry, or
        ``None`` when the result has no such key.

    NOTE(review): confirm the pipeline result really carries a top-level
    ``"language"`` key; if it does not, ``detected_lang`` is always ``None``
    and the downstream translation step is skipped.
    """
    samples, sample_rate = sf.read(audio)

    # Collapse (frames, channels) input to a single channel by averaging.
    waveform = np.mean(samples, axis=1) if samples.ndim > 1 else samples

    asr_input = {"array": waveform, "sampling_rate": sample_rate}
    transcription = pipe(asr_input, return_timestamps=True)

    return transcription["text"], transcription.get("language", None)


In [28]:
# Translation stack used by translate_to_english: the M2M100 418M checkpoint
# and its tokenizer, with the model moved to the selected device.
trans_model_name = "facebook/m2m100_418M"
trans_tokenizer = M2M100Tokenizer.from_pretrained(trans_model_name)
trans_model = M2M100ForConditionalGeneration.from_pretrained(trans_model_name).to(device)

In [29]:
def translate_to_english(text, source_lang, batch_size=8):
    """Translate ``text`` into English with the module-level M2M100 model.

    The text is split into sentences and translated in batches. Translating a
    whole transcript as one sequence lets the model's generation length limit
    cut the output short, so each sentence is translated on its own and
    truncated individually if it is still too long for the tokenizer.

    Args:
        text: Text to translate.
        source_lang: Language code understood by the M2M100 tokenizer
            (e.g. ``"fr"``). ``None`` means "unknown": the text is returned
            unchanged.
        batch_size: Number of sentences translated per ``generate`` call.

    Returns:
        The English translation, sentences joined by single spaces. The input
        is returned untouched when ``source_lang`` is ``None`` or ``"en"`` or
        when the text is blank.
    """
    if source_lang is None or source_lang == "en" or not text.strip():
        return text

    trans_tokenizer.src_lang = source_lang
    english_id = trans_tokenizer.get_lang_id("en")

    # Split after sentence-ending punctuation (kept with its sentence). The
    # second alternative handles CJK full stops, which are not followed by a space.
    pieces = re.split(r'(?<=[.?!])\s+|(?<=[。？！])', text.strip())
    sentences = [piece.strip() for piece in pieces if piece and piece.strip()]

    translated = []
    for start in range(0, len(sentences), batch_size):
        batch = sentences[start:start + batch_size]
        encoded = trans_tokenizer(
            batch, return_tensors="pt", padding=True, truncation=True
        ).to(device)
        generated_tokens = trans_model.generate(
            **encoded,
            forced_bos_token_id=english_id
        )
        translated.extend(
            trans_tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
        )

    return " ".join(translated)

In [30]:
# Lemmatizer shared by lemmatize_text. The WordNet data is downloaded here;
# per the recorded output this is a no-op when the package is already present.
lemmatizer = WordNetLemmatizer()
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\dudec\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [31]:
# Splits a whitespace-delimited token into leading punctuation, the word
# itself, and trailing punctuation, e.g. '(running),' -> '(', 'running', '),'.
_TOKEN_PARTS_RE = re.compile(r"^(\W*)(.*?)(\W*)$", re.DOTALL)


def lemmatize_text(text):
    """Lower-case ``text`` and reduce each word to its verb lemma.

    Punctuation attached to a word ("running.", "was,") is detached before
    lemmatizing and re-attached afterwards; handing the lemmatizer the token
    with its punctuation would leave such words unchanged. Sentence
    punctuation is preserved because ``text_to_vector`` splits on it.

    Args:
        text: Input text.

    Returns:
        The lemmatized text with tokens joined by single spaces (runs of
        whitespace, including newlines, collapse to one space).
    """
    lemmatized_tokens = []
    for token in text.lower().split():
        leading, core, trailing = _TOKEN_PARTS_RE.match(token).groups()
        if core:
            core = lemmatizer.lemmatize(core, pos='v')
        lemmatized_tokens.append(f"{leading}{core}{trailing}")
    return " ".join(lemmatized_tokens)

In [32]:
# Sentence-embedding model; text_to_vector reads this global `model`.
model = SentenceTransformer('all-MiniLM-L6-v2')

In [33]:
def text_to_vector(text):
    """Embed ``text`` one sentence at a time with the global ``model``.

    Sentences are delimited by ``.``, ``?`` or ``!`` followed by whitespace;
    the delimiter is dropped and blank fragments are discarded.

    Args:
        text: Text to embed.

    Returns:
        Whatever ``model.encode`` returns for the list of sentences.
    """
    sentences = []
    for fragment in re.split(r'[.?!]\s+', text):
        cleaned = fragment.strip()
        if cleaned:
            sentences.append(cleaned)
    return model.encode(sentences)

In [34]:
# vit_model = timm.create_model('vit_base_patch16_224', pretrained=True)
# vit_model.eval()
# vit_model.reset_classifier(0)
#
# transform = T.Compose([
#     T.Resize((224, 224)),
#     T.ToTensor(),
#     T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
# ])

In [35]:
# def image_to_vector(image_path):
#     img = Image.open(image_path).convert("RGB")
#     x = transform(img).unsqueeze(0)
#
#     with torch.no_grad():
#         patches = vit_model.patch_embed(x)
#
#     patch_vectors = patches.squeeze(0).numpy()
#
#     norm = np.linalg.norm(patch_vectors, axis=1, keepdims=True)
#     normalized_vectors = patch_vectors / norm
#
#     return normalized_vectors

In [36]:
def optimal_n_clusters(vectarray):
    """Pick a KMeans cluster count for ``vectarray`` by silhouette score.

    Cluster counts from 3 up to and including ``min(n_samples - 1, 30)`` are
    tried; the count with the highest silhouette score wins (the smallest
    count on ties).

    Args:
        vectarray: Sequence/array of sample vectors, one row per sample.

    Returns:
        * 1 when there are fewer than 3 samples, or when no candidate produced
          at least two distinct clusters (nothing to separate);
        * 2 when there are exactly 3 samples (no candidate >= 3 is possible);
        * otherwise the best-scoring cluster count.
    """
    n_samples = len(vectarray)

    if n_samples < 3:
        print("Not enough samples to determine optimal clusters. Defaulting to 1.")
        return 1

    # The silhouette score needs at least one sample more than there are
    # clusters, hence n_samples - 1; 30 bounds the search cost.
    max_clusters = min(n_samples - 1, 30)

    if max_clusters < 3:
        print("Too few samples for meaningful clustering range. Defaulting to 2.")
        return 2

    best_n = None
    best_score = -np.inf
    print(f"Testing cluster counts from 3 to {max_clusters}...")
    # Upper bound is inclusive: max_clusters is itself a valid candidate.
    for n_clusters in range(3, max_clusters + 1):
        clustering_model = KMeans(n_clusters=n_clusters, random_state=0, n_init='auto')
        cluster_labels = clustering_model.fit_predict(vectarray)

        # silhouette_score raises when everything lands in a single cluster
        # (e.g. all vectors identical), so skip such degenerate results.
        if len(np.unique(cluster_labels)) < 2:
            continue

        score = silhouette_score(vectarray, cluster_labels)
        if score > best_score:
            best_n, best_score = n_clusters, score

    if best_n is None:
        print("No candidate produced more than one cluster. Defaulting to 1.")
        return 1

    print(f"Optimal number of clusters found: {best_n}")
    return best_n

In [37]:
def vector_merger(vectarray):
    """Cluster ``vectarray`` with KMeans and collapse each cluster to a unit centroid.

    The number of clusters comes from ``optimal_n_clusters``. Each non-empty
    cluster is replaced by the mean of its members, scaled to unit length
    unless the mean is the zero vector.

    Args:
        vectarray: 2-D array of vectors (indexed with a boolean row mask).

    Returns:
        ``(centroids, labels)``: an array with one centroid per non-empty
        cluster, and the per-sample cluster labels from KMeans.
    """
    k = optimal_n_clusters(vectarray)
    labels = KMeans(n_clusters=k, random_state=0, n_init=10).fit_predict(vectarray)

    centroids = []
    for cluster_id in range(k):
        member_mask = labels == cluster_id
        if not member_mask.any():
            continue  # nothing was assigned to this cluster

        centroid = vectarray[member_mask].mean(axis=0)
        length = np.linalg.norm(centroid)
        centroids.append(centroid / length if length > 0 else centroid)

    return np.array(centroids), labels

In [38]:
def faiss_scoring(base_vectarray, test_vectarray):
    """Best inner-product match in ``test_vectarray`` for every base vector.

    The test vectors are indexed with ``faiss.IndexFlatIP`` and each base
    vector is searched for its single nearest neighbour.

    Args:
        base_vectarray: Query vectors, one row per vector.
        test_vectarray: Vectors to index, one row per vector.

    Returns:
        1-D array with one score per base vector (the inner product with its
        best match; this equals cosine similarity only for unit-length input).
    """
    # Hand FAISS C-contiguous float32 matrices, the same conversion
    # evaluate_clustered_vectors applies before indexing.
    test_matrix = np.ascontiguousarray(test_vectarray, dtype='float32')
    base_matrix = np.ascontiguousarray(base_vectarray, dtype='float32')

    faiss_index = faiss.IndexFlatIP(test_matrix.shape[1])
    faiss_index.add(test_matrix)

    best_scores, _ = faiss_index.search(base_matrix, k=1)

    return best_scores.flatten()

In [39]:
def jaccard_similarity(set1, set2):
    """Jaccard index of two sets: ``|set1 & set2| / |set1 | set2|``.

    Returns 0.0 when both sets are empty.
    """
    combined = set1 | set2
    if not combined:
        return 0.0
    shared = set1 & set2
    return len(shared) / len(combined)

In [40]:
def coverage_score(base_vecs, test_vecs, threshold=0.7):
    """Fraction of base vectors whose best cosine match in ``test_vecs`` reaches ``threshold``.

    Args:
        base_vecs: Vectors that should be covered, one row per vector.
        test_vecs: Vectors searched for matches, one row per vector.
        threshold: Minimum cosine similarity for a base vector to count as covered.

    Returns:
        Number of covered base vectors divided by ``len(base_vecs)``.
    """
    best_match_per_base = cosine_similarity(base_vecs, test_vecs).max(axis=1)
    is_covered = best_match_per_base >= threshold
    return np.sum(is_covered) / len(base_vecs)

In [41]:
def evaluate_clustered_vectors(base_vecs, test_vecs, base_text, test_text, coverage_threshold=(0.7,)):
    """Compare a base and a test document with vector- and text-based metrics.

    Args:
        base_vecs: Base (reference) vectors, one row per vector.
        test_vecs: Test (candidate) vectors, one row per vector.
        base_text: Reference text for BLEU / ROUGE / CIDEr.
        test_text: Candidate text for BLEU / ROUGE / CIDEr.
        coverage_threshold: One similarity threshold or an iterable of them;
            a coverage entry is produced for each.

    Returns:
        dict mapping metric name to value. Vector metrics are plain floats.
        Each text metric is computed best-effort: if its scorer raises, the
        error is stored as an ``"Error: ..."`` string instead (under
        ``"ROUGE Scores"`` for the two ROUGE entries) so the remaining metrics
        are still reported.
    """
    results = {}

    # Accept a bare number as well as a list/tuple of thresholds.
    if np.isscalar(coverage_threshold):
        thresholds = [coverage_threshold]
    else:
        thresholds = list(coverage_threshold)

    # --- Vector-based similarity checks ---
    sim_matrix = cosine_similarity(base_vecs, test_vecs)
    best_matches_base_to_test = np.max(sim_matrix, axis=1)
    results["Asymmetric Mean Similarity (Base to Test)"] = float(np.mean(best_matches_base_to_test))
    best_matches_test_to_base = np.max(sim_matrix, axis=0)
    results["Asymmetric Mean Similarity (Test to Base)"] = float(np.mean(best_matches_test_to_base))

    for threshold in thresholds:
        covered = np.sum(best_matches_base_to_test >= threshold)
        # Cast to float like every other metric in this dict.
        results[f"Coverage (Base in Test) @{threshold}"] = float(covered / len(base_vecs))

    # FAISS is given C-contiguous float32 matrices.
    base_vecs_32 = np.ascontiguousarray(base_vecs, dtype='float32')
    test_vecs_32 = np.ascontiguousarray(test_vecs, dtype='float32')

    index = faiss.IndexFlatIP(test_vecs_32.shape[1])
    index.add(test_vecs_32)
    faiss_scores, _ = index.search(base_vecs_32, k=1)

    results["FAISS Similarity (Max of Best Matches)"] = float(np.max(faiss_scores))
    results["FAISS Similarity (Mean of Best Matches)"] = float(np.mean(faiss_scores))

    # --- Text-based similarity checks ---
    # BLEU score
    try:
        # Sentence BLEU needs a list of tokenized words
        base_tokens = base_text.lower().split()
        test_tokens = test_text.lower().split()

        # Use a smoothing function for better results with short sentences
        smoothie = SmoothingFunction().method4
        bleu_score = sentence_bleu([base_tokens], test_tokens, smoothing_function=smoothie)
        results["BLEU Score"] = bleu_score
    except Exception as e:
        results["BLEU Score"] = f"Error: {e}"

    # ROUGE-1 and ROUGE-L F-measures
    try:
        scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
        scores = scorer.score(base_text, test_text)
        results["ROUGE-1 Score"] = scores['rouge1'].fmeasure
        results["ROUGE-L Score"] = scores['rougeL'].fmeasure
    except Exception as e:
        results["ROUGE Scores"] = f"Error: {e}"

    # CIDEr over a one-entry corpus (the base text as the only reference).
    # NOTE(review): with a single document the corpus statistics CIDEr relies
    # on may be degenerate - confirm the score is meaningful for this use.
    try:
        gts = {0: [base_text]}
        res = {0: [test_text]}
        cider_scorer = Cider()
        (score, _) = cider_scorer.compute_score(gts, res)
        results["CIDEr Score"] = float(score)
    except Exception as e:
        results["CIDEr Score"] = f"Error: {e}"

    return results

In [42]:
def process_and_evaluate(data_type, base_path, test_path, coverage_thresh):
    if data_type == 'text':
        with open(base_path, 'r') as f:
            base_content = f.read()
        base_vectors = text_to_vector(lemmatize_text(base_content))

        with open(test_path, 'r') as f:
            test_content = f.read()
        test_vectors = text_to_vector(lemmatize_text(test_content))

        base_translated = base_content
        test_translated = test_content

    elif data_type == 'audio':
        # --- Base file ---
        base_text, base_lang = audio_to_text(base_path)
        base_translated = translate_to_english(base_text, base_lang)
        base_vectors = text_to_vector(lemmatize_text(base_translated))

        # --- Test file ---
        test_text, test_lang = audio_to_text(test_path)
        test_translated = translate_to_english(test_text, test_lang)
        test_vectors = text_to_vector(lemmatize_text(test_translated))

        print(f"[Base Audio] Detected {base_lang}, transcribed: {base_text}")
        print(f"[Base Audio] English translation: {base_translated}")
        print(f"[Test Audio] Detected {test_lang}, transcribed: {test_text}")
        print(f"[Test Audio] English translation: {test_translated}")

    else:
        print("Error: Invalid data_type. Please choose 'text', 'image', or 'audio'.")
        return

    print(f"\nBase data has {len(base_vectors)} initial vectors.")
    print(f"Test data has {len(test_vectors)} initial vectors.")

    print("\nFinding optimal clusters and merging base vectors...")
    base_merged_vectors, _ = vector_merger(base_vectors)
    print(f"Created {len(base_merged_vectors)} base cluster centroids.")

    print("\nFinding optimal clusters and merging test vectors...")
    test_merged_vectors, _ = vector_merger(test_vectors)
    print(f"Created {len(test_merged_vectors)} test cluster centroids.")

    # Pass the translated text to the evaluation function
    evaluation_results = evaluate_clustered_vectors(
        base_merged_vectors,
        test_merged_vectors,
        base_translated,
        test_translated,
        coverage_threshold=coverage_thresh
    )

    print("\n--- Evaluation Summary ---")
    print(f"Data Type Compared: {data_type}")
    print(f"Base Path: {base_path} ({len(base_merged_vectors)} centroids)")
    print(f"Test Path: {test_path} ({len(test_merged_vectors)} centroids)")
    print("--------------------------------------------------")
    print(f"Asymmetric Mean Similarity (Base to Test): {evaluation_results['Asymmetric Mean Similarity (Base to Test)']:.4f}")
    print(f"Asymmetric Mean Similarity (Test to Base): {evaluation_results['Asymmetric Mean Similarity (Test to Base)']:.4f}")

    for i in coverage_thresh:
        print(f"Coverage (Base in Test) @{i}:              {evaluation_results[f'Coverage (Base in Test) @{i} ']:.4f}")

    print(f"FAISS Similarity (Max of Best Matches):    {evaluation_results['FAISS Similarity (Max of Best Matches)']:.4f}")
    print(f"FAISS Similarity (Mean of Best Matches):   {evaluation_results['FAISS Similarity (Mean of Best Matches)']:.4f}")
    print("--------------------------------------------------")
    print(f"BLEU Score:                                {evaluation_results['BLEU Score']:.4f}")
    print(f"ROUGE-1 Score:                             {evaluation_results.get('ROUGE-1 Score', 'N/A'):.4f}")
    print(f"CIDEr Score:                               {evaluation_results.get('CIDEr Score', 'N/A'):.4f}")
    print("--------------------------------------------------\n")

In [43]:
# Which kind of input to compare; must be a key of DATA_PATHS.
DATA_TYPE_CHOICE = 'text'

# (base file, test file) for every data type.
# NOTE: the image embedding cells are commented out in this notebook, so
# selecting 'image' is rejected later by process_and_evaluate.
DATA_PATHS = {
    'text': ('data/base_text.txt', 'data/test_text.txt'),
    'image': ('data/base_image.png', 'data/test_image.png'),
    'audio': (
        'extracted_audio/AI and future of work.mp3',
        # Alternative test file:
        # 'extracted_audio/lR-ip0EZQXS_uczWPpahSQ_b9e17546063c4d59822e7488419afbf1_200826.005_MP4_720.mp3'
        'extracted_audio/AI and future of work.mp3',
    ),
}

# Fail fast on a typo: otherwise the paths would stay undefined on a fresh
# kernel, or silently keep their values from a previous run of this cell.
if DATA_TYPE_CHOICE not in DATA_PATHS:
    raise ValueError(
        f"Unknown DATA_TYPE_CHOICE {DATA_TYPE_CHOICE!r}; expected one of {sorted(DATA_PATHS)}"
    )

BASE_FILE_PATH, TEST_FILE_PATH = DATA_PATHS[DATA_TYPE_CHOICE]

# Similarity thresholds at which the coverage metric is reported.
COVERAGE_THRESHOLD = [0.7, 0.75, 0.8, 0.85, 0.9, 0.95]

In [44]:
# Run the comparison with the data type, file pair and thresholds configured above.
process_and_evaluate(DATA_TYPE_CHOICE, BASE_FILE_PATH, TEST_FILE_PATH, COVERAGE_THRESHOLD)

KeyboardInterrupt: 