# Version 1 - Populate Pinecone with movie poster embeddings and metadata

V1 movie poster dataset sourced from Hugging Face:
https://huggingface.co/datasets/pinecone/movie-posters

V2 sources movies from the TMDB API's "popular" endpoint (called in August 2024). The result is a list of more popular/modern movies, with movie posters of better image quality:
https://developer.themoviedb.org/reference/movie-popular-list

In [None]:
#!pip install datasets
!pip install pinecone-client
#!pip install tmdbv3api

CLIP

In [None]:
from datasets import load_dataset
import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import requests
from io import BytesIO
import pinecone
from tmdbv3api import TMDb, Find
from google.colab import userdata
import time
from pinecone import Pinecone
from pinecone import ServerlessSpec
from google.colab import userdata

# Configuration Variables
# MODEL_NAME = "openai/clip-vit-base-patch32"
# PROCESSOR_NAME = "openai/clip-vit-base-patch32"
# NAMESPACE = "CLIP"

MODEL_NAME = "openai/clip-vit-base-patch32"
PROCESSOR_NAME = "openai/clip-vit-base-patch32"
NAMESPACE = "CLIP"

# Initialize Pinecone
pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters")

# Initialize TMDB API client
tmdb = TMDb()
tmdb.api_key = userdata.get("TMDB_API_KEY")

# Initialize the Find class
find = Find()

# Load the Pinecone movie poster dataset
ds = load_dataset("pinecone/movie-posters")

# Initialize CLIP model and processor
model = CLIPModel.from_pretrained(MODEL_NAME)
processor = CLIPProcessor.from_pretrained(PROCESSOR_NAME)

# Function to fetch an image from a URL and get its embedding
def get_image_embedding(poster_url):
    """Download a poster image and return its CLIP image embedding.

    Args:
        poster_url: Absolute URL of the poster image.

    Returns:
        The result of ``model.get_image_features`` with singleton dimensions
        squeezed out, converted to a NumPy array.

    Raises:
        requests.RequestException: If the download times out, fails, or the
            server answers with an HTTP error status.
        Exception: Whatever PIL raises when the payload is not a decodable
            image.
    """
    # Bound the wait so a stalled server cannot hang the whole upload loop.
    response = requests.get(poster_url, timeout=30)
    # Fail on 4xx/5xx here instead of handing an error page to PIL.
    response.raise_for_status()
    # Convert to RGB, as the other get_image_embedding versions in this file do.
    image = Image.open(BytesIO(response.content)).convert('RGB')
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embedding = model.get_image_features(**inputs)
    return embedding.squeeze().numpy()

# Function to fetch movie metadata from TMDB API using IMDb ID
def get_movie_metadata(imdb_id):
    """Look up a movie on TMDB by its IMDb ID.

    Args:
        imdb_id: IMDb identifier passed to ``find.find`` with the
            ``'imdb_id'`` external source.

    Returns:
        A dict with ``title``, ``overview`` and ``release_date`` taken from
        the first movie result, or ``None`` when there are no movie results
        or the lookup raises (a message is printed in both cases).
    """
    # Wait 20 ms before every lookup (50 requests per second) so the TMDB
    # API is not overloaded.
    time.sleep(0.02)
    try:
        lookup = find.find(imdb_id, 'imdb_id')
        if not lookup.movie_results:
            print(f"No metadata found for IMDb ID {imdb_id}")
            return None
        # Only the first match is used.
        first_hit = lookup.movie_results[0]
        return {
            'title': first_hit.title,
            'overview': first_hit.overview,
            'release_date': first_hit.release_date
        }
    except Exception as e:
        print(f"Error fetching metadata for IMDb ID {imdb_id}: {e}")
        return None

# Process the dataset and upload embeddings with metadata to Pinecone
def process_and_upload():
    """Embed every poster in ``ds['train']`` and upsert it into Pinecone.

    For each dataset item this fetches TMDB metadata by IMDb ID, adds the
    dataset's poster URL to that metadata, computes the image embedding and
    upserts one vector (ID = IMDb ID) into ``NAMESPACE``. Items with no
    metadata or with a failing embedding are skipped.
    """
    for item in ds['train']:
        imdb_id = item['imdbId']
        poster_url = item['poster']

        # Fetch movie metadata; skip movies TMDB does not know about.
        metadata = get_movie_metadata(imdb_id)
        if metadata is None:
            continue

        # Add poster URL from dataset to metadata
        metadata['poster_url'] = poster_url

        # Generate image embedding
        try:
            embedding = get_image_embedding(poster_url)
        except Exception as e:
            print(f"Failed to get embedding for {poster_url}: {e}")
            continue

        # Send the values as a plain list of floats, matching the ResNet and
        # VGG16 upload loops in this file.
        index.upsert(
            vectors=[(imdb_id, embedding.tolist(), metadata)],
            namespace=NAMESPACE
        )

#process_and_upload()

In [None]:
get_movie_metadata("tt6087562")

In [None]:
# Pinecone and create an index
from pinecone import Pinecone
from pinecone import ServerlessSpec
from google.colab import userdata

pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))

# pc.create_index(
#     name="movie-posters",
#     dimension=512, # dimensions from CLIP
#     metric="cosine",
#     spec=ServerlessSpec(
#         cloud="aws",
#         region="us-east-1"
#     )
# )

# Access the existing index
index = pc.Index("movie-posters")


In [None]:
#index.delete(delete_all=True, namespace='')

In [None]:
print(index.describe_index_stats())

ResNet

