In [1]:
import torch
from transformers import CLIPProcessor, CLIPModel
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import numpy as np
from datasets import load_dataset
from PIL import Image
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import requests
from io import BytesIO
import io
from scipy.sparse import csr_matrix
import faiss
from sklearn.decomposition import TruncatedSVD
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from gensim.models import Word2Vec

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
ds = load_dataset("Artificio/WikiArt")

In [3]:
train_data = ds['train']

In [4]:
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [5]:
artist_encoder = LabelEncoder()
style_encoder = LabelEncoder()
genre_encoder = LabelEncoder()

In [6]:
artist_encoder.fit(train_data['artist'])
style_encoder.fit(train_data['style'])
genre_encoder.fit(train_data['genre'])

In [7]:
def encode_metadata(example):
    example['artist_encoded'] = artist_encoder.transform([example['artist']])[0]
    example['style_encoded'] = style_encoder.transform([example['style']])[0]
    example['genre_encoded'] = genre_encoder.transform([example['genre']])[0]
    return example

In [8]:
dataset_encoded = train_data.map(encode_metadata)

In [75]:
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [76]:
def generate_clip_embeddings(example):
    if isinstance(example['image'], str):  # It's a file path
        image = Image.open(example['image']).convert("RGB")
    elif isinstance(example['image'], Image.Image):  # It's a PIL image
        image = example['image'].convert("RGB")
    else:
        raise ValueError(f"Unexpected type for 'image': {type(example['image'])}")

    inputs = clip_processor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        image_features = clip_model.get_image_features(**inputs)
    
    example['image_embeddings'] = image_features.cpu().numpy()
    return example

In [77]:
dataset_encoded = dataset_encoded.map(generate_clip_embeddings)

NameError: name 'dataset_encoded' is not defined

In [24]:
def combine_embeddings(example):
    # Ensure metadata is properly processed
    metadata_vector = np.array([
        example['artist_encoded'],
        example['style_encoded'],
        example['genre_encoded']
    ], dtype=np.float32)

    # Normalize metadata vector
    metadata_vector = torch.nn.functional.normalize(
        torch.tensor(metadata_vector), dim=0
    ).numpy()

    # Handle the case where image embeddings are stored as a list
    image_embeddings = np.array(example['image_embeddings'])  # Convert to NumPy array if needed

    # Combine embeddings
    combined_embedding = np.concatenate([image_embeddings.flatten(), metadata_vector])
    example['combined_embeddings'] = combined_embedding
    return example


In [25]:
dataset_encoded = dataset_encoded.map(combine_embeddings)

Map: 100%|██████████| 103250/103250 [01:08<00:00, 1505.15 examples/s]


In [27]:
np.save("combined_embeddings.npy", np.vstack(dataset_encoded['combined_embeddings']))

print("Embeddings prepared and saved successfully!")

Embeddings prepared and saved successfully!


In [30]:
all_embeddings = np.array([example['combined_embeddings'] for example in dataset_encoded])

In [9]:
all_embeddings = np.load('combined_embeddings.npy')

In [10]:
all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True)

## Content-Based Filtering

In [11]:
def combine_embeddings_for_recommendation(current_embedding, previous_embedding=None, weight=0.7):
    """
    Combine embeddings using a weighted approach.
    
    Args:
        current_embedding (np.array): Embedding of the current artwork.
        previous_embedding (np.array): Combined embedding from previous interactions, or None.
        weight (float): Weight given to the current embedding (0 <= weight <= 1).
    
    Returns:
        np.array: Weighted combined embedding.
    """
    if previous_embedding is None:
        # No previous embedding exists
        return current_embedding
    else:
        # Combine embeddings using weights
        return weight * current_embedding + (1 - weight) * previous_embedding

In [12]:
def recommend_similar_artworks(combined_embedding, all_embeddings, k=5):
    """
    Recommend artworks similar to the combined embedding.
    
    Args:
        combined_embedding (np.array): Weighted combined embedding for recommendation.
        all_embeddings (np.array): All artwork embeddings in the dataset.
        k (int): Number of recommendations to return.
    
    Returns:
        list: Indices of the top-k recommended artworks.
    """
    # Compute cosine similarity
    similarities = cosine_similarity([combined_embedding], all_embeddings)
    top_k_indices = similarities.argsort()[0][-k:][::-1]  # Top-k most similar
    return top_k_indices


