In [1]:
# 📦 Install dependencies
!pip install -q pandas numpy tqdm sentence-transformers qdrant-client fastembed torch


In [2]:
# 📚 Imports
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from fastembed.sparse import SparseTextEmbedding
from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, SparseVectorParams, Distance
import torch
import gc
import sys

# ⚙️ Config
BATCH_SIZE = 512
DATA_PATH = "~/TMDB_movie_dataset_v11.csv"
COLLECTION_NAME = "movies"
TEXT_COLUMNS = ["title", "overview", "tagline", "genres", "keywords"]

# 🧠 Init models
device = "cuda" if torch.cuda.is_available() else "cpu"
dense_model = SentenceTransformer("all-MiniLM-L6-v2", device=device)
dense_model = dense_model.eval()

sparse_model = SparseTextEmbedding(
    model_name="Qdrant/bm42-all-minilm-l6-v2-attentions",
    providers=["CPUExecutionProvider"],
    quantize=True
)

# 🧮 [1/4] Load & preprocess dataset
print("[1/4] Loading and preprocessing dataset...")
df = pd.read_csv(DATA_PATH)
df[TEXT_COLUMNS] = df[TEXT_COLUMNS].fillna("")
df["full_text"] = df[TEXT_COLUMNS].agg(" ".join, axis=1)
df["user_id"] = np.random.randint(1, 11, size=len(df))
print(f"✔️ Loaded {len(df)} rows.\n")

[1/4] Loading and preprocessing dataset...
✔️ Loaded 1196770 rows.



In [3]:
# 🔗 [2/4] Connect to Qdrant and [3/4] Create Collection
print("[2/4] Connecting to Qdrant...")
from qdrant_client import QdrantClient, models

qdrant_client = QdrantClient(url="http://localhost:6333")
print("✔️ I'm in!")

print("[3/4] Creating collection...")
if qdrant_client.collection_exists(COLLECTION_NAME):
    qdrant_client.delete_collection(COLLECTION_NAME)

qdrant_client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config={
        "dense": models.VectorParams(
            size=384,
            distance=models.Distance.COSINE,
            hnsw_config=models.HnswConfigDiff(m=16, ef_construct=100)
        )
    },
    sparse_vectors_config={
        "sparse": models.SparseVectorParams(
            index=models.SparseIndexParams(
                on_disk=False,
                full_scan_threshold=20000
            )
        )
    },
    optimizers_config=models.OptimizersConfigDiff(
        indexing_threshold=20000,
        memmap_threshold=20000
    ),
    shard_number=3,
    replication_factor=2
)
print(f"✔️ Collection `{COLLECTION_NAME}` created.\n")
print(qdrant_client.get_collection(COLLECTION_NAME))

[2/4] Connecting to Qdrant...
✔️ I'm in!
[3/4] Creating collection...
✔️ Collection `movies` created.

status=<CollectionStatus.GREEN: 'green'> optimizer_status=<OptimizersStatusOneOf.OK: 'ok'> vectors_count=None indexed_vectors_count=0 points_count=0 segments_count=24 config=CollectionConfig(params=CollectionParams(vectors={'dense': VectorParams(size=384, distance=<Distance.COSINE: 'Cosine'>, hnsw_config=HnswConfigDiff(m=16, ef_construct=100, full_scan_threshold=None, max_indexing_threads=None, on_disk=None, payload_m=None), quantization_config=None, on_disk=None, datatype=None, multivector_config=None)}, shard_number=3, sharding_method=None, replication_factor=2, write_consistency_factor=1, read_fan_out_factor=None, on_disk_payload=True, sparse_vectors={'sparse': SparseVectorParams(index=SparseIndexParams(full_scan_threshold=20000, on_disk=False, datatype=None), modifier=None)}), hnsw_config=HnswConfig(m=16, ef_construct=100, full_scan_threshold=10000, max_indexing_threads=0, on_disk

In [4]:
# 📤 [4/4] Batch upload with multiprocessing, timeout protection, and logging
import os
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from qdrant_client import models
import psutil
import sys
import torch
import time


# Optimize for AMD EPYC / multi-core
os.environ["OMP_NUM_THREADS"] = "16"
os.environ["MKL_NUM_THREADS"] = "16"
os.environ["KMP_BLOCKTIME"] = "1"
os.environ["KMP_AFFINITY"] = "granularity=fine,compact,1,0"
os.environ["TOKENIZERS_PARALLELISM"] = "true"

print("[4/4] Uploading data to Qdrant with multiprocessing...")
total = len(df)

def process_batch(batch_start: int):
    import torch
    from sentence_transformers import SentenceTransformer
    from fastembed.sparse import SparseTextEmbedding

    BATCH_SIZE = 512
    batch_end = min(batch_start + BATCH_SIZE, total)
    batch_df = df.iloc[batch_start:batch_end]
    texts = batch_df["full_text"].tolist()

    dense_model = SentenceTransformer("all-MiniLM-L6-v2").eval()
    sparse_model = SparseTextEmbedding(
        model_name="Qdrant/bm42-all-minilm-l6-v2-attentions",
        providers=["CPUExecutionProvider"],
        quantize=True
    )

    t0 = time.time()
    with torch.inference_mode():
        dense_vectors = dense_model.encode(texts, convert_to_numpy=True, batch_size=BATCH_SIZE)
    sparse_vectors = list(sparse_model.embed(texts, batch_size=BATCH_SIZE))
    t1 = time.time()

    points = []
    for i, (row, dense, sparse) in enumerate(zip(batch_df.itertuples(), dense_vectors, sparse_vectors)):
        points.append({
            "id": int(row.Index),
            "vector": {  # ✅ MUST be singular
                "dense": dense.tolist(),
                "sparse": models.SparseVector(
                    indices=sparse.indices.tolist(),
                    values=sparse.values.tolist()
                )
            },
            "payload": {
                "user_id": int(row.user_id),
                "title": row.title,
                "overview": row.overview
            }
        })


    print(f"[✔] Encoded batch {batch_start}–{batch_end} | {len(points)} points | {t1 - t0:.2f}s")
    return points

# Create batches
batch_starts = list(range(0, total, BATCH_SIZE))

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_batch, i) for i in batch_starts]

    for i, future in enumerate(futures):
        try:
            print(f"\n⏳ Waiting for future {i}...")
            batch_points = future.result(timeout=300)  # 5 min timeout

            print(f"📤 Uploading batch {i}, {len(batch_points)} points...")
            qdrant_client.upsert(collection_name=COLLECTION_NAME, points=batch_points)

            mem_usage = psutil.Process().memory_info().rss / 1024**3
            sys.stdout.write(
                f"\r✅ Uploaded {(i + 1) * BATCH_SIZE}/{total} points | RAM: {mem_usage:.1f} GB"
            )
            sys.stdout.flush()

        except TimeoutError:
            print(f"\n⏱️ Batch {i} timed out.")
        except Exception as e:
            print(f"\n❌ Error in batch {i}: {str(e)}")