In [None]:
from datasets import load_dataset
import torch
from torchvision import models, transforms
from PIL import Image
import requests
from io import BytesIO
import pinecone
from tmdbv3api import TMDb, Find
from google.colab import userdata
import time

# Configuration Variables
MODEL_NAME = "resnet50"  # Using ResNet50 model
NAMESPACE = "ResNet-50"
VECTOR_DIM = 512  # Limiting to 512 to match CLIP dimensions (Pinecone free plan has only one index)

# Initialize Pinecone
pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters")

# Initialize TMDB API client
tmdb = TMDb()
tmdb.api_key = userdata.get("TMDB_API_KEY")

# Initialize the Find class
find = Find()

# Load the Pinecone movie poster dataset
ds = load_dataset("pinecone/movie-posters")

# Initialize ResNet model and modify it to produce 512-dimensional embeddings
class ResNet50WithEmbedding(torch.nn.Module):
    """ResNet-50 backbone followed by a linear projection to ``embedding_dim``.

    The projection layer is never trained in this file. It is therefore
    initialised from a fixed seed, so that every instantiation (for example
    in a later session that queries the index) produces the same projection
    and hence comparable embeddings.

    Args:
        embedding_dim: Size of the output embedding.
        seed: Seed for the projection layer's initial weights.
    """

    def __init__(self, embedding_dim, seed=0):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # Remove the final fully connected layer
        self.base_model = torch.nn.Sequential(*list(backbone.children())[:-1])
        # Add a new fully connected layer
        self.fc = torch.nn.Linear(in_features=2048, out_features=embedding_dim)

        # Re-draw the projection weights from a private generator so the
        # result is reproducible without touching the global RNG state.
        # U(-1/sqrt(in_features), 1/sqrt(in_features)) is the range the
        # nn.Linear documentation gives for its default initialisation.
        generator = torch.Generator().manual_seed(seed)
        bound = 2048 ** -0.5
        with torch.no_grad():
            self.fc.weight.uniform_(-bound, bound, generator=generator)
            self.fc.bias.uniform_(-bound, bound, generator=generator)

    def forward(self, x):
        """Return the projected embedding for a batch of preprocessed images."""
        x = self.base_model(x)
        x = x.view(x.size(0), -1)  # Flatten everything but the batch dimension
        x = self.fc(x)
        return x

model = ResNet50WithEmbedding(VECTOR_DIM)
model.eval()

# Define image preprocessing
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Function to fetch an image from a URL and get its embedding
def get_image_embedding(poster_url):
    """Download a poster image and return its embedding from ``model``.

    Args:
        poster_url: Absolute URL of the poster image.

    Returns:
        The model output for the single image, squeezed and converted to a
        NumPy array.

    Raises:
        requests.RequestException: If the download times out, fails, or the
            server answers with an HTTP error status.
        Exception: Whatever PIL raises when the payload is not a decodable
            image.
    """
    # Bound the wait so a stalled server cannot hang the whole upload loop.
    response = requests.get(poster_url, timeout=30)
    # Fail on 4xx/5xx here instead of handing an error page to PIL.
    response.raise_for_status()
    image = Image.open(BytesIO(response.content)).convert('RGB')
    inputs = preprocess(image).unsqueeze(0)  # Add batch dimension
    with torch.no_grad():
        embedding = model(inputs).squeeze().numpy()
    return embedding

# Function to fetch movie metadata from TMDB API using IMDb ID
def get_movie_metadata(imdb_id):
    """Fetch title, overview and release date for an IMDb ID from TMDB.

    Returns a dict with the keys ``title``, ``overview`` and
    ``release_date`` built from the first movie result. Returns ``None``
    after printing a message when there are no movie results or when the
    lookup raises.
    """
    # 20 ms pause per call (50 requests per second) so the TMDB API is not
    # overloaded.
    time.sleep(0.02)
    metadata = None
    try:
        response = find.find(imdb_id, 'imdb_id')
        if response.movie_results:
            top = response.movie_results[0]  # first result wins
            metadata = dict(
                title=top.title,
                overview=top.overview,
                release_date=top.release_date,
            )
        else:
            print(f"No metadata found for IMDb ID {imdb_id}")
    except Exception as e:
        print(f"Error fetching metadata for IMDb ID {imdb_id}: {e}")
        return None
    return metadata

# Process the dataset and upload embeddings with metadata to Pinecone
def process_and_upload():
    """Embed each poster in ``ds['train']`` and upsert it into Pinecone.

    Per record: look up TMDB metadata by IMDb ID (records without metadata
    are skipped), attach the dataset poster URL, compute the embedding
    (failures are printed and skipped) and upsert a single vector keyed by
    the IMDb ID into ``NAMESPACE``.
    """
    for record in ds['train']:
        movie_id, poster = record['imdbId'], record['poster']

        info = get_movie_metadata(movie_id)
        if info is None:
            continue
        # Keep the dataset's poster URL alongside the TMDB fields.
        info['poster_url'] = poster

        try:
            vector = get_image_embedding(poster)
        except Exception as e:
            print(f"Failed to get embedding for {poster}: {e}")
            continue

        # One vector per request: (id, values as a list of floats, metadata).
        entry = (movie_id, vector.tolist(), info)
        index.upsert(vectors=[entry], namespace=NAMESPACE)

#process_and_upload()

VGG16

In [None]:
import torch
from torchvision import models, transforms
from PIL import Image
import requests
from io import BytesIO
import pinecone
from tmdbv3api import TMDb, Find
from google.colab import userdata
import time