In [13]:
# Example usage
# Assume `current_embedding` is the embedding of the clicked artwork
# `previous_combined_embedding` is the embedding combined from prior interactions (initially None)
# `all_embeddings` contains all the artwork embeddings in the dataset

# User clicks on an artwork
current_embedding = all_embeddings[10]  # Example: clicked artwork's embedding
previous_combined_embedding = None  # Initially, no previous embedding exists

# Combine embeddings
weight = 0.7  # Example: give 70% weight to the current click
combined_embedding = combine_embeddings_for_recommendation(
    current_embedding, previous_combined_embedding, weight
)

# Update the previous combined embedding for future interactions
previous_combined_embedding = combined_embedding

# Recommend similar artworks
recommended_indices = recommend_similar_artworks(combined_embedding, all_embeddings)
print("Recommended Artwork Indices:", recommended_indices)

Recommended Artwork Indices: [   10 60455 97505 27393 75978]


In [14]:
# Access the first data point from the training set
current_data_point = ds['train'][10]

# Check the column name that contains the image
image_column_name = "image"  # Replace with the actual column name if different

# Get the image from the dataset
image_data = current_data_point[image_column_name]

In [15]:
# Display the image
if isinstance(image_data, Image.Image):  # For already decoded images
    image_data.show()
elif isinstance(image_data, bytes):  # For encoded images (e.g., byte strings)
    image = Image.open(io.BytesIO(image_data))
    image.show()
else:
    print("Image format not recognized!")

In [16]:
for i in recommended_indices:
    curr_img_data = ds['train'][int(i)]['image']
    if isinstance(image_data, Image.Image):  # For already decoded images
        curr_img_data.show()
    elif isinstance(image_data, bytes):  # For encoded images (e.g., byte strings)
        new_image = Image.open(io.BytesIO(curr_img_data))
        new_image.show()
    else:
        print("Image format not recognized!")

## Collaborative Filtering

In [17]:
user_interactions = [
    [0, 1],  # User 0 interacted with data point 1
    [0, 2],  # User 0 interacted with data point 2
    [1, 1],  # User 1 interacted with data point 0
    [1, 2],  # User 1 interacted with data point 3
    [2, 1],  # User 2 interacted with data point 4
    [3, 100],
    [4, 100],
    [3, 101],
    [4, 101],
    [3, 102]
]

In [18]:
user_ids = [interaction[0] for interaction in user_interactions]
data_point_indices = [interaction[1] for interaction in user_interactions]

In [19]:
n_users = max(user_ids) + 1  # Total number of users
n_items = all_embeddings.shape[0]  # Total number of data points from embeddings

In [20]:
data = [1] * len(user_interactions)  # Interaction weights (all set to 1 here)
interaction_matrix = csr_matrix((data, (user_ids, data_point_indices)), shape=(n_users, n_items))

In [21]:
print(f"Sparse Interaction Matrix Shape: {interaction_matrix.shape}")
print(f"Non-zero interactions: {interaction_matrix.nnz}")

# Compute user-user similarity using cosine similarity
user_similarity = cosine_similarity(interaction_matrix)

print("User Similarity Matrix:")
print(user_similarity)

Sparse Interaction Matrix Shape: (5, 103250)
Non-zero interactions: 10
User Similarity Matrix:
[[1.         1.         0.70710678 0.         0.        ]
 [1.         1.         0.70710678 0.         0.        ]
 [0.70710678 0.70710678 1.         0.         0.        ]
 [0.         0.         0.         1.         0.81649658]
 [0.         0.         0.         0.81649658 1.        ]]


In [22]:

def recommend_items_user_user(user_id, interaction_matrix, user_similarity, top_k=5):
    """
    Recommend items to a user based on user-user collaborative filtering.

    Args:
        user_id: ID of the user to recommend items for.
        interaction_matrix: Sparse matrix of user-item interactions.
        user_similarity: User similarity matrix.
        top_k: Number of recommendations to generate.

    Returns:
        List of recommended item indices.
    """
    # Get user similarity scores for the given user
    similar_users = np.argsort(-user_similarity[user_id])[1:]  # Exclude self (at index 0)

    # Get indices of items interacted by the target user
    target_user_items = set(interaction_matrix[user_id].nonzero()[1])

    # Keep track of item scores
    item_scores = {}

    # Aggregate scores from similar users
    for similar_user in similar_users:
        similarity_score = user_similarity[user_id, similar_user]

        # Get items interacted by the similar user
        similar_user_items = interaction_matrix[similar_user].nonzero()[1]

        for item in similar_user_items:
            if item not in target_user_items:  # Exclude already interacted items
                if item not in item_scores:
                    item_scores[item] = 0
                item_scores[item] += similarity_score

    # Sort items by aggregated score and return top_k recommendations
    recommended_items = sorted(item_scores.keys(), key=lambda x: -item_scores[x])[:top_k]
    return recommended_items

