In [None]:
!pip install timm sentence_transformers ultralytics

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from PIL import Image, ImageDraw
import torchvision.transforms as transforms
import timm
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from ultralytics import YOLO
import torch.nn.functional as F
from transformers import RobertaModel, RobertaTokenizer
import pickle
# Class names indexed by class id: detect_objects() looks up
# coco_classes[cls] for every box returned by default_yolo_model.
# NOTE(review): presumably this is the 80-class COCO ordering used by the
# stock YOLOv8 weights — confirm it matches the model's own `names` table.
coco_classes = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light",
    "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "TV", "laptop", "mouse", "remote", "keyboard", "cell phone",
    "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush"
]

# One drawing colour per entry of coco_classes (paired by position in
# get_color_map). Every name must be a colour name Pillow understands, because
# the values are passed straight to ImageDraw as `outline=` / `fill=`.
# "navyblue" and "violetred" are not CSS colour names and would raise
# ValueError when drawn, so they are replaced by "darkblue" and
# "mediumvioletred".
coco_colors = [
    "red", "green", "blue", "yellow", "purple", "orange", "pink", "brown", "cyan", "magenta", "lime", "maroon", "navy",
    "olive", "teal", "aqua", "fuchsia", "gray", "black", "white", "silver", "gold", "indigo", "violet", "coral", "chocolate",
    "crimson", "darkorange", "darkred", "darkviolet", "deepskyblue", "forestgreen", "hotpink", "lavender", "lightblue",
    "lightgreen", "lightpink", "lightsalmon", "limegreen", "mediumblue", "mediumorchid", "mediumpurple", "midnightblue",
    "mistyrose", "navajowhite", "darkblue", "oldlace", "olivedrab", "orange", "orangered", "orchid", "palegoldenrod", "palegreen",
    "paleturquoise", "palevioletred", "papayawhip", "peachpuff", "peru", "plum", "powderblue", "rosybrown", "royalblue",
    "saddlebrown", "salmon", "sandybrown", "seagreen", "sienna", "skyblue", "slateblue", "slategray", "snow", "springgreen",
    "steelblue", "tan", "thistle", "tomato", "turquoise", "mediumvioletred", "wheat", "yellowgreen"
]
# Class names (indexed by class id) and matching drawing colours used for the
# boxes returned by custom_yolo_model; the two lists are paired by position.
custom_classes = ['door', 'dresser', 'lamp', 'wardrobe', 'window']
custom_colors = ["teal", "lime", "gold", "indigo", "grey"]

# Ids (positions in coco_classes) of the default-model classes to keep.
desired_classes = [
    coco_classes.index(label)
    for label in ('bed', 'chair', 'couch', 'dining table', 'clock', 'potted plant')
]

# Both detectors are loaded once, when this cell runs, and shared by
# detect_objects().
default_yolo_model = YOLO('yolov8s.pt')
custom_yolo_model = YOLO('static/best (2).pt')

def get_color_map(classes, colors):
    """Pair each class name with the colour at the same position.

    Args:
        classes: Iterable of class names (used as dictionary keys).
        colors: Iterable of colour names, aligned with ``classes``.

    Returns:
        dict mapping class name to colour. Pairing stops at the shorter of
        the two inputs; a repeated class name keeps its last colour.
    """
    return dict(zip(classes, colors))

# Class-name -> colour lookup tables, one per detector; detect_objects() uses
# them to pick the outline/label colour of each drawn box.
coco_color_map = get_color_map(coco_classes, coco_colors)
custom_color_map = get_color_map(custom_classes, custom_colors)

def _draw_detections(results, draw, class_names, color_map, min_confidence, allowed_ids=None):
    """Keep the confident boxes of one detector's results and draw them on ``draw``.

    Returns a list of ``(x1, y1, x2, y2, conf, cls)`` tuples. When
    ``allowed_ids`` is not None, only boxes whose class id is in it are kept.
    """
    kept = []
    for result in results:
        for box in result.boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])
            if conf >= min_confidence and (allowed_ids is None or cls in allowed_ids):
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                kept.append((x1, y1, x2, y2, conf, cls))
                label = class_names[cls]
                color = color_map[label]
                draw.rectangle([x1, y1, x2, y2], outline=color, width=10)
                draw.text((x1, y1), f"{label}:{conf:.2f}", fill=color)
    return kept