# Configuration Variables
VECTOR_DIM = 512  # Dimension for Pinecone index, limited to 512 by CLIP, and only one index allowed on free tier Pinecone
NAMESPACE = "VGG16"

# Initialize Pinecone
pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters")

# Initialize TMDB API client
tmdb = TMDb()
tmdb.api_key = userdata.get("TMDB_API_KEY")

# Initialize the Find class
find = Find()

# Load the Pinecone movie poster dataset
ds = load_dataset("pinecone/movie-posters")

# Initialize VGG16 model and modify it to produce 512-dimensional embeddings
class VGG16WithEmbedding(torch.nn.Module):
    """VGG16 with a truncated classifier and a linear projection on top.

    The projection layer is never trained in this file. It is therefore
    initialised from a fixed seed, so that every instantiation (for example
    in a later session that queries the index) produces the same projection
    and hence comparable embeddings.

    Args:
        embedding_dim: Size of the output embedding.
        seed: Seed for the projection layer's initial weights.
    """

    def __init__(self, embedding_dim, seed=0):
        super().__init__()
        self.base_model = models.vgg16(pretrained=True)
        # Drop the last three modules of the classifier so the base model
        # ends on a 4096-wide layer (the input size of ``fc`` below).
        kept_layers = list(self.base_model.classifier.children())[:-3]
        self.base_model.classifier = torch.nn.Sequential(*kept_layers)
        # Add a new fully connected layer
        self.fc = torch.nn.Linear(in_features=4096, out_features=embedding_dim)

        # Re-draw the projection weights from a private generator so the
        # result is reproducible without touching the global RNG state.
        # U(-1/sqrt(in_features), 1/sqrt(in_features)) is the range the
        # nn.Linear documentation gives for its default initialisation.
        generator = torch.Generator().manual_seed(seed)
        bound = 4096 ** -0.5
        with torch.no_grad():
            self.fc.weight.uniform_(-bound, bound, generator=generator)
            self.fc.bias.uniform_(-bound, bound, generator=generator)

    def forward(self, x):
        """Return the projected embedding for a batch of preprocessed images."""
        x = self.base_model(x)
        x = self.fc(x)
        return x

model = VGG16WithEmbedding(VECTOR_DIM)
model.eval()

# Define image preprocessing
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Function to fetch an image from a URL and get its embedding
def get_image_embedding(poster_url):
    """Download a poster image and return its embedding from ``model``.

    Args:
        poster_url: Absolute URL of the poster image.

    Returns:
        The model output for the single image, squeezed and converted to a
        NumPy array.

    Raises:
        requests.RequestException: If the download times out, fails, or the
            server answers with an HTTP error status.
        Exception: Whatever PIL raises when the payload is not a decodable
            image.
    """
    # Bound the wait so a stalled server cannot hang the whole upload loop.
    response = requests.get(poster_url, timeout=30)
    # Fail on 4xx/5xx here instead of handing an error page to PIL.
    response.raise_for_status()
    image = Image.open(BytesIO(response.content)).convert('RGB')
    inputs = preprocess(image).unsqueeze(0)  # Add batch dimension
    with torch.no_grad():
        embedding = model(inputs).squeeze().numpy()
    return embedding

# Function to fetch movie metadata from TMDB API using IMDb ID
def get_movie_metadata(imdb_id):
    """Return TMDB metadata for an IMDb ID, or ``None`` if unavailable.

    The result holds ``title``, ``overview`` and ``release_date`` read from
    the first movie result of ``find.find(imdb_id, 'imdb_id')``. A message
    is printed and ``None`` returned when no movie result exists or when
    anything in the lookup raises.
    """
    # Sleep 20 ms per lookup (50 requests per second) so the TMDB API is
    # not overloaded.
    time.sleep(0.02)
    wanted_fields = ('title', 'overview', 'release_date')
    try:
        found = find.find(imdb_id, 'imdb_id')
        if not found.movie_results:
            print(f"No metadata found for IMDb ID {imdb_id}")
            return None
        best = found.movie_results[0]  # Take the first result
        return {field: getattr(best, field) for field in wanted_fields}
    except Exception as e:
        print(f"Error fetching metadata for IMDb ID {imdb_id}: {e}")
        return None

# Process the dataset and upload embeddings with metadata to Pinecone
def process_and_upload():
    """Upload one embedded poster per dataset entry to Pinecone.

    Walks ``ds['train']``; for each entry it fetches TMDB metadata by IMDb
    ID, stores the dataset poster URL in that metadata, embeds the poster
    and upserts the vector under the IMDb ID in ``NAMESPACE``. Entries
    without metadata, or whose embedding raises, are skipped.
    """
    for entry in ds['train']:
        imdb_id = entry['imdbId']
        poster_url = entry['poster']

        movie_meta = get_movie_metadata(imdb_id)
        if movie_meta is None:
            # Nothing known about this movie on TMDB; move on.
            continue
        movie_meta['poster_url'] = poster_url

        try:
            values = get_image_embedding(poster_url).tolist()
        except Exception as e:
            print(f"Failed to get embedding for {poster_url}: {e}")
            continue

        # Upsert a single (id, values, metadata) tuple into the namespace.
        index.upsert(
            vectors=[(imdb_id, values, movie_meta)],
            namespace=NAMESPACE
        )

#process_and_upload()


# Mapping from imdb id to movie name
##(for the user to be able to query on only movie names which exist in the dataset)

In [None]:
# pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
# index = pc.Index("movie-posters")
# namespaces =['ResNet-50', 'VGG16', 'CLIP']