In [23]:
user_id = 2  # Example user ID
recommended_items = recommend_items_user_user(user_id, interaction_matrix, user_similarity, top_k=3)
print(f"Recommended items for user {user_id}: {recommended_items}")

Recommended items for user 2: [2, 100, 101]


## Collaborative Filtering using NCF

In [24]:
class ArtworkDataset():
    def __init__(self, user_item_matrix, item_embeddings):
        """
        user_item_matrix: List of tuples [(user_id, item_id), ...]
        item_embeddings: Tensor of shape (num_items, combined_embedding_dim) -> Precomputed combined embeddings
        """
        self.user_item_matrix = user_item_matrix
        self.item_embeddings = item_embeddings
    
    def __len__(self):
        return len(self.user_item_matrix)
    
    def __getitem__(self, idx):
        user_id, item_id = self.user_item_matrix[idx]
        item_embedding = self.item_embeddings[item_id]
        return user_id, item_embedding

In [25]:
class NCFModel(nn.Module):
    def __init__(self, num_users, embedding_dim, combined_embedding_dim, hidden_layers):
        super(NCFModel, self).__init__()
        # Embedding layer for users (initialize with max number of users)
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        
        # MLP for interaction modeling
        self.mlp = nn.Sequential(
            nn.Linear(embedding_dim + combined_embedding_dim, hidden_layers[0]),
            nn.ReLU(),
            *[layer for hidden_dim in zip(hidden_layers[:-1], hidden_layers[1:]) 
              for layer in (nn.Linear(hidden_dim[0], hidden_dim[1]), nn.ReLU())]
        )
        
        # Output layer
        self.output = nn.Linear(hidden_layers[-1], 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, user_id, item_embedding):
        user_vec = self.user_embedding(user_id)
        combined = torch.cat([user_vec, item_embedding], dim=-1)
        hidden = self.mlp(combined)
        return self.sigmoid(self.output(hidden))
    
    def resize_user_embeddings(self, new_num_users):
        """
        Resize the user embedding layer to accommodate new users.
        """
        old_weights = self.user_embedding.weight.data
        self.user_embedding = nn.Embedding(new_num_users, old_weights.size(1))
        with torch.no_grad():
            self.user_embedding.weight[:old_weights.size(0)] = old_weights

In [26]:
def get_num_users(user_item_matrix):
    return max(user_id for user_id, _ in user_item_matrix) + 1

In [27]:
embedding_dim = 64
hidden_layers = [128, 64, 32]
batch_size = 128
learning_rate = 0.001
epochs = 10

In [28]:
user_item_matrix = [
    [0, 1],  # User 0 interacted with data point 1
    [0, 2],  # User 0 interacted with data point 2
    [1, 1],  # User 1 interacted with data point 0
    [1, 2],  # User 1 interacted with data point 3
    [2, 1],  # User 2 interacted with data point 4
    [3, 100],
    [4, 100],
    [3, 101],
    [4, 101],
    [4, 10]
]

all_embeddings_ncf = torch.tensor(all_embeddings, dtype=torch.float32)

In [29]:
num_users = get_num_users(user_item_matrix)

In [30]:
dataset_ncf = ArtworkDataset(user_item_matrix, all_embeddings_ncf)
data_loader = DataLoader(dataset_ncf, batch_size=batch_size, shuffle=True)

In [31]:
model = NCFModel(num_users, embedding_dim, all_embeddings_ncf.size(1), hidden_layers)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [32]:
for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    for user_id, item_embedding in data_loader:
        user_id = user_id.long()  # User IDs
        preds = model(user_id, item_embedding.float())
        labels = torch.ones_like(preds)  # Replace with actual labels if available
        loss = criterion(preds, labels)
        
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}")