def detect_objects(image_path, save_dir, desired_classes, min_confidence=0.7):
    """Run both YOLO models on an image, save an annotated copy, return the boxes.

    Args:
        image_path: Path of the image to analyse.
        save_dir: Directory (created if missing) where the annotated image is
            written under the same base file name as ``image_path``.
        desired_classes: Class ids of the default model to keep; the custom
            model's boxes are filtered by confidence only.
        min_confidence: Minimum confidence a box needs to be kept.

    Returns:
        List of ``(x1, y1, x2, y2, conf, cls)`` tuples, default-model boxes
        first. ``cls`` is an index into ``coco_classes`` for default-model
        boxes and into ``custom_classes`` for custom-model boxes, so the same
        integer can denote different classes depending on the source model.
    """
    # Close the file handle as soon as the pixels are copied out; convert()
    # returns an independent image.
    with Image.open(image_path) as source:
        img = source.convert('RGB')

    # Run both detectors before anything is drawn so neither sees the overlays.
    results_default = default_yolo_model(img)
    results_custom = custom_yolo_model(img)
    draw = ImageDraw.Draw(img)

    detections = _draw_detections(
        results_default, draw, coco_classes, coco_color_map, min_confidence,
        allowed_ids=desired_classes,
    )
    detections.extend(
        _draw_detections(results_custom, draw, custom_classes, custom_color_map, min_confidence)
    )

    # NOTE(review): the output name is only the base name, so two inputs with
    # the same file name in different folders overwrite each other — confirm
    # this is acceptable for the detection samples.
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, os.path.basename(image_path))
    img.save(save_path)
    return detections


def expand_roi(x1, y1, x2, y2, image_size, expansion_factor=1.5):
    """Grow a bounding box by ``expansion_factor`` and clip it to the image.

    Args:
        x1, y1, x2, y2: Corners of the original box.
        image_size: ``(width, height)`` of the image.
        expansion_factor: Multiplier applied to the box width and height.

    Returns:
        ``(x1, y1, x2, y2)`` as ints. The left/top edge is moved out by half
        of the added size and clamped at 0; the right/bottom edge is then
        placed one expanded width/height away from that (possibly clamped)
        edge and clamped to the image size.
    """
    img_w, img_h = image_size
    box_w = x2 - x1
    box_h = y2 - y1
    grown_w = box_w * expansion_factor
    grown_h = box_h * expansion_factor

    left = max(0, x1 - (grown_w - box_w) / 2)
    top = max(0, y1 - (grown_h - box_h) / 2)
    right = min(img_w, left + grown_w)
    bottom = min(img_h, top + grown_h)
    return int(left), int(top), int(right), int(bottom)

def combine_features_with_attention(full_image_features, roi_features, roi_confidences, full_weight=0.2):
    """Blend full-image features with per-ROI features weighted by confidence.

    Args:
        full_image_features: Feature vector of the whole image.
        roi_features: Sequence of feature vectors, one per detected region.
        roi_confidences: Detection confidences aligned with ``roi_features``.
        full_weight: Weight given to the full-image features before the final
            renormalisation.

    Returns:
        The weighted average of all feature vectors. With no ROIs the
        full-image features are returned unchanged (previously this case
        divided by zero).
    """
    if len(roi_features) == 0:
        return np.asarray(full_image_features)

    roi_confidences = np.array(roi_confidences)
    # Softmax turns the confidences into weights that sum to 1 across ROIs.
    attention_weights = F.softmax(torch.tensor(roi_confidences), dim=0).numpy()
    weighted_features = [full_image_features * full_weight]
    weights = [full_weight]
    # NOTE(review): the softmax weights are additionally scaled by
    # (1 - full_weight) / n_rois, so the ROIs' combined share shrinks as more
    # regions are detected — confirm the division by the ROI count is intended.
    detection_weight = (1 - full_weight) / len(roi_features)
    for features, weight in zip(roi_features, attention_weights):
        weighted_features.append(features * weight * detection_weight)
        weights.append(weight * detection_weight)
    # Dividing by the total weight makes the result a proper weighted mean.
    combined_features = np.sum(weighted_features, axis=0) / sum(weights)
    return combined_features

