# **UGC Content Embedding Creation and Qdrant**

This tutorial cookbook automates video content processing, embedding generation, and storage in the Qdrant vector database. It uses Qdrant for vector-based search, Twelve Labs for video embeddings, and AWS S3 for secure cloud storage.


Operations -


- Video Upload to AWS S3 – Uploads videos and generates public URLs.

- Video Embeddings Generation – Uses Twelve Labs model (Marengo-retrieval-2.7) for creating embeddings.

- Qdrant Integration – Stores video embeddings for efficient vector-based retrieval.


Installing the necessary modules

In [None]:
!pip install yt-dlp
!pip install google-api-python-client

In [None]:
!pip install qdrant-client boto3

In [None]:
!pip install twelvelabs pytube

Importing the necessary libraries

In [None]:
import glob
import os
import shutil
import time
import uuid

import boto3
import pandas as pd
import requests
from botocore.exceptions import ClientError
from google.colab import files
from IPython.display import display, HTML
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct
from twelvelabs import TwelveLabs

# Local directory scanned for MP4 files; it is the default `directory_path`
# of process_existing_videos().
download_path = "downloads/disney_kids_shorts"
# Create the directory up front; exist_ok=True makes re-running the cell safe.
os.makedirs(download_path, exist_ok=True)

Setting up the API Keys and the client

In [None]:
# Replace every "<...>" placeholder below with your own value before running
# the rest of the notebook. Avoid committing real credentials to version
# control.
AWS_ACCESS_KEY = "<Your AWS ACCESS API KEY>"
AWS_SECRET_KEY = "<Your AWS SECRET API KEY>"
AWS_BUCKET_NAME = "<Your AWS BUCKET NAME>"
# Region of the bucket; it is also embedded in the public URL built by
# upload_to_s3().
AWS_REGION = "eu-north-1"

TWELVE_LABS_API_KEY = "<Your Twelve Labs API KEY>"
QDRANT_HOST = "<Your QDRANT CLOUD HOST DOMAIN>"
QDRANT_API_KEY = "<Your QDRANT API KEY>"

# S3 client used by upload_to_s3().
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

# Twelve Labs client used by create_video_embedding().
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)


Setting up the Qdrant Client

In [None]:
# Stays None when the host or API key is empty; store_in_qdrant() checks for
# that and raises instead of attempting a request.
qdrant_client = None

if QDRANT_HOST and QDRANT_API_KEY:
    # Reduce whatever was pasted (bare host, "host:port", or a full URL such
    # as "https://host:6333/dashboard") to the bare host name. Splitting on
    # ":" alone would turn a full URL into the host "https".
    QDRANT_HOST = (
        QDRANT_HOST.strip()
        .split("://", 1)[-1]   # drop an optional scheme
        .split("/", 1)[0]      # drop an optional path
        .split(":", 1)[0]      # drop an optional port
    )
    qdrant_client = QdrantClient(
        url=f"https://{QDRANT_HOST}",
        api_key=QDRANT_API_KEY,
        timeout=20,
        prefer_grpc=False
    )

# Collection that store_in_qdrant() upserts into.
# NOTE(review): nothing in this section creates the collection; presumably it
# must already exist with vectors of VECTOR_SIZE dimensions - confirm.
COLLECTION_NAME = "content_collection"
# Presumably the dimensionality of the Twelve Labs embeddings; it is not
# referenced by the code in this section - TODO confirm.
VECTOR_SIZE = 1024

Upload a video file to S3 and return its public URL. The URL is later stored in Qdrant as part of the embedding's metadata (payload).

In [None]:
def upload_to_s3(file_path, filename):
    """Upload a local video file to S3 and return its public URL.

    The object is written to ``AWS_BUCKET_NAME`` under the ``videos-embed/``
    prefix with a ``public-read`` ACL and a ``video/mp4`` content type.

    Args:
        file_path: Path of the local file to upload.
        filename: Object name to use below the ``videos-embed/`` prefix.

    Returns:
        The HTTPS URL of the uploaded object, built from the bucket name,
        ``AWS_REGION`` and the object key.

    Raises:
        botocore.exceptions.ClientError: Logged and re-raised when the
            upload is rejected by S3.
    """
    object_key = f"videos-embed/{filename}"

    try:
        # Upload the file
        s3_client.upload_file(
            file_path,
            AWS_BUCKET_NAME,
            object_key,
            ExtraArgs={
                'ACL': 'public-read',
                'ContentType': 'video/mp4'
            }
        )
    except ClientError as e:
        print(f"Error uploading to S3: {str(e)}")
        raise

    # Generate the public URL.
    # NOTE: the key is interpolated as-is, so names containing spaces or
    # reserved characters are not percent-encoded.
    url = f"https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com/{object_key}"
    print(f"Uploaded to S3: {url}")
    return url