Epoch 1/10, Loss: 0.7312
Epoch 2/10, Loss: 0.7181
Epoch 3/10, Loss: 0.7096
Epoch 4/10, Loss: 0.7037
Epoch 5/10, Loss: 0.6980
Epoch 6/10, Loss: 0.6925
Epoch 7/10, Loss: 0.6872
Epoch 8/10, Loss: 0.6816
Epoch 9/10, Loss: 0.6760
Epoch 10/10, Loss: 0.6698


In [33]:
new_user_id = 3
if new_user_id >= model.user_embedding.num_embeddings:
    print("Resizing user embedding for new users.")
    model.resize_user_embeddings(new_user_id + 1)

# Inference Example
user_id = torch.tensor([0])  # Example user
item_embedding = all_embeddings_ncf[1]  # Example item embedding
prediction = model(user_id, item_embedding.unsqueeze(0))
print(f"Recommendation score: {prediction.item()}")

Recommendation score: 0.519916296005249


In [34]:
def get_top_k_recommendations(user_id, model, all_embeddings, k=5):
    """
    user_id: The user for whom we want to get the recommendations
    model: The trained NCF model
    all_embeddings: The precomputed item embeddings (tensor)
    k: The number of recommendations to return (default 5)
    
    Returns:
    top_k_items_with_scores: List of tuples [(item_id, score), ...] for the top k recommended items
    """
    # Ensure user_id is a tensor
    user_id = torch.tensor([user_id], dtype=torch.long)
    
    # Get the user embedding from the model
    user_embedding = model.user_embedding(user_id)
    
    # Compute predicted scores for all items
    with torch.no_grad():
        scores = []
        for item_id in range(all_embeddings.shape[0]):  # Loop over all items
            item_embedding = all_embeddings[item_id].unsqueeze(0)  # Get embedding for the item
            score = model(user_id, item_embedding)  # Predict score for this item
            scores.append(score.item())  # Store the score
        
    # Convert scores to tensor
    scores = torch.tensor(scores)
    
    # Get the top k item indices and scores
    top_k_values, top_k_indices = torch.topk(scores, k)
    
    # Combine indices and their scores into a list of tuples
    top_k_items_with_scores = [(top_k_indices[i].item(), top_k_values[i].item()) for i in range(k)]
    
    return top_k_items_with_scores

In [35]:
user_id = 3  # Example user ID
top_k_items_with_scores = get_top_k_recommendations(user_id, model, all_embeddings_ncf, k=5)

# Print the top 5 items along with their predicted scores
print(f"Top 5 recommendations for user {user_id}:")
for item_id, score in top_k_items_with_scores:
    print(f"Item ID: {item_id}, Predicted Score: {score:.4f}")

Top 5 recommendations for user 3:
Item ID: 100, Predicted Score: 0.5171
Item ID: 83126, Predicted Score: 0.5170
Item ID: 67353, Predicted Score: 0.5170
Item ID: 102429, Predicted Score: 0.5170
Item ID: 2480, Predicted Score: 0.5169


In [36]:
for i, j in top_k_items_with_scores:
    curr_img_data = ds['train'][int(i)]['image']
    if isinstance(image_data, Image.Image):  # For already decoded images
        curr_img_data.show()
    elif isinstance(image_data, bytes):  # For encoded images (e.g., byte strings)
        new_image = Image.open(io.BytesIO(curr_img_data))
        new_image.show()
    else:
        print("Image format not recognized!")

## Hybrid Model using Content-Based filtering and 1st Collaborative Filtering approach