def encode_and_save_images_convnext_with_yolov8(dataset_image_paths, save_path, detect_save_dir):
    """Embed every image with ConvNeXt, refined by YOLO-detected regions, and save the result.

    For each image the full-image embedding is computed; if detect_objects()
    returns boxes, every (expanded) box is cropped and embedded too and the
    vectors are merged by combine_features_with_attention() with
    ``full_weight=0.6``.

    Args:
        dataset_image_paths: Iterable of image paths to encode, in output order.
        save_path: ``.npy`` file that receives the stacked embeddings; its
            directory is created if it does not exist.
        detect_save_dir: Directory where detect_objects() writes annotated images.
    """
    # NOTE(review): the model is created with its default head, so the
    # "features" are presumably classifier outputs rather than pooled backbone
    # features; search_engine() builds its query embedding the same way.
    model = timm.create_model('convnext_base', pretrained=True)
    model.eval()
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    device = torch.device("cpu")
    model.to(device)

    def embed(pil_image):
        # Preprocess one PIL image and return the model output as a 1-D numpy array.
        batch = transform(pil_image).unsqueeze(0).to(device)
        with torch.no_grad():
            return model(batch).squeeze(0).cpu().numpy()

    image_embeddings = []
    for image_path in dataset_image_paths:
        print(f"Processing image with ConvNeXt and YOLOv8: {image_path}")
        # Copy the pixels and release the file handle before the slow work.
        with Image.open(image_path) as source:
            image = source.convert('RGB')
        detections = detect_objects(image_path, detect_save_dir, desired_classes)
        full_image_features = embed(image)

        if not detections:
            image_embeddings.append(full_image_features)
            continue

        roi_embeddings = []
        roi_confidences = []
        for x1, y1, x2, y2, conf, _cls in detections:
            # Enlarge the box so the crop keeps some context around the object.
            x1, y1, x2, y2 = expand_roi(x1, y1, x2, y2, image.size)
            roi_embeddings.append(embed(image.crop((x1, y1, x2, y2))))
            roi_confidences.append(conf)
        combined_features = combine_features_with_attention(
            full_image_features, roi_embeddings, roi_confidences, full_weight=0.6
        )
        image_embeddings.append(combined_features)

    # np.save does not create missing directories.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    np.save(save_path, np.array(image_embeddings))

def encode_with_roberta(descriptions, model, tokenizer):
    """Encode texts with ``model`` and return the first-token hidden state of each.

    Args:
        descriptions: Texts passed to ``tokenizer`` in a single batch.
        model: Callable accepting the tokenizer output as keyword arguments
            and returning an object with a ``last_hidden_state`` tensor.
        tokenizer: Callable producing the model inputs as PyTorch tensors.

    Returns:
        numpy array holding ``last_hidden_state[:, 0, :]`` for the batch.
    """
    batch = tokenizer(
        descriptions,
        padding=True,
        truncation=True,
        return_tensors="pt",
        max_length=512,
    )
    with torch.no_grad():
        hidden_states = model(**batch).last_hidden_state
    first_token_vectors = hidden_states[:, 0, :]
    return first_token_vectors.cpu().numpy()

def ensemble_embeddings(sbert_embeddings, roberta_embeddings, weights=(0.5, 0.5)):
    """Combine SBERT and RoBERTa embeddings into one weighted-sum embedding.

    The RoBERTa matrix is truncated to the SBERT width (its leading columns
    are kept) so the two can be added element-wise.

    Args:
        sbert_embeddings: 2-D array, one row per text.
        roberta_embeddings: 2-D array with at least as many columns as
            ``sbert_embeddings``.
        weights: ``(sbert_weight, roberta_weight)``. The default is a tuple so
            it cannot be mutated between calls.

    Returns:
        Array shaped like ``sbert_embeddings``.

    Raises:
        ValueError: If ``roberta_embeddings`` has fewer columns than
            ``sbert_embeddings`` (it could otherwise be silently broadcast).
    """
    sbert_dim = sbert_embeddings.shape[1]
    roberta_dim = roberta_embeddings.shape[1]
    if roberta_dim < sbert_dim:
        raise ValueError(
            f"RoBERTa embeddings have {roberta_dim} columns but SBERT embeddings have {sbert_dim}; "
            "cannot truncate to a common width"
        )
    roberta_embeddings_reduced = roberta_embeddings[:, :sbert_dim]
    # NOTE(review): the two sources are added without normalisation; if their
    # vector scales differ, one of them will dominate the sum — confirm.
    combined_embeddings = weights[0] * sbert_embeddings + weights[1] * roberta_embeddings_reduced
    return combined_embeddings