print("\n✔️ All batches processed.")


[4/4] Uploading data to Qdrant with multiprocessing...

⏳ Waiting for future 0...
[✔] Encoded batch 0–512 | 512 points | 94.85s
[✔] Encoded batch 1024–1536 | 512 points | 97.07s
📤 Uploading batch 0, 512 points...
[✔] Encoded batch 512–1024 | 512 points | 96.31s
✅ Uploaded 512/1196770 points | RAM: 2.5 GB
⏳ Waiting for future 1...
📤 Uploading batch 1, 512 points...
✅ Uploaded 1024/1196770 points | RAM: 2.5 GB
⏳ Waiting for future 2...
📤 Uploading batch 2, 512 points...
✅ Uploaded 1536/1196770 points | RAM: 2.5 GB
⏳ Waiting for future 3...
[✔] Encoded batch 1536–2048 | 512 points | 97.66s
📤 Uploading batch 3, 512 points...
✅ Uploaded 2048/1196770 points | RAM: 2.5 GB
⏳ Waiting for future 4...
[✔] Encoded batch 2560–3072 | 512 points | 96.04s
[✔] Encoded batch 3584–4096 | 512 points | 92.51s
[✔] Encoded batch 3072–3584 | 512 points | 96.87s
[✔] Encoded batch 2048–2560 | 512 points | 98.63s
📤 Uploading batch 4, 512 points...
✅ Uploaded 2560/1196770 points | RAM: 2.5 GB
⏳ Waiting for future

In [6]:
import json

# Read all data from the Qdrant collection
print("📥 Reading all data from the Qdrant collection...")
all_points = []
scroll_result = qdrant_client.scroll(collection_name=COLLECTION_NAME, scroll_filter=None, limit=1000)

while scroll_result:
    points, next_page_offset = scroll_result
    all_points.extend(points)
    if next_page_offset is None:
        break
    scroll_result = qdrant_client.scroll(
        collection_name=COLLECTION_NAME,
        scroll_filter=None,
        limit=1000,
        offset=next_page_offset
    )

print(f"✔️ Retrieved {len(all_points)} points from the collection.")

# Save the data to a local JSON file
output_file = "movies_backup.json"
with open(output_file, "w") as f:
    json.dump([point.dict() for point in all_points], f)

print(f"✔️ Data saved to {output_file}.")

📥 Reading all data from the Qdrant collection...
✔️ Retrieved 1196770 points from the collection.


/tmp/ipykernel_192196/3409099285.py:25: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  json.dump([point.dict() for point in all_points], f)


✔️ Data saved to movies_backup.json.


# 📤 [4/4] Snigle-thread processing
print("[4/4] Uploading data to Qdrant...")
total = len(df)

for batch_start in range(0, total, BATCH_SIZE):
    batch_end = min(batch_start + BATCH_SIZE, total)
    batch_df = df.iloc[batch_start:batch_end]
    texts = batch_df["full_text"].tolist()

    with torch.inference_mode(), torch.autocast(device_type=device):
        dense_vectors = dense_model.encode(texts, convert_to_numpy=True)
    sparse_vectors = list(sparse_model.embed(texts))

points = []
for i, (row, dense, sparse) in enumerate(zip(batch_df.itertuples(), dense_vectors, sparse_vectors)):
    points.append({
        "id": int(row.Index),
        "vectors": {
            "dense": dense.tolist()
        },
        "sparse_vectors": {
            "sparse": {
                "indices": sparse.indices.tolist(),
                "values": sparse.values.tolist()
            }
        },
        "payload": {
            "user_id": int(row.user_id),
            "title": row.title,
            "overview": row.overview
        }
    })


    qdrant_client.upsert(collection_name=COLLECTION_NAME, points=points)

    del dense_vectors, sparse_vectors
    torch.cuda.empty_cache() if device == "cuda" else None
    gc.collect()

    sys.stdout.write(f"\rUploaded {batch_end}/{total} points")
    sys.stdout.flush()

print("\n✔️ Upload complete.")
