Dataset Loading

In [5]:
from datasets import load_dataset

# Load the Parquet file from Hugging Face
dataset = load_dataset(
    "parquet",
    data_files={
        "train": "https://huggingface.co/datasets/SathvikVeerapaneni7/CineAI_Dataset/resolve/main/parquet_files/details_df_clean.parquet"
    },
    split="train"
)

# Convert to a Pandas DataFrame
df_details = dataset.to_pandas()
df_details.head()


Unnamed: 0,movie_id,title,overview,release_date,runtime,original_language,popularity,genres_str
0,98.0,Gladiator,"In the year 180, the death of Emperor Marcus A...",2000-05-04,155.0,en,497.649,['Action' 'Drama' 'Adventure']
1,8871.0,How the Grinch Stole Christmas,The Grinch decides to rob Whoville of Christma...,2000-11-17,104.0,en,177.539,['Family' 'Comedy' 'Fantasy']
2,7443.0,Chicken Run,The creators of Wallace & Gromit bring you an ...,2000-06-23,84.0,en,62.638,['Animation' 'Comedy' 'Family']
3,9532.0,Final Destination,After a teenager has a terrifying vision of hi...,2000-03-17,98.0,en,54.097,['Horror']
4,77.0,Memento,Leonard Shelby is tracking down the man who ra...,2000-10-11,113.0,en,95.374,['Mystery' 'Thriller']


In [6]:
#combining title,overview,genres_str into a single text

first_title = df_details.loc[0, "title"]
first_overview = df_details.loc[0, "overview"]
first_genres = df_details.loc[0, "genres_str"]

combined_text = f"{first_title}. {first_overview}. Genres: {first_genres}"
print(combined_text)

Gladiator. In the year 180, the death of Emperor Marcus Aurelius throws the Roman Empire into chaos. Maximus is one of the Roman army's most capable and trusted generals, as well as a key advisor to the emperor. As Marcus' devious son Commodus ascends to the throne, Maximus is sentenced to execution. He escapes but is captured by slave traders. Renamed "Spaniard" and forced to become a gladiator, Maximus must battle to the death against other men for the amusement of paying audiences.. Genres: ['Action' 'Drama' 'Adventure']


In [7]:
df_details.columns

Index(['movie_id', 'title', 'overview', 'release_date', 'runtime',
       'original_language', 'popularity', 'genres_str'],
      dtype='object')

New data frame 'df_combined' after filtering

In [35]:
import re
import pandas as pd

def clean_genres(raw_genres: str) -> str:
    """
    Removes brackets, quotes, and splits on whitespace to produce
    a comma-separated string of genres.
    Example: "['Animation' 'Comedy' 'Family']" -> "Animation, Comedy, Family"
    """
    # 1) Strip leading/trailing square brackets [ ], and remove ' or "
    cleaned = raw_genres.strip("[]").replace("'", "").replace('"', "")
    # 2) Split on any whitespace -> list of genres
    genres_list = re.split(r"\s+", cleaned.strip())
    # 3) Join them with ", " -> "Animation, Comedy, Family"
    genres_str = ", ".join(genres_list)
    return genres_str

def build_combined_text(row):
    """
    Builds a descriptive string using title, overview, and cleaned genres.
    Example outcome:
    "Movie name is <title>. Its description is <overview>. Movie genres are <genre_1, genre_2, ...>"
    """
    title = row["title"]
    overview = row["overview"]
    genres_str = clean_genres(row["genres_str"])
    
    return (
        f"Movie name is {title}. "
        f"Its description is {overview}. "
        f"Movie genres are {genres_str}"
    )

# Create a new DataFrame (df_combined) with 'movie_id' and 'combined_text'
df_combined = pd.DataFrame()
df_combined["movie_id"] = df_details["movie_id"]
df_combined["release_date"] = df_details["release_date"]
df_combined["runtime"] = df_details["runtime"]
df_combined["combined_text"] = df_details.apply(build_combined_text, axis=1)


In [37]:
print(df_combined.head(3))

   movie_id release_date  runtime                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          combined_text
0      98.0   2000-05-04    155.0  Movie name is Gladiator. Its description is In the year 180, the death of Emperor Marcus Aurelius throws the Roman Empire into chaos. Maximus is one of the Roman army's most capable and trusted generals, as well as a key advisor to the emperor. As Marcus' devious son Commodus ascends to the throne, Maximus is sentenced to execution. He escapes but is captured by

In [40]:
df_combined.shape

(230586, 4)

In [None]:
# pd.to_datetime(df_combined['release_date'], errors='raise')

In [84]:
#Increase the output cell sizes

import pandas as pd