def calculate_accuracy(relevant_images, query_embedding, all_embeddings):
    """Tell whether the single most similar embedding is a relevant one.

    Args:
        relevant_images: Container of row indices considered relevant.
        query_embedding: 1-D query vector.
        all_embeddings: 2-D array of candidate vectors, one per row.

    Returns:
        True when the row with the highest cosine similarity to the query is
        in ``relevant_images``, otherwise False.
    """
    scores = cosine_similarity([query_embedding], all_embeddings)
    best_row = scores[0].argmax()
    return best_row in relevant_images

def calculate_ap(relevant_images, query_embedding, all_embeddings):
    """Average precision of the cosine-similarity ranking for one query.

    Args:
        relevant_images: Collection of row indices considered relevant.
        query_embedding: 1-D query vector.
        all_embeddings: 2-D array of candidate vectors, one per row.

    Returns:
        Mean of the precision values at each rank where a relevant row
        appears, or 0.0 when no relevant row is in the ranking (this case
        previously evaluated 0/0 and returned NaN).
    """
    similarities = cosine_similarity([query_embedding], all_embeddings)[0]
    ranking = np.argsort(-similarities)
    # Vectorised membership test instead of one Python `in` per ranked row.
    is_relevant = np.isin(ranking, list(relevant_images))
    n_relevant = np.sum(is_relevant)
    if n_relevant == 0:
        return 0.0
    # precisions[k] is the fraction of relevant rows among the top k + 1.
    precisions = np.cumsum(is_relevant) / (np.arange(len(is_relevant)) + 1)
    return np.sum(precisions * is_relevant) / n_relevant

def calculate_map(all_image_paths, relevant_images_dict, cv_embeddings, nlp_embeddings):
    """Mean average precision of the fused image+text ranking over all queries.

    Each key of ``relevant_images_dict`` is used as a query: its image and
    text embeddings are compared with every row, the two similarity vectors
    are merged by late_fusion(), and average precision is computed against the
    listed relevant paths. The AP of each query is printed.

    Args:
        all_image_paths: Paths in the same order as the embedding rows.
        relevant_images_dict: Maps a query path to the paths relevant to it.
        cv_embeddings: 2-D array of image embeddings.
        nlp_embeddings: 2-D array of text embeddings.

    Returns:
        Mean of the per-query AP values. Queries without any relevant image
        are skipped (their AP is undefined and used to turn the mean into
        NaN); 0.0 is returned when no query could be scored.
    """
    ap_scores = []
    path_to_index = {path: idx for idx, path in enumerate(all_image_paths)}
    for image_path, relevant_images in relevant_images_dict.items():
        relevant_indices = [path_to_index[path] for path in relevant_images]
        query_index = path_to_index[image_path]
        # NOTE(review): the query's own row is part of the ranked list (it is
        # compared against all embeddings, itself included) — confirm this is
        # the intended evaluation protocol.
        fused_similarities = late_fusion(
            cosine_similarity([cv_embeddings[query_index]], cv_embeddings)[0],
            cosine_similarity([nlp_embeddings[query_index]], nlp_embeddings)[0],
        )
        sorted_indices = np.argsort(-fused_similarities)
        is_relevant = np.isin(sorted_indices, relevant_indices)
        n_relevant = np.sum(is_relevant)
        if n_relevant == 0:
            print(f"Skipping {image_path}: no relevant images")
            continue
        precisions = np.cumsum(is_relevant) / (np.arange(len(is_relevant)) + 1)
        ap = np.sum(precisions * is_relevant) / n_relevant
        print(f"AP for {image_path}: {ap}")
        ap_scores.append(ap)
    if not ap_scores:
        return 0.0
    map_score = np.mean(ap_scores)
    return map_score