# mapping = {}
# for ns in namespaces:
#   for ids in index.list(namespace=ns):
#     #print(ids)
#     res = index.fetch(ids=ids, namespace=ns)

#     vectors = res['vectors']
#     for k in vectors:
#       title = vectors[k]['metadata']['title']
#       if title not in mapping:
#         mapping[title] = k
# print(mapping)

In [None]:
# print(len(mapping))

Testing embedding results:

In [None]:
import random

# Create a dictionary for quick lookup of poster URLs by IMDb ID
id_to_poster = {item['imdbId']: item['poster'] for item in ds['train']}

# Choose a random movie from the dataset and take its IMDb ID
random_movie = random.choice(ds['train'])
specific_imdb_id = random_movie['imdbId']

# Look up the movie in the dataset using the specific IMDb ID
if specific_imdb_id in id_to_poster:
    poster_url = id_to_poster[specific_imdb_id]

    # Get the embedding of the chosen movie.
    # NOTE(review): the query below targets the "CLIP" namespace, so the
    # get_image_embedding in scope should be the CLIP version. Several cells
    # in this notebook redefine it, so confirm which one ran last.
    embedding = get_image_embedding(poster_url)

    # Query Pinecone to get the top 10 closest movie posters
    result = index.query(vector=embedding.tolist(), top_k=10, include_metadata=True, namespace="CLIP")

    # Print the specified movie and the top 10 similar movies with metadata
    print(f"Specified Movie IMDb ID: {specific_imdb_id}")
    print(f"Poster URL: {poster_url}")
    print("Top 10 Similar Movies:")
    for match in result['matches']:
        similar_imdb_id = match['id']
        # Access the metadata stored with the vector
        metadata = match.get('metadata', {})
        title = metadata.get('title', "Title not found")
        overview = metadata.get('overview', "Overview not found")
        release_date = metadata.get('release_date', "Release date not found")
        # Separate name so the query movie's poster_url is not overwritten.
        match_poster_url = metadata.get('poster_url', "Poster URL not found")

        print(f"IMDb ID: {similar_imdb_id}, Score: {match['score']}")
        print(f"Title: {title}")
        print(f"Overview: {overview}")
        print(f"Release Date: {release_date}")
        print(f"Poster URL: {match_poster_url}")
else:
    print(f"IMDb ID {specific_imdb_id} not found in the dataset.")


In [None]:
# Specify the IMDb ID of the movie you want to use
specific_imdb_id = 'tt3638686'

# Lookup of poster URLs by IMDb ID
id_to_poster = {item['imdbId']: item['poster'] for item in ds['train']}

# Look up the movie in the dataset using the specific IMDb ID
if specific_imdb_id in id_to_poster:
    poster_url = id_to_poster[specific_imdb_id]

    # Get the embedding of the specified movie.
    # NOTE(review): the query below targets the "CLIP" namespace, so the
    # get_image_embedding in scope should be the CLIP version. Several cells
    # in this notebook redefine it, so confirm which one ran last.
    embedding = get_image_embedding(poster_url)

    # Query Pinecone to get the top 10 closest movie posters
    result = index.query(vector=embedding.tolist(), top_k=10, include_metadata=True, namespace="CLIP")

    # Print the specified movie and the top 10 similar movies with metadata
    print(f"Specified Movie IMDb ID: {specific_imdb_id}")
    print(f"Poster URL: {poster_url}")
    print("Top 10 Similar Movies:")
    for match in result['matches']:
        similar_imdb_id = match['id']
        # Access the metadata stored with the vector
        metadata = match.get('metadata', {})
        title = metadata.get('title', "Title not found")
        overview = metadata.get('overview', "Overview not found")
        release_date = metadata.get('release_date', "Release date not found")
        # Separate name so the query movie's poster_url is not overwritten.
        match_poster_url = metadata.get('poster_url', "Poster URL not found")

        print(f"IMDb ID: {similar_imdb_id}, Score: {match['score']}")
        print(f"Title: {title}")
        print(f"Overview: {overview}")
        print(f"Release Date: {release_date}")
        print(f"Poster URL: {match_poster_url}")
else:
    print(f"IMDb ID {specific_imdb_id} not found in the dataset.")


Test TMDB API

In [None]:
import requests

tmdb_api_key = userdata.get('TMDB_API_KEY')

# Function to get movie details from TMDB using IMDb ID
def get_movie_details(imdb_id):
    """Fetch the first TMDB movie result for an IMDb ID via the find endpoint.

    Args:
        imdb_id: IMDb identifier, e.g. ``'tt2514894'``.

    Returns:
        The first entry of ``movie_results`` from the response, or ``None``
        (after printing a message) when the request does not return status
        200, the body is not the expected JSON object, or no movie matches.

    Raises:
        requests.RequestException: If the request times out or the
            connection fails.
    """
    url = f"https://api.themoviedb.org/3/find/{imdb_id}"
    params = {
        'api_key': tmdb_api_key,
        'external_source': 'imdb_id'
    }
    # Bound the wait so a stalled connection cannot hang the notebook.
    response = requests.get(url, params=params, timeout=30)

    # Check the status before parsing: an error response is not guaranteed
    # to carry a JSON body.
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return None

    try:
        data = response.json()
    except ValueError:
        # requests signals an undecodable body with a ValueError subclass.
        print(f"Error: {response.status_code}")
        return None

    if not isinstance(data, dict) or 'movie_results' not in data:
        print(f"Error: {response.status_code}")
        return None

    if len(data['movie_results']) > 0:
        return data['movie_results'][0]  # Return the first movie result

    print("No movie found for this IMDb ID.")
    return None