# **Video Embedding Creation**

Create a whole-video embedding using Twelve Labs.

In [None]:
def create_video_embedding(video_path, max_retries=3, retry_delay=5):
    """Create embeddings for a local video with Twelve Labs, retrying on error.

    An embedding task is created with the ``Marengo-retrieval-2.7`` model and
    both the ``clip`` and ``video`` scopes, then polled until it finishes.
    Any exception (including a task that ends in a status other than
    ``ready``) triggers a retry; the wait between attempts doubles each time.

    Args:
        video_path: Path of the local video file to embed.
        max_retries: Total number of attempts; must be at least 1.
        retry_delay: Seconds to wait before the second attempt.

    Returns:
        The task object retrieved from Twelve Labs once its status is
        ``ready``.

    Raises:
        ValueError: If the Twelve Labs client is not configured or
            ``max_retries`` is smaller than 1.
        Exception: The error of the final attempt, re-raised unchanged.
    """
    if not twelvelabs_client:
        raise ValueError("Twelve Labs API key not configured")
    if max_retries < 1:
        # Without this guard the loop would never run and None would be
        # returned silently, failing later in a far less obvious place.
        raise ValueError("max_retries must be at least 1")

    delay = retry_delay
    for attempt in range(1, max_retries + 1):
        try:
            print(f"Creating whole video embedding for {video_path}... (Attempt {attempt}/{max_retries})")

            # Use video_embedding_scopes parameter set to ["clip", "video"] to get whole video embedding
            task = twelvelabs_client.embed.task.create(
                model_name="Marengo-retrieval-2.7",
                video_file=video_path,
                video_embedding_scopes=["clip", "video"]
            )

            print(f"Created task: id={task.id}, status={task.status}")
            task.wait_for_done(sleep_interval=3)
            task_result = twelvelabs_client.embed.task.retrieve(task.id)

            if task_result.status != 'ready':
                raise ValueError(f"Task failed with status: {task_result.status}")

            return task_result

        except Exception as e:
            print(f"Error creating embedding (attempt {attempt}): {str(e)}")
            if attempt == max_retries:
                print("Max retries reached, giving up.")
                raise
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2  # exponential back-off between attempts

# **Insert Embeddings into Qdrant**

Store whole video embedding and the metadata in Qdrant.

In [1]:
def store_in_qdrant(task_result, video_id, s3_url, original_filename):
    """Upsert one point holding the video's embedding and metadata into Qdrant.

    The vector is taken from the first segment whose ``embedding_scope`` is
    ``'video'``; when no such segment exists, the first segment of the
    response is used instead.

    Args:
        task_result: Finished Twelve Labs embedding task; its
            ``video_embedding.segments`` are read.
        video_id: Identifier stored in the payload as ``video_id``.
        s3_url: URL stored in the payload as ``video_url``.
        original_filename: File name stored in the payload.

    Returns:
        ``1``, the number of points written.

    Raises:
        ValueError: If the Qdrant client is not configured or the task
            result carries no segments.
        Exception: Any error raised while building or upserting the point
            is logged and re-raised.
    """
    if not qdrant_client:
        raise ValueError("Qdrant client not configured")

    try:
        print(f"Processing video embedding for {video_id}...")

        video_embedding = task_result.video_embedding
        if not (video_embedding and video_embedding.segments):
            raise ValueError("No embeddings found in the response")
        segments = video_embedding.segments

        # The whole-video embedding is the segment with embedding_scope="video".
        whole_video_segment = next(
            (seg for seg in segments
             if getattr(seg, 'embedding_scope', None) == 'video'),
            None,
        )

        if whole_video_segment is not None:
            print(f"Found video-scope embedding")
            embedding_vector = whole_video_segment.embeddings_float
        else:
            # Fall back to the first segment when no video-scope one exists.
            print(f"No video-scope embedding found, using first available segment")
            embedding_vector = segments[0].embeddings_float

        metadata = {
            'video_id': video_id,
            'video_url': s3_url,
            'is_url': True,
            'original_filename': original_filename,
            'confidence': 'high'
        }
        # Keep the low 64 bits of a random UUID as the integer point id.
        point_id = uuid.uuid4().int & 0xFFFFFFFFFFFFFFFF
        point = PointStruct(id=point_id, vector=embedding_vector, payload=metadata)

        qdrant_client.upsert(collection_name=COLLECTION_NAME, points=[point])
        print(f"Stored whole video embedding in Qdrant")
        return 1
    except Exception as e:
        print(f"Error storing in Qdrant: {str(e)}")
        raise