pd.set_option('display.max_columns', 40)
pd.set_option('display.max_rows', 40)
pd.set_option('display.max_colwidth', 40)
pd.set_option('display.width', 40)

In [45]:
# df_combined[df_combined['movie_id']==7290.0]

movie_collection_test_sample_1  - 1st sample of the movie collection dataset with payload

Qdrant Collection

In [None]:
#Qdrant Testing

from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, Distance

# Connect to Qdrant
qdrant_client = QdrantClient(url="http://localhost:6333")

# Create or reset a test collection
collection_name = "movie_collection_test_sample_1"


qdrant_client.recreate_collection(
    collection_name=collection_name, 
    vectors_config=VectorParams(size=384, distance="Cosine"
    ))

print(f"Collection '{collection_name}' ready for 384-dim embeddings.")


Collection 'movie_collection_test_sample_1' ready for 384-dim embeddings.


  qdrant_client.recreate_collection(collection_name=collection_name, vectors_config=VectorParams(size=384, distance="Cosine"))


Ray Franework

In [14]:
import ray
import torch
from sentence_transformers import SentenceTransformer

In [15]:
# Shut down any existing Ray session
ray.shutdown()

# Start Ray (optionally with a dashboard)
ray.init(include_dashboard=True)

2024-12-27 11:56:18,786	INFO worker.py:1812 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.4
Ray version:,2.40.0
Dashboard:,http://127.0.0.1:8265


In [16]:
device = torch.device("mps")

In [19]:
@ray.remote
class EmbeddingActor:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        # Load the SentenceTransformer on MPS
        self.model = SentenceTransformer(model_name, device="mps")

    def embed_texts(self, texts):
        # Return Python list of lists for Qdrant
        embeddings = self.model.encode(
            texts, 
            convert_to_numpy=True,  # -> NumPy array
            show_progress_bar=False
        )
        # Convert to list of floats per row
        return [emb.tolist() for emb in embeddings]

In [94]:
df_combined[df_combined['movie_id']==10637.0]

Unnamed: 0,movie_id,release_date,runtime,combined_text
20,10637.0,2000-09-29,113.0,Movie name is Remember the Titans. I...


In [93]:
row

movie_id                                         10637.0
release_date                         2000-09-29 00:00:00
runtime                                            113.0
combined_text    Movie name is Remember the Titans. I...
Name: 20, dtype: object

In [97]:
df_test_2.iloc[20]['release_date']
print(df_test_2.iloc[20]['release_date'].strftime('%Y-%m-%d'))

2000-09-29


In [100]:
row = df_test_2.iloc[10]
text_list = [row['combined_text']]

emb_ref=embed_actor.embed_texts.remote(text_list)
final_emb_test=ray.get(emb_ref)

Single Vector

In [65]:
# final_emb_test[0]

Single User with payload to qdrant with other metadata


such as, runtime of movie

In [101]:
# # testing with on upsert
# import uuid
# from qdrant_client.http.models import PointStruct

# doc_id=str(uuid.uuid4())
# # everytime it creates new string

# print(doc_id)

# points=[]

# points.append(PointStruct(
#             id=doc_id,                 # Must be a valid int or a UUID string
#             vector=final_emb_test[0],
#             payload={
#                 "movie_id": row["movie_id"],
#                 "runtime": row["runtime"],
#                 "release_date": row["release_date"].strftime('%Y-%m-%d'),
#                 "combined_text": row["combined_text"]
#             }
#         ))


b1a51e7b-0ed1-482c-9a08-8c32fa97c00d


In [102]:
# 4) Upsert into Qdrant
qdrant_client.upsert(collection_name="movie_collection_test_2_with_metadata_payloads", points=points)

UpdateResult(operation_id=5, status=<UpdateStatus.COMPLETED: 'completed'>)

In [115]:
# Rows upsert to Qdrant

import uuid
from qdrant_client.http.models import PointStruct

def upsert_batch_in_chunks(df_subset, embed_actor, qdrant_client, collection_name, batch_size=100):
    total_rows = len(df_subset)
    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch_df = df_subset.iloc[start_idx:end_idx]

        text_list = batch_df["combined_text"].tolist()
        embedding_ref = embed_actor.embed_texts.remote(text_list)
        embeddings = ray.get(embedding_ref)

        points = []
        for i, row in batch_df.iterrows():
            doc_id = str(uuid.uuid4())

            # Handle NaT (missing dates)
            if pd.notnull(row["release_date"]):
                release_date_str = row["release_date"].strftime('%Y-%m-%d')
            else:
                release_date_str = None

            points.append(PointStruct(
                id=doc_id,
                vector=embeddings[i - start_idx],
                payload={
                    "movie_id": row["movie_id"],
                    "runtime": row["runtime"],
                    "release_date": release_date_str,
                    "combined_text": row["combined_text"]
                }
            ))

        qdrant_client.upsert(collection_name=collection_name, points=points)
        print(f"Upserted rows {start_idx + 1} to {end_idx} of {total_rows}.")