# Example usage: look up one IMDb ID and print the fields of interest.
imdb_id = 'tt2514894'  # Example IMDb ID
movie_details = get_movie_details(imdb_id)

# Raw result (a dict, or None when the lookup failed or found nothing)
print(movie_details)

if movie_details:
    print("Title:", movie_details.get('title'))
    print("Overview:", movie_details.get('overview'))
    print("Release Date:", movie_details.get('release_date'))
    # The poster value comes from the id_to_poster lookup built in an earlier
    # cell from the dataset, not from the TMDB response.
    print("Poster Path:", id_to_poster.get(imdb_id, "URL not found"))


# Version 2 - Populate separate indices using "popular" TMDB endpoint

In [None]:
# Pinecone and create an index
from pinecone import Pinecone
from pinecone import ServerlessSpec
from google.colab import userdata

pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))

V2_CLIP_INDEX_NAME = "movie-posters-v2-clip"

# Only create the index when it does not exist yet, so this cell can be
# re-run without failing on an already-existing index.
if V2_CLIP_INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=V2_CLIP_INDEX_NAME,
        dimension=768, # dimensions from CLIP
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

# pc.create_index(
#     name="movie-posters-v2-resnet-50",
#     dimension=2048,
#     metric="euclidean",
#     spec=ServerlessSpec(
#         cloud="aws",
#         region="us-east-1"
#     )
# )

# pc.create_index(
#     name="movie-posters-v2-vgg16",
#     dimension=4096,
#     metric="euclidean",
#     spec=ServerlessSpec(
#         cloud="aws",
#         region="us-east-1"
#     )
# )

# Access the existing index
#index = pc.Index("movie-posters")

Getting the top 10,000 popular movies (TMDB serves at most 500 pages, with 20 results per page)

In [None]:
import requests
from google.colab import userdata
import json
import pprint
import time

def get_popular_page(page, TMDB_AUTH):
  '''
  Fetch one page of TMDB's "popular movies" list.

  page: page number to request.
  TMDB_AUTH: value sent as the Authorization header.

  Returns the "results" entry of the decoded JSON response (20 movie
  results per page), or None when the HTTP request itself raises.
  A response without a "results" key raises KeyError, and a body that is
  not valid JSON raises from json.loads; the fetch loop in this notebook
  catches those exceptions and stops paging.
  '''
  # 20 ms pause per call (50 requests per second) so the TMDB API is not overloaded
  time.sleep(0.02)
  endpoint = f"https://api.themoviedb.org/3/movie/popular?language=en-US&page={page}"
  request_headers = {"accept": "application/json", "Authorization": TMDB_AUTH}
  try:
    response = requests.get(endpoint, headers=request_headers)
  except Exception as e:
    print(f"Problem getting response for page {page} {e}")
    return None
  payload = json.loads(response.text)
  return payload['results']

In [None]:
# Authorization header value for TMDB, read from the Colab secrets store.
TMDB_AUTH = userdata.get("TMDB_AUTHORIZATION")

# Accumulates the movie dicts from every successfully fetched page.
movies = []
page_max = 2501 # exclusive upper bound on the page number
for page in range(1, page_max):
  results = None
  try:
    results = get_popular_page(page, TMDB_AUTH)
  except Exception as e:
    # Stop at the first page that raises and remember it in page_max, so the
    # output filename below records where fetching ended.
    print(f"Broke at {page} with error {e}")
    page_max = page
    break
  # get_popular_page returns None when the HTTP request itself failed; such
  # pages are skipped without stopping the loop.
  if results:
    movies.extend(results)

# NOTE(review): this path is under /content/drive, so it presumably needs
# Google Drive to be mounted first. The mount is not shown in this notebook.
json_path = f'/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_{page_max}.json'

# Persist the collected movies so later cells can reload them without
# calling the API again.
with open(json_path, 'w') as f:
    json.dump(movies, f)


# # Load the movies list from the JSON file
# with open(json_path, 'r') as f:
#     movies = json.load(f)

# pp = pprint.PrettyPrinter(indent=4)
# #print(len(movies))
# #pp.pprint(movies)
# print(len(movies))
# print(json.dumps(movies))

# https://image.tmdb.org/t/p/original/[poster_path]

# https://image.tmdb.org/t/p/original
# https://image.tmdb.org/t/p/original/vKVUsumbCzK5Kn3aDpKM4EizKCA.jpg
# https://image.tmdb.org/t/p/original/7WsyChQLEftFiDOVTGkv3hFpyyt.jpg

In [None]:
json_path = f'/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_501.json'

# Reload the saved movie list and print its first six entries as a quick
# sanity check.
with open(json_path, 'r') as f:
    movies = json.load(f)

for i, m in enumerate(movies[:6]):
  print(m)


CLIP Processing on each popular movie

# Version 2 - 3 models (CLIP, VGG16, ResNet-50)

In [None]:
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import requests
import json
from io import BytesIO
import torch
import pinecone
from pinecone import Pinecone
from google.colab import userdata
import time

# Load the model and processor
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
NAMESPACE = "clip-vit-large-patch14"

pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters-v2-clip")

json_path = f'/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_501.json'

# Load the movies list from the JSON file
with open(json_path, 'r') as f:
    movies = json.load(f)

