In [1]:
import torch
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import os
import json

# Read the precomputed feature tensor, leaving out its first row.
embeddings_path = 'yars_data/base_out/features.pt'
embeddings = torch.load(embeddings_path)[1:]

# Read the image path listing and leave out its first line as well, so that
# paths[i] keeps lining up with embeddings[i]. Every remaining entry is
# stripped of surrounding whitespace and prefixed with the photo directory.
paths_file = 'yars_data/base_out/paths.txt'
base_dir = 'yars_data/photos/'
with open(paths_file, 'r') as file:
    paths = [base_dir + entry.strip() for entry in file.readlines()[1:]]


In [2]:
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors

# photo_directory = 'yars_data/photos/'
# paths = [photo_directory + path for path in paths]

# Project the embeddings onto 300 principal components.
pca = PCA(n_components=300)
pca_results = pca.fit_transform(embeddings.numpy())

# Partition the projected points into a fixed number of k-means clusters
# (seeded so the assignment is repeatable).
n_clusters = 20
kmeans = KMeans(n_clusters=n_clusters, random_state=0)
clusters = kmeans.fit_predict(pca_results)

# Bucket each image path and its original (un-projected) embedding under the
# cluster id it was assigned to. Every cluster id gets an entry up front.
cluster_images = {label: [] for label in range(n_clusters)}
cluster_embeddings = {label: [] for label in range(n_clusters)}
for i, cluster in enumerate(clusters):
    cluster_images[cluster].append(paths[i])
    cluster_embeddings[cluster].append(embeddings[i])

# print(cluster_images.items())

In [None]:
# Neighbour-search configuration. The estimator is only constructed here; it
# is fitted on one cluster's members at a time in a later cell.
k = 25
nbrs = NearestNeighbors(n_neighbors=k, algorithm='auto', metric='euclidean')

# Feature tensor for the test items.
test_embeddings_path = 'test/caption_features.pt'
test_embeddings = torch.load(test_embeddings_path)

# One path per line. Strip each entry: readlines() would keep the trailing
# newline, and a path ending in '\n' cannot be opened later on.
test_paths_path = 'test/caption_paths.txt'
with open(test_paths_path, 'r') as file:
    test_paths = [line.strip() for line in file]

# Project the test features with the already-fitted PCA and assign each one
# to its k-means cluster.
new_points_pca = pca.transform(test_embeddings.detach().numpy())
new_labels = kmeans.predict(new_points_pca)
print(len(test_paths))

In [None]:
root_dir = 'yars_data'

# Index the photo metadata by photo_id. The file is read as one JSON object
# per line; undecodable lines are reported and skipped.
photos_data = {}
with open(os.path.join(root_dir, 'photos.json'), 'r') as file:
    for line in file:
        line = line.strip()
        if not line:
            continue  # a blank line is not a record, so do not report it as an error
        try:
            photo_record = json.loads(line)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e} at line: {line}")
            continue
        photos_data[photo_record['photo_id']] = photo_record

# Plain NumPy view of the test features; sklearn cannot consume tensors that
# are still attached to an autograd graph.
test_vectors = test_embeddings.detach().numpy()

# For every test item, look up its nearest neighbours among the members of
# the cluster it was assigned to and collect those neighbours' captions.
# test_results[i] is the caption list for test item i.
test_results = []
for i, label in enumerate(new_labels):
    test_cluster = cluster_embeddings[label]
    image_cluster = cluster_images[label]

    if not test_cluster:
        # Nothing to search in an empty cluster; keep the result list aligned.
        test_results.append([])
        continue

    nbrs.fit(torch.stack(test_cluster).detach().numpy())

    # A cluster may hold fewer members than the configured neighbour count;
    # asking for more neighbours than fitted samples raises a ValueError.
    n_query = min(nbrs.n_neighbors, len(test_cluster))

    # Query with the i-th test vector (not always the first one).
    distances, indices = nbrs.kneighbors(
        test_vectors[i].reshape(1, -1), n_neighbors=n_query
    )

    print(f"Neighbors found for test image {i}")
    neighbor_captions = []

    for idx in indices[0]:
        # The photo id is the file name without its extension, whatever the
        # extension's length.
        image_id = os.path.splitext(os.path.basename(image_cluster[idx]))[0]
        neighbor_captions.append(photos_data[image_id]["caption"])

    test_results.append(neighbor_captions)

In [None]:
from collections import Counter
import re
import nltk
from nltk.corpus import stopwords
from nltk.corpus import wordnet as wn

# Fetch the NLTK stop-word corpus, then keep the English entries in a set so
# that membership checks against it are cheap.
nltk.download('stopwords')
stop_words = {*stopwords.words('english')}

def tokenize_and_count(captions):
    """Count word occurrences across a collection of caption strings.

    The captions are joined with single spaces and lower-cased; every maximal
    run of word characters is then treated as one token.

    Args:
        captions: Iterable of caption strings.

    Returns:
        collections.Counter mapping each lower-cased token to the number of
        times it occurs.
    """
    lowered = ' '.join(captions).lower()
    return Counter(match.group(0) for match in re.finditer(r'\b\w+\b', lowered))

test_display_count = 5
k = 5  # top-k labels (note: this rebinds the notebook-level `k`)

# Never create more panels than there are results to show.
n_display = min(test_display_count, len(test_results))

if n_display == 0:
    print("No test results to display")
else:
    # squeeze=False keeps `axs` two-dimensional (n_display x 1) even for a
    # single row, so axs[i, 0] is always valid. Three inches of height per row
    # matches the original 10x15 figure for five rows.
    fig, axs = plt.subplots(n_display, 1, figsize=(10, 3 * n_display), squeeze=False)

    for i, neighbor_captions in enumerate(test_results[:n_display]):
        ax = axs[i, 0]

        # Word frequencies over the neighbours' captions, minus stop words.
        freq = tokenize_and_count(neighbor_captions)
        filtered_counts = {word: count for word, count in freq.items() if word not in stop_words}

        # Keep only the words of the k most frequent entries; the counts are
        # used for ranking but are not shown in the title.
        ranked = sorted(filtered_counts.items(), key=lambda x: x[1], reverse=True)[:k]
        top_labels = [word for word, _ in ranked]

        # strip() guards against a trailing newline left over from reading
        # the path list; the context manager closes the file once drawn.
        with Image.open(test_paths[i].strip()) as im:
            ax.imshow(im)
        ax.set_title(f'top-{k} labels using KNN: {",".join(top_labels)}')
        ax.axis('off')

    plt.tight_layout()
    plt.show()