In [116]:
# df_test_2 = df_combined.head(100).copy()

In [117]:
# df_test_2.head(5)

In [None]:
# Create the Ray actor
embed_actor = EmbeddingActor.remote()


# Upsert the test batch
upsert_batch_in_chunks(
    df_subset=df_combined,
    embed_actor=embed_actor,
    qdrant_client=qdrant_client,
    collection_name=collection_name,
    batch_size=100
)

Testing

In [121]:
# Query Test
# Step 1: If using Ray actor, embed the query
test_query = "Movie released in the year 2001"
query_embedding_ref = embed_actor.embed_texts.remote([test_query])
query_embedding = ray.get(query_embedding_ref)[0]

# Step 2: Qdrant search
results = qdrant_client.search(
    collection_name="movie_collection_test_sample_1",
    query_vector=query_embedding,
    limit=2
)

# Step 3: Display results
for i, r in enumerate(results):
    print(f"Rank {i+1}, Score: {r.score}")
    print(f"  ID: {r.id}")
    print(f"  Payload: {r.payload}")
    print()


Rank 1, Score: 0.6742587
  ID: cecb522e-6dfd-462b-83da-0441de527f5c
  Payload: {'movie_id': 383224.0, 'runtime': 0.0, 'release_date': '2011-08-23', 'combined_text': 'Movie name is 11 septembre 2001. Its description is . Movie genres are Documentary, History'}

Rank 2, Score: 0.63421804
  ID: 5e1e0c7e-ee35-4c53-acdc-f030209103f9
  Payload: {'movie_id': 710172.0, 'runtime': 4.0, 'release_date': '2001-08-15', 'combined_text': 'Movie name is 2001. Its description is A short animation visualizing images of the year 2001.. Movie genres are Animation'}



Query Sample

In [None]:
from collections import defaultdict

# 2) Example Query
####################################################################################

# Suppose we have some data upserted in Qdrant already (100 rows or more).
# Let's do a simple query:
query_text = "Movies with more action"

# Embed the query using the actor
query_embedding_ref = embed_actor.embed_texts.remote([query_text])
query_embedding = ray.get(query_embedding_ref)[0]  # single vector


# 3) Search Qdrant: chunk-level results
####################################################################################

# We retrieve top 50 chunk hits. Qdrant will return chunk-level results.
search_results = qdrant_client.search(
    collection_name=collection_name,
    query_vector=query_embedding,
    limit=50  # or some number
)


# Step A: Group all chunk hits by their movie_id
aggregated = defaultdict(list)

for r in search_results:
    # Each r is a ScoredPoint
    # We'll assume you stored "movie_id" in the payload
    movie_id = r.payload["movie_id"]
    aggregated[movie_id].append(r)

# Step B: For each movie, pick the chunk with the highest score
final_movie_results = []
for movie_id, chunk_list in aggregated.items():
    # Sort chunk_list in descending order by score
    chunk_list.sort(key=lambda x: x.score, reverse=True)
    best_chunk = chunk_list[0]  # the single best chunk
    final_movie_results.append(best_chunk)

# Step C: Sort the final list of best-chunk-per-movie by score again
final_movie_results.sort(key=lambda x: x.score, reverse=True)

# 5) Display Aggregated Results
####################################################################################

for i, result in enumerate(final_movie_results[:10]):
    # top 10 movie-level results
    movie_id = result.payload["movie_id"]
    chunk_text = result.payload["chunk_text"]
    score = result.score

    print(f"Rank {i+1}, Movie ID: {movie_id}, Score: {score:.3f}")
    print(f"  Chunk snippet: {chunk_text[:150]}...")
    print("-" * 60)



Qdrant

In [24]:
# from qdrant_client import QdrantClient
# from qdrant_client.http.models import VectorParams, Distance

# def setup_collection(
#     collection_name: str = "movie_collection_semantic_retrieval",
#     vector_size: int = 768,  # match your embedding dimension
#     distance: str = "Cosine"
# ):
#     """
#     Creates or recreates a Qdrant collection with the specified size and distance metric.
#     """
#     client = QdrantClient(url="http://localhost:6333")  # Qdrant is in Docker
#     client.recreate_collection(
#         collection_name=collection_name,
#         vectors_config=VectorParams(size=vector_size, distance=distance)
#     )
#     print(f"Collection '{collection_name}' created/reset.")
#     return client


Main