def get_image_embedding(poster_url):
    """Download a TMDB poster and return its CLIP image embedding.

    Args:
        poster_url: TMDB ``poster_path`` (for example ``"/abc.jpg"``); it is
            appended to the TMDB "original" image base URL.

    Returns:
        The squeezed ``model.get_image_features`` output as a NumPy array,
        or ``None`` (after printing a message) when the path is missing or
        anything in the download/embedding pipeline fails.
    """
    # Movies without a poster path have nothing to embed; skip the request.
    if not poster_url:
        print(f"Failed to get embedding for {poster_url}: no poster path")
        return None

    try:
        poster_url = "https://image.tmdb.org/t/p/original" + poster_url
        # Bound the wait so one stalled download cannot hang the whole run.
        response = requests.get(poster_url, timeout=30)
        # Report HTTP errors as such instead of as an image decoding failure.
        response.raise_for_status()
        image = Image.open(BytesIO(response.content)).convert('RGB')

        # Preprocess the image
        inputs = processor(images=image, return_tensors="pt")

        # Generate image embedding
        with torch.no_grad():
            outputs = model.get_image_features(**inputs)
            embedding = outputs.squeeze().numpy()

        return embedding
    except Exception as e:
        print(f"Failed to get embedding for {poster_url}: {e}")
        return None

def prepare_metadata(movie):
    """Build the flat metadata dict stored alongside a movie's vector.

    Every field falls back to a default when the key is missing *or* its
    value is ``None``, so the result never contains ``None``. ``genre_ids``
    is flattened to a comma-separated string and ``id`` is stringified.

    Args:
        movie: One movie dict from the TMDB "popular" results.

    Returns:
        A dict with the keys adult, backdrop_path, genre_ids, id,
        original_language, original_title, overview, popularity,
        poster_path, release_date, title, video, vote_average, vote_count.
    """
    def field(key, default):
        # dict.get only covers missing keys; an explicit null must also
        # fall back to the default.
        value = movie.get(key)
        return default if value is None else value

    return {
        "adult": field("adult", False),
        "backdrop_path": field("backdrop_path", "empty"),
        "genre_ids": ','.join(map(str, field("genre_ids", []))),
        "id": str(field("id", "")),
        "original_language": field("original_language", ""),
        "original_title": field("original_title", ""),
        "overview": field("overview", ""),
        "popularity": field("popularity", 0.0),
        "poster_path": field("poster_path", ""),
        "release_date": field("release_date", ""),
        "title": field("title", ""),
        "video": field("video", False),
        "vote_average": field("vote_average", 0.0),
        "vote_count": field("vote_count", 0)
    }

def process_and_upload(limit=10001):
    """Embed each movie's poster and upsert it into Pinecone.

    Args:
        limit: Maximum number of entries of ``movies`` to process, counted
            from the start of the list. Slicing is used, so a shorter list
            is simply processed to its end.

    Movies whose embedding cannot be computed are skipped; upsert failures
    are printed and the loop continues with the next movie.
    """
    for movie in movies[:limit]:
        # The vector ID is the ``id`` field of the movie dict, as a string.
        movie_id = str(movie.get('id', ''))
        poster_url = movie.get('poster_path', '')
        print(f"Processing {movie_id} - {movie.get('title')}")

        # Generate image embedding
        try:
            embedding = get_image_embedding(poster_url)
        except Exception as e:
            print(f"Failed to get embedding for {poster_url}: {e}")
            continue
        # get_image_embedding reports its own failures and returns None;
        # skip instead of mislabelling the problem as an upload error.
        if embedding is None:
            continue

        metadata = prepare_metadata(movie)

        try:
            # Upload to Pinecone with the specified namespace and metadata as a dictionary
            index.upsert(
                vectors=[(movie_id, embedding.tolist(), metadata)],
                namespace=NAMESPACE
            )
        except Exception as e:
            print(f"Failed to upload to Pinecone: {e}")
process_and_upload()

In [None]:
print("done")

In [None]:
!pip install timm torch torchvision pinecone-client requests

In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.models import vgg16
from PIL import Image
from urllib.request import urlopen
import numpy as np
import pinecone
from pinecone import Pinecone
import json
from google.colab import userdata

# Load the VGG16 model with its classifier
model = vgg16(pretrained=True)
model.eval()

# Remove the final classification layer
model.classifier = torch.nn.Sequential(*list(model.classifier.children())[:-1])

# Define the transform for input images
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

NAMESPACE = "vgg16-tv-in1k"
pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters-v2-vgg16")

json_path = '/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_501.json'

# Load the movies list from the JSON file
with open(json_path, 'r') as f:
    movies = json.load(f)

def get_image_embedding(poster_url):
    """Download a TMDB poster and return its VGG16 feature vector.

    Args:
        poster_url: TMDB ``poster_path`` (for example ``"/abc.jpg"``); it is
            appended to the TMDB "original" image base URL.

    Returns:
        The model output as a list of 4096 floats, or ``None`` (after
        printing a message) when the download, decoding or forward pass
        fails or the output does not have 4096 entries.
    """
    try:
        poster_url = "https://image.tmdb.org/t/p/original" + poster_url
        # Close the HTTP response deterministically and bound the wait.
        # convert() forces the image data to be read while it is still open.
        with urlopen(poster_url, timeout=30) as response:
            img = Image.open(response).convert('RGB')  # Ensure image is in RGB format
        input_tensor = transform(img).unsqueeze(0)  # Add batch dimension

        with torch.no_grad():
            # Forward pass through the model
            features = model(input_tensor)
            embedding = features.squeeze().numpy()  # Remove batch dimension and convert to NumPy array

        if embedding.shape[0] != 4096:
            raise ValueError(f"Embedding dimension mismatch: Expected 4096, but got {embedding.shape[0]}")

        # Return as a list of floats
        return embedding.tolist()
    except Exception as e:
        print(f"Failed to get embedding for {poster_url}: {e}")
        return None

