In [12]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from transformers import AutoImageProcessor, AutoModel
from sklearn.metrics.pairwise import cosine_similarity
from PIL import Image
import torch
import torch.nn as nn
import re
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [13]:
# DINOv2 (HuggingFace): choose the compute device, then load the image
# processor and the backbone and move the backbone onto that device.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device("cpu")
processor = AutoImageProcessor.from_pretrained('facebook/dinov2-base')
dino_model = AutoModel.from_pretrained('facebook/dinov2-base')
dino_model = dino_model.to(device)

# VGG16 (Keras): ImageNet weights, include_top=False, pooling='avg'.
vgg16 = VGG16(include_top=False, weights='imagenet', pooling='avg')

In [15]:
def extract_features(image_path, model_type='vgg16'):
    """Return a feature vector for one image, or ``None`` if extraction fails.

    Args:
        image_path: Path of the image file to embed.
        model_type: ``'vgg16'`` runs the module-level Keras ``vgg16`` model;
            ``'dinov2'`` runs the module-level ``processor`` / ``dino_model``
            pair on ``device``.

    Returns:
        For ``'vgg16'``: the flattened output of ``vgg16.predict`` for the
        image resized to 224x224. For ``'dinov2'``: ``last_hidden_state``
        averaged over dim 1, squeezed and converted to a NumPy array.
        ``None`` when loading or inference raises; the error is printed.

    Raises:
        ValueError: If ``model_type`` is neither ``'vgg16'`` nor ``'dinov2'``.
    """
    # Validate before entering the try block. Raised inside it, this error
    # would be swallowed by the blanket handler below and a misspelled
    # model_type would quietly yield None for every image.
    if model_type not in ('vgg16', 'dinov2'):
        raise ValueError("Invalid model_type. Choose 'vgg16' or 'dinov2'.")

    try:
        if model_type == 'vgg16':
            # VGG16 Feature Extraction
            image = load_img(image_path, target_size=(224, 224))
            image_array = img_to_array(image)
            image_array = np.expand_dims(image_array, axis=0)
            image_array = preprocess_input(image_array)
            # verbose=0: avoid one Keras progress bar per image.
            features = vgg16.predict(image_array, verbose=0)
            return features.flatten()

        # DINOv2 Feature Extraction
        # Close the file handle as soon as the pixels have been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        with torch.no_grad():
            inputs = processor(images=image, return_tensors="pt").to(device)
            outputs = dino_model(**inputs)
            image_features = outputs.last_hidden_state
            # Mean pooling of the embeddings
            return image_features.mean(dim=1).squeeze().cpu().numpy()
    except Exception as e:
        # Best effort: report the failing image and let the caller carry on.
        print(f"Error extracting features for {image_path} using {model_type}: {e}")
        return None

# Helper: grade the cosine similarity of two feature vectors on a 0-4 scale
def similarity_score(vector1, vector2):
    """Map the cosine similarity of two vectors to an integer grade.

    The grade is 4 above 0.85, 3 above 0.75, 2 above 0.65, 1 above 0.5 and
    0 otherwise. Any exception is printed and also yields 0.
    """
    # (exclusive lower bound, grade) pairs, strictest band first.
    grade_bands = ((0.85, 4), (0.75, 3), (0.65, 2), (0.5, 1))
    try:
        cosine_sim = cosine_similarity([vector1], [vector2])[0][0]
        for lower_bound, grade in grade_bands:
            if cosine_sim > lower_bound:
                return grade
        return 0
    except Exception as e:
        print(f"Error calculating similarity score: {e}")
        return 0  # Assign 0 similarity in case of an error

# Sort key: article and image numbers parsed from an image file name
def extract_article_image_nums(filename):
    """Return ``(article, image)`` integers parsed from ``image_<a>_<b>...``.

    File names that do not start with that pattern get ``(inf, inf)`` so
    they sort after every name that does.
    """
    parsed = re.match(r"image_(\d+)_(\d+)", filename)
    if parsed is None:
        return float('inf'), float('inf')
    article_num, image_num = (int(group) for group in parsed.groups())
    return article_num, image_num


In [16]:
def _pairwise_score_matrix(image_files, features):
    """Build the symmetric 0-4 score matrix for one category's images.

    ``features[i]`` is the vector for ``image_files[i]`` or ``None`` when
    extraction failed. The diagonal is 4; a pair involving a missing vector,
    or a pair whose scoring raises, is left at 0.
    """
    n = len(image_files)
    similarity_matrix = np.zeros((n, n))
    for i in range(n):
        similarity_matrix[i, i] = 4  # Diagonal elements
        for j in range(i + 1, n):
            vec1, vec2 = features[i], features[j]
            if vec1 is None or vec2 is None:
                continue
            try:
                score = similarity_score(vec1, vec2)
            except Exception as e:
                print(f"Error processing pair ({image_files[i]}, {image_files[j]}): {e}")
                continue
            # Fill both triangles at once so no mirroring step is needed.
            similarity_matrix[i, j] = score
            similarity_matrix[j, i] = score
    return similarity_matrix


def compute_semantic_similarity(input_dir, website_list=None, model_type='vgg16'):
    """Write a pairwise 0-4 similarity CSV into every category folder.

    Walks ``input_dir/<website>/<category>/``. For each category directory,
    the ``.jpg`` / ``.png`` / ``.jpeg`` files are ordered with
    ``extract_article_image_nums``, embedded with ``extract_features`` and
    scored pairwise with ``similarity_score``. The resulting matrix is saved
    as ``<model_type>_pred_labels.csv`` inside that category directory, with
    the extension-less file names as row and column labels.

    Args:
        input_dir: Root directory containing one sub-directory per website.
        website_list: Names of the sub-directories of ``input_dir`` to
            process. ``None`` selects every sub-directory.
        model_type: Forwarded to ``extract_features``; also used as the CSV
            file-name prefix.

    Returns:
        None. Results are written to disk.
    """
    if website_list is None:
        website_list = [f for f in os.listdir(input_dir) if os.path.isdir(os.path.join(input_dir, f))]

    for website in tqdm(website_list, desc="Processing Websites"):
        website_path = os.path.join(input_dir, website)
        if not os.path.isdir(website_path):
            print(f"Skipping {website}: Not a directory.")
            continue

        for category in tqdm(os.listdir(website_path), desc=f"Processing Categories in {website}", leave=False):
            category_path = os.path.join(website_path, category)
            if not os.path.isdir(category_path):
                continue

            image_files = sorted(
                [f for f in os.listdir(category_path) if f.endswith(('.jpg', '.png', '.jpeg'))],
                key=extract_article_image_nums
            )

            # One vector (or None on failure) per image, in sorted order.
            features = [
                extract_features(os.path.join(category_path, img), model_type=model_type)
                for img in image_files
            ]

            similarity_matrix = _pairwise_score_matrix(image_files, features)

            # Save to CSV
            output_path = os.path.join(category_path, f"{model_type}_pred_labels.csv")
            # splitext removes only the real trailing extension; chained
            # str.replace would also strip ".jpg"/".png" found mid-name.
            image_names = [os.path.splitext(img)[0] for img in image_files]
            df = pd.DataFrame(similarity_matrix.astype(int), index=image_names, columns=image_names)
            df.to_csv(output_path, index=True)

In [None]:
# Relative path of the data root that holds the website sub-directories.
input_directory = "../../data"

# Write the prediction CSVs; model_type may be 'vgg16' or 'dinov2'.
compute_semantic_similarity(input_dir=input_directory, model_type='dinov2')