def late_fusion(cv_similarities, nlp_similarities):
    """Return the element-wise mean of the image and text similarity scores."""
    summed = cv_similarities + nlp_similarities
    return summed / 2

def search_engine(image_query_path, text_query, image_embeddings, text_embeddings, all_image_paths, top_k=5):
    """Rank indexed images against an image query and a text query.

    The query image is embedded with ConvNeXt and the query text with SBERT;
    each is compared by cosine similarity with the corresponding embedding
    matrix and the two score vectors are averaged.

    Args:
        image_query_path: Path of the query image.
        text_query: Query text.
        image_embeddings: 2-D array of indexed image embeddings.
        text_embeddings: 2-D array of indexed text embeddings.
        all_image_paths: Paths aligned with the embedding rows.
        top_k: Number of results to return.

    Returns:
        List of ``(image_path, combined_similarity)`` tuples, best first.
    """
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    image_model = timm.create_model('convnext_base', pretrained=True).to(device)
    image_model.eval()

    # Copy the pixels and release the file handle right away.
    with Image.open(image_query_path) as source:
        image = source.convert('RGB')
    image_tensor = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_embedding = image_model(image_tensor).squeeze(0).cpu().numpy()

    # NOTE(review): the query text is encoded with SBERT only, whereas main()
    # passes text embeddings produced by ensemble_embeddings() (SBERT mixed
    # with RoBERTa), so the two sides may not live in the same space — confirm.
    text_model = SentenceTransformer('all-MiniLM-L6-v2')
    text_embedding = text_model.encode([text_query])[0]

    image_similarities = cosine_similarity(image_embedding.reshape(1, -1), image_embeddings).flatten()
    text_similarities = cosine_similarity(text_embedding.reshape(1, -1), text_embeddings).flatten()
    # Same equal-weight average that calculate_map() uses.
    combined_similarities = late_fusion(image_similarities, text_similarities)
    ranked_indices = np.argsort(-combined_similarities)[:top_k]
    ranked_results = [(all_image_paths[idx], combined_similarities[idx]) for idx in ranked_indices]

    return ranked_results

def visualize_results(ranked_results):
    """Show the result images side by side in a single row.

    Args:
        ranked_results: List of ``(image_path, similarity)`` tuples, as
            returned by search_engine(). Only the paths are used.
    """
    if not ranked_results:
        # plt.subplots cannot build a grid with zero columns.
        print("No results to display.")
        return

    # squeeze=False always yields a 2-D array of axes; without it a single
    # result returns a bare Axes object and indexing it fails.
    _fig, axes = plt.subplots(nrows=1, ncols=len(ranked_results), figsize=(5, 5), squeeze=False)
    for ax, (result_path, _similarity) in zip(axes[0], ranked_results):
        ax.imshow(mpimg.imread(result_path))
        ax.axis('off')
    plt.tight_layout()
    plt.show()