def prepare_metadata(movie):
    """Build the flat metadata dict stored alongside a movie's vector.

    Every field falls back to a default when the key is missing *or* its
    value is ``None``, so the result never contains ``None``. ``genre_ids``
    is flattened to a comma-separated string and ``id`` is stringified.

    Args:
        movie: One movie dict from the TMDB "popular" results.

    Returns:
        A dict with the keys adult, backdrop_path, genre_ids, id,
        original_language, original_title, overview, popularity,
        poster_path, release_date, title, video, vote_average, vote_count.
    """
    def field(key, default):
        # dict.get only covers missing keys; an explicit null must also
        # fall back to the default.
        value = movie.get(key)
        return default if value is None else value

    return {
        "adult": field("adult", False),
        "backdrop_path": field("backdrop_path", "empty"),
        "genre_ids": ','.join(map(str, field("genre_ids", []))),
        "id": str(field("id", "")),
        "original_language": field("original_language", ""),
        "original_title": field("original_title", ""),
        "overview": field("overview", ""),
        "popularity": field("popularity", 0.0),
        "poster_path": field("poster_path", ""),
        "release_date": field("release_date", ""),
        "title": field("title", ""),
        "video": field("video", False),
        "vote_average": field("vote_average", 0.0),
        "vote_count": field("vote_count", 0)
    }

def process_and_upload(limit=10001):
    """Embed each movie's poster and upsert it into Pinecone.

    Args:
        limit: Maximum number of entries of ``movies`` to process, counted
            from the start of the list. Slicing is used, so a shorter list
            is simply processed to its end.

    Movies whose embedding is ``None`` are skipped; upsert failures and any
    other per-movie error are printed and the loop continues.
    """
    for i, movie in enumerate(movies[:limit]):
        # The vector ID is the ``id`` field of the movie dict, as a string.
        movie_id = str(movie.get('id', ''))
        poster_url = movie.get('poster_path', '')
        print(f"Processing {i} - {movie_id} - {movie.get('title')}")

        try:
            embedding = get_image_embedding(poster_url)
            if embedding is None:
                continue

            metadata = prepare_metadata(movie)

            try:
                # Upload to Pinecone with the specified namespace and metadata as a dictionary
                index.upsert(
                    vectors=[(movie_id, embedding, metadata)],  # embedding is already a list of floats
                    namespace=NAMESPACE
                )
            except Exception as e:
                print(f"Failed to upload to Pinecone: {e}")

        except Exception as e:
            print(f"Failed to get embedding for {poster_url}: {e}")

process_and_upload()

# reprocessed first movie

resnet50

In [None]:
from urllib.request import urlopen
import torch
from transformers import AutoModel, AutoImageProcessor
from PIL import Image
import json
import pinecone
from google.colab import userdata
import numpy as np

# Load the ResNet-50 model and feature extractor
model_name = "microsoft/resnet-50"
model = AutoModel.from_pretrained(model_name)
image_processor = AutoImageProcessor.from_pretrained(model_name)

# Set model to evaluation mode
model.eval()

NAMESPACE = "resnet50"
pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
index = pc.Index("movie-posters-v2-resnet-50")

json_path = '/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_501.json'

# Load the movies list from the JSON file
with open(json_path, 'r') as f:
    movies = json.load(f)

def get_image_embedding(poster_url):
    """Download a TMDB poster and return its ResNet-50 pooled features.

    Args:
        poster_url: TMDB ``poster_path`` (for example ``"/abc.jpg"``); it is
            appended to the TMDB "original" image base URL.

    Returns:
        The model's ``pooler_output`` flattened to a list of floats, or
        ``None`` (after printing a message) when the download, decoding or
        forward pass fails.
    """
    try:
        poster_url = "https://image.tmdb.org/t/p/original" + poster_url
        # Close the HTTP response deterministically and bound the wait.
        # convert() forces the image data to be read while it is still open
        # and gives RGB input, as in the other embedding functions here.
        with urlopen(poster_url, timeout=30) as response:
            img = Image.open(response).convert('RGB')

        # Preprocess the image
        inputs = image_processor(images=img, return_tensors="pt")

        # Generate embeddings
        with torch.no_grad():
            outputs = model(**inputs)
            pooled_output = outputs.pooler_output  # Get the pooled output
            embedding = pooled_output.numpy()

        # .numpy() always yields an ndarray; flatten it to a list of floats.
        return embedding.flatten().tolist()
    except Exception as e:
        print(f"Failed to get embedding for {poster_url}: {e}")
        return None

def prepare_metadata(movie):
    """Build the flat metadata dict stored alongside a movie's vector.

    Every field falls back to a default when the key is missing *or* its
    value is ``None``, so the result never contains ``None``. ``genre_ids``
    is flattened to a comma-separated string and ``id`` is stringified.

    Args:
        movie: One movie dict from the TMDB "popular" results.

    Returns:
        A dict with the keys adult, backdrop_path, genre_ids, id,
        original_language, original_title, overview, popularity,
        poster_path, release_date, title, video, vote_average, vote_count.
    """
    def field(key, default):
        # dict.get only covers missing keys; an explicit null must also
        # fall back to the default.
        value = movie.get(key)
        return default if value is None else value

    return {
        "adult": field("adult", False),
        "backdrop_path": field("backdrop_path", "empty"),
        "genre_ids": ','.join(map(str, field("genre_ids", []))),
        "id": str(field("id", "")),
        "original_language": field("original_language", ""),
        "original_title": field("original_title", ""),
        "overview": field("overview", ""),
        "popularity": field("popularity", 0.0),
        "poster_path": field("poster_path", ""),
        "release_date": field("release_date", ""),
        "title": field("title", ""),
        "video": field("video", False),
        "vote_average": field("vote_average", 0.0),
        "vote_count": field("vote_count", 0)
    }