In [37]:
def hybrid_recommendation(user_id, combined_embedding, all_embeddings, interaction_matrix, user_similarity, content_weight=0.6, collaborative_weight=0.4, top_k=5):
    """
    Hybrid recommendation system combining content-based and collaborative filtering.

    Args:
        user_id: ID of the user to recommend items for.
        combined_embedding: Weighted combined embedding for content-based recommendation.
        all_embeddings: All artwork embeddings in the dataset.
        interaction_matrix: Sparse matrix of user-item interactions.
        user_similarity: User similarity matrix.
        content_weight: Weight for content-based recommendations.
        collaborative_weight: Weight for collaborative recommendations.
        top_k: Number of recommendations to generate.

    Returns:
        List of recommended item indices.
    """
    # Content-based recommendations
    content_similarities = cosine_similarity([combined_embedding], all_embeddings)
    content_scores = content_similarities[0]

    # Collaborative filtering recommendations
    similar_users = np.argsort(-user_similarity[user_id])[1:]  # Exclude self (at index 0)
    target_user_items = set(interaction_matrix[user_id].nonzero()[1])
    collaborative_scores = np.zeros(all_embeddings.shape[0])

    # Aggregate scores from similar users
    for similar_user in similar_users:
        similarity_score = user_similarity[user_id, similar_user]
        similar_user_items = interaction_matrix[similar_user].nonzero()[1]

        for item in similar_user_items:
            if item not in target_user_items:  # Exclude already interacted items
                collaborative_scores[item] += similarity_score

    # Normalize both scores
    content_scores = content_scores / np.max(content_scores) if np.max(content_scores) > 0 else content_scores
    collaborative_scores = collaborative_scores / np.max(collaborative_scores) if np.max(collaborative_scores) > 0 else collaborative_scores

    # Combine scores using the specified weights
    final_scores = content_weight * content_scores + collaborative_weight * collaborative_scores

    # Get top-k recommendations
    recommended_items = np.argsort(-final_scores)[:top_k]
    return recommended_items

In [38]:
user_id = 2  # Example user ID
combined_embedding = previous_combined_embedding  # Use previously combined embedding
top_k = 5  # Number of recommendations

recommended_items = hybrid_recommendation(
    user_id, combined_embedding, all_embeddings, interaction_matrix, user_similarity, content_weight=0.6, collaborative_weight=0.4, top_k=top_k
)

print(f"Recommended items for user {user_id} (hybrid): {recommended_items}")

Recommended items for user 2 (hybrid): [    2    10 60455 97505 27393]


## Hybrid Model using Content-Based filtering and 2nd Collaborative Filtering approach

In [39]:
def hybrid_recommendations_ncf(user_id, model, all_embeddings, combined_embedding, k=5, content_weight=0.6, cf_weight=0.4):
    """
    Generate hybrid recommendations by combining content-based and collaborative filtering scores.
    
    Args:
        user_id: User ID for the collaborative filtering part.
        model: The trained NCF model for collaborative filtering.
        all_embeddings: Precomputed item embeddings (tensor).
        combined_embedding: Combined embedding from content-based filtering.
        k: Number of recommendations to return.
        content_weight: Weight for content-based filtering scores.
        cf_weight: Weight for collaborative filtering scores.
    
    Returns:
        List of tuples [(item_id, score)] for the top-k recommended items.
    """
    # Collaborative Filtering Scores
    user_id_tensor = torch.tensor([user_id], dtype=torch.long)
    with torch.no_grad():
        cf_scores = []
        for item_id in range(all_embeddings.shape[0]):
            item_embedding = all_embeddings[item_id].unsqueeze(0)
            score = model(user_id_tensor, item_embedding)
            cf_scores.append(score.item())
    cf_scores = np.array(cf_scores)
    
    # Content-Based Filtering Scores
    content_similarities = cosine_similarity([combined_embedding], all_embeddings.numpy())[0]
    
    # Hybrid Scores
    hybrid_scores = content_weight * content_similarities + cf_weight * cf_scores
    
    # Top-k Recommendations
    top_k_indices = np.argsort(hybrid_scores)[-k:][::-1]
    top_k_items_with_scores = [(idx, hybrid_scores[idx]) for idx in top_k_indices]
    
    return top_k_items_with_scores

In [40]:
user_id = 3  # Example user ID
current_embedding = all_embeddings_ncf[10]  # Example: embedding of the clicked artwork
previous_combined_embedding = None  # Start with no previous interaction
combined_embedding = combine_embeddings_for_recommendation(current_embedding, previous_combined_embedding, weight=0.7)

top_k_recommendations = hybrid_recommendations_ncf(
    user_id, model, all_embeddings_ncf, combined_embedding, k=5
)

print(f"Top 5 hybrid recommendations for user {user_id}:")
for item_id, score in top_k_recommendations:
    print(f"Item ID: {item_id}, Hybrid Score: {score:.4f}")

Top 5 hybrid recommendations for user 3:
Item ID: 10, Hybrid Score: 0.8064
Item ID: 60455, Hybrid Score: 0.7177
Item ID: 97505, Hybrid Score: 0.7103
Item ID: 27393, Hybrid Score: 0.7007
Item ID: 75978, Hybrid Score: 0.6975