def _ensure_parent_dir(file_path):
    """Create the directory that will contain ``file_path`` if it is missing."""
    parent = os.path.dirname(file_path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def main():
    """Build or load the embedding caches, run one demo query and pickle the models.

    For every ``.xlsx`` workbook in ``static/descriptions`` (first column:
    image path relative to ``static/``, second column: description) the SBERT,
    RoBERTa and ConvNeXt+YOLO embeddings are computed unless their ``.npy``
    cache already exists, then loaded. A fixed example query is run through
    search_engine() and shown, and the models plus the stacked embeddings are
    written to ``model.pkl``.

    Raises:
        ValueError: If the descriptions directory contains no ``.xlsx`` file.
    """
    data_dir = "static/descriptions"
    detect_save_dir = "static/detection sample"
    all_image_paths = []
    relevant_images_dict = {}
    cv_embeddings = []
    nlp_embeddings = []

    # List the workbooks once: image paths and embedding rows are gathered in
    # the same pass, so their order always matches.
    workbook_names = [name for name in os.listdir(data_dir) if name.endswith(".xlsx")]
    if not workbook_names:
        raise ValueError(f"No .xlsx description files found in {data_dir!r}")

    model = timm.create_model('convnext_base', pretrained=True)
    model.eval()
    tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
    sbert_model = SentenceTransformer('all-MiniLM-L6-v2')
    roberta_model = RobertaModel.from_pretrained('roberta-base')

    for filename in workbook_names:
        stem = os.path.splitext(filename)[0]
        df = pd.read_excel(os.path.join(data_dir, filename))
        image_paths = ["static/" + path for path in df.iloc[:, 0].tolist()]
        desc = df.iloc[:, 1].tolist()
        all_image_paths.extend(image_paths)

        # Images listed in the same workbook are treated as relevant to each
        # other (the mapping calculate_map() takes); main() does not use it yet.
        for image_path in image_paths:
            relevant_images_dict[image_path] = [other for other in image_paths if other != image_path]

        sbert_encoded_file = f'static/encodes/sBERT/encoded_texts_sBERT_{stem}.npy'
        roberta_encoded_file = f'static/encodes/RoBERTa/encoded_texts_RoBERTa_{stem}.npy'
        encoded_images_file = f'static/encodes/ConvNeXtV1&YOLOv8/ConvNeXtV1&YOLOv8_{stem}.npy'

        # Compute only what is not cached yet; np.save needs the directory to exist.
        if not os.path.exists(sbert_encoded_file):
            _ensure_parent_dir(sbert_encoded_file)
            np.save(sbert_encoded_file, sbert_model.encode(desc, show_progress_bar=False))

        if not os.path.exists(roberta_encoded_file):
            _ensure_parent_dir(roberta_encoded_file)
            np.save(roberta_encoded_file, encode_with_roberta(desc, roberta_model, tokenizer))

        if not os.path.exists(encoded_images_file):
            _ensure_parent_dir(encoded_images_file)
            encode_and_save_images_convnext_with_yolov8(image_paths, encoded_images_file, detect_save_dir)

        cv_embeddings.append(np.load(encoded_images_file))
        nlp_embeddings.append(
            ensemble_embeddings(np.load(sbert_encoded_file), np.load(roberta_encoded_file))
        )

    cv_embeddings = np.vstack(cv_embeddings)
    nlp_embeddings = np.vstack(nlp_embeddings)

    # Demo query.
    image_query_path = 'static/Grad Dataset/chair/Image_30.jpeg'
    text_query = 'black'
    top_k = 2
    ranked_results = search_engine(image_query_path, text_query, cv_embeddings, nlp_embeddings, all_image_paths, top_k)
    visualize_results(ranked_results)

    # Save the model and embeddings to a pickle file
    model_dict = {
        'sbert_model': sbert_model,
        'transform': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]),
        'roberta_model': roberta_model,
        'cv_embeddings': cv_embeddings,
        'nlp_embeddings': nlp_embeddings,
        'model' : model
    }
    model_file_path = 'model.pkl'
    with open(model_file_path, 'wb') as f:
        pickle.dump(model_dict, f)
    print("Model saved successfully as", model_file_path)

if __name__ == "__main__":
    main()

In [None]:
import pickle
# NOTE(review): sbert_model, model, transform, roberta_model, cv_embeddings and
# nlp_embeddings are only assigned as local variables inside functions in the
# visible cells (e.g. main()), never at notebook top level, so on a freshly
# restarted kernel this cell is expected to raise NameError unless they are
# defined somewhere not shown here — confirm. main() already writes the same
# 'model.pkl', so this cell also looks redundant.
model_dict = {
    
    'sbert_model': sbert_model,
    'model' : model,
    'transform': transform,
    'roberta_model' : roberta_model,
    'cv_embeddings': cv_embeddings,
    'nlp_embeddings': nlp_embeddings,
}
model_file_path = 'model.pkl'
# Serialise the whole dictionary (models and embedding arrays) into one file.
with open(model_file_path, 'wb') as f:
    pickle.dump(model_dict, f)
print("Model saved successfully as", model_file_path)