def process_and_upload(limit=10001):
    """Embed each movie's poster and upsert it into Pinecone.

    Args:
        limit: Maximum number of entries of ``movies`` to process, counted
            from the start of the list. Slicing is used, so a shorter list
            is simply processed to its end.

    Movies whose embedding cannot be computed are skipped; upsert failures
    are printed and the loop continues with the next movie.
    """
    for movie in movies[:limit]:
        # The vector ID is the ``id`` field of the movie dict, as a string.
        movie_id = str(movie.get('id', ''))
        poster_url = movie.get('poster_path', '')
        print(f"Processing {movie_id} - {movie.get('title')}")

        try:
            embedding = get_image_embedding(poster_url)
            if embedding is None:
                continue

        except Exception as e:
            print(f"Failed to get embedding for {poster_url}: {e}")
            continue

        metadata = prepare_metadata(movie)

        try:
            # embedding is already a list of floats
            index.upsert(
                vectors=[(movie_id, embedding, metadata)],
                namespace=NAMESPACE
            )
        except Exception as e:
            print(f"Failed to upload to Pinecone: {e}")

process_and_upload()

# reprocessed first movie

The cell below tried processing posters in batches instead of one at a time. Batching did not significantly change the processing time for CLIP.

In [None]:
# from transformers import CLIPProcessor, CLIPModel
# from PIL import Image
# import requests
# import json
# from io import BytesIO
# import torch
# import pinecone
# from pinecone import Pinecone
# from google.colab import userdata
# import time

# # Load the model and processor
# model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
# processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
# NAMESPACE = "clip-vit-large-patch14"

# pc = Pinecone(api_key=userdata.get('PINECONE_KEY'))
# index = pc.Index("movie-posters-v2-clip")

# json_path = f'/content/drive/My Drive/Colab_Notebooks/popular_tmdb_endpoint_until_page_501.json'

# # Load the movies list from the JSON file
# with open(json_path, 'r') as f:
#     movies = json.load(f)

# def get_image_embedding(poster_url):
#     # Fetch the image from the URL
#     try:
#       poster_url = "https://image.tmdb.org/t/p/original" + poster_url
#       response = requests.get(poster_url)
#       image = Image.open(BytesIO(response.content)).convert('RGB')

#       # Preprocess the image
#       inputs = processor(images=image, return_tensors="pt")

#       # Generate image embedding
#       with torch.no_grad():
#           outputs = model.get_image_features(**inputs)
#           embedding = outputs.squeeze().numpy()

#       return embedding
#     except Exception as e:
#       print(f"Failed to get embedding for {poster_url}: {e}")
#       return None

# def get_image_embedding_batch(poster_urls):
#     try:
#         images = []
#         for poster_url in poster_urls:
#             poster_url = "https://image.tmdb.org/t/p/original" + poster_url
#             response = requests.get(poster_url)
#             image = Image.open(BytesIO(response.content)).convert('RGB')
#             images.append(image)

#         # Preprocess the images as a batch
#         inputs = processor(images=images, return_tensors="pt")

#         # Generate image embeddings for the batch
#         with torch.no_grad():
#             outputs = model.get_image_features(**inputs)
#             embeddings = outputs.squeeze().numpy()

#         return embeddings
#     except Exception as e:
#         print(f"Failed to get embedding for batch: {e}")
#         return None

# def prepare_metadata(movie):
#     return {
#         "adult": movie.get("adult", False),
#         "backdrop_path": movie.get("backdrop_path", "empty") if movie.get("backdrop_path") is not None else "empty",
#         "genre_ids": ','.join(map(str, movie.get("genre_ids", []))),
#         "id": str(movie.get("id", "")),
#         "original_language": movie.get("original_language", ""),
#         "original_title": movie.get("original_title", ""),
#         "overview": movie.get("overview", ""),
#         "popularity": movie.get("popularity", 0.0),
#         "poster_path": movie.get("poster_path", ""),
#         "release_date": movie.get("release_date", ""),
#         "title": movie.get("title", ""),
#         "video": movie.get("video", False),
#         "vote_average": movie.get("vote_average", 0.0),
#         "vote_count": movie.get("vote_count", 0)
#     }

# def process_and_upload():
#   batch_size = 16
#   for i in range(3904, len(movies), batch_size):
#       batch_movies = movies[i:i+batch_size]
#       poster_urls = [movie.get('poster_path', '') for movie in batch_movies]
#       embeddings = get_image_embedding_batch(poster_urls)

#       if embeddings is not None:
#           for j, movie in enumerate(batch_movies):
#               imdb_id = str(movie.get('id', ''))
#               print(f"Processing {imdb_id} - {movie.get('title')}")
#               metadata = prepare_metadata(movie)
#               index.upsert(
#                   vectors=[(imdb_id, embeddings[j].tolist(), metadata)],
#                   namespace=NAMESPACE
#               )
# process_and_upload()