Process all videos in the specified directory, calling the function for each step in turn.

In [None]:
def _process_single_video(file_path, upload_only):
    """Upload one video and, unless upload_only, embed it and store it in Qdrant.

    Upload errors propagate to the caller. Embedding and Qdrant errors are
    recorded in the returned dict instead of being raised.
    """
    original_filename = os.path.basename(file_path)
    # Prefix with a UUID so equal file names never collide in S3.
    video_id = f"{uuid.uuid4()}_{original_filename}"

    # Step 1 Upload to S3
    s3_url = upload_to_s3(file_path, video_id)

    result = {
        "file_path": file_path,
        "original_filename": original_filename,
        "video_id": video_id,
        "s3_url": s3_url,
        "status": "success"
    }

    if upload_only:
        result["embedding_status"] = "skipped"
        result["qdrant_status"] = "skipped"
        return result

    # Step 2 Create embedding
    try:
        task_result = create_video_embedding(file_path)
    except Exception as embedding_error:
        result["embedding_status"] = "failed"
        result["embedding_error"] = str(embedding_error)
        result["qdrant_status"] = "skipped"
        return result
    result["embedding_status"] = "success"

    # Step 3 Store in Qdrant
    try:
        segments = store_in_qdrant(
            task_result,
            video_id,
            s3_url,
            original_filename
        )
    except Exception as qdrant_error:
        result["qdrant_status"] = "failed"
        result["qdrant_error"] = str(qdrant_error)
        return result
    result["segments"] = segments
    result["qdrant_status"] = "success"
    return result


def process_existing_videos(directory_path=download_path, limit=None, upload_only=False):
    """Upload, embed and index every MP4 file found in a directory.

    Each file is uploaded to S3; unless ``upload_only`` is set, an embedding
    is then created and stored in Qdrant. A failure on one file is recorded
    in the results and does not stop the remaining files.

    Args:
        directory_path: Directory to scan (not recursively) for ``.mp4``
            files.
        limit: Optional positive int; only the first ``limit`` files (in
            sorted name order) are processed.
        upload_only: When true, skip the embedding and Qdrant steps.

    Returns:
        A pandas DataFrame with one row per processed file. The same table
        is displayed and written to ``processing_results.csv`` inside
        ``directory_path``.

    Raises:
        ValueError: If the directory contains no MP4 files.
    """
    # Get all video files in the directory. Sorted because os.listdir returns
    # entries in arbitrary order, which would make `limit` pick arbitrary files.
    video_files = [
        os.path.join(directory_path, name)
        for name in sorted(os.listdir(directory_path))
        if name.lower().endswith('.mp4')
    ]

    if not video_files:
        raise ValueError(f"No MP4 files found in {directory_path}")

    print(f"Found {len(video_files)} videos in {directory_path}")

    if limit and isinstance(limit, int) and limit > 0:
        video_files = video_files[:limit]
        # Report the real count: fewer than `limit` files may exist.
        print(f"Processing {len(video_files)} videos (limited)")

    results = []

    for file_path in video_files:
        try:
            results.append(_process_single_video(file_path, upload_only))
        except Exception as e:
            # Anything not handled per-step (e.g. the S3 upload) fails the file.
            results.append({
                "file_path": file_path,
                "original_filename": os.path.basename(file_path),
                "status": "failed",
                "error": str(e)
            })

    df = pd.DataFrame(results)
    print(f"\nProcessed {len(results)} videos. Summary:")
    display(df)

    results_path = os.path.join(directory_path, "processing_results.csv")
    df.to_csv(results_path, index=False)
    print(f"Results saved to {results_path}")

    return df

Uncomment the next cell to run `process_existing_videos()`, which uploads the videos to S3, creates the embeddings, and inserts them into Qdrant.

In [None]:
# process_existing_videos()