<!-- TABS -->
# Multimodal vector search - Video

<!-- TABS -->
## Configure your production system

:::note
If you would like to use the production features 
of SuperDuperDB, then you should set the relevant 
connections and configurations in a configuration 
file. Otherwise you are welcome to use "development" mode 
to get going with SuperDuperDB quickly.
:::

In [1]:
import os

os.makedirs('.superduperdb', exist_ok=True)
os.environ['SUPERDUPERDB_CONFIG'] = '.superduperdb/config.yaml'

In [2]:
# <tab: MongoDB Community>
CFG = '''
data_backend: mongodb://127.0.0.1:27017/documents
artifact_store: filesystem://./artifact_store
'''

In [3]:
with open(os.environ['SUPERDUPERDB_CONFIG'], 'w') as f:
    f.write(CFG)
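
To confirm that the settings were written as intended, the file can be read back. The sketch below is a minimal check under stated assumptions: it writes the same two settings to a temporary directory instead of `.superduperdb/`, and `parse_flat_config` is a hypothetical helper that only understands flat `key: value` lines, not general YAML.

```python
import os
import tempfile

CFG = '''
data_backend: mongodb://127.0.0.1:27017/documents
artifact_store: filesystem://./artifact_store
'''


def parse_flat_config(text):
    """Parse flat 'key: value' lines (hypothetical helper, not a YAML parser)."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on the first colon only, so URLs in the value stay intact.
        key, _, value = line.partition(':')
        settings[key.strip()] = value.strip()
    return settings


# Write to a throwaway directory so the real configuration is untouched.
with tempfile.TemporaryDirectory() as tmp:
    config_path = os.path.join(tmp, 'config.yaml')
    with open(config_path, 'w') as f:
        f.write(CFG)
    with open(config_path) as f:
        settings = parse_flat_config(f.read())

print(settings['data_backend'])
print(settings['artifact_store'])
```

For anything beyond flat settings, a real YAML parser is the safer choice.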

<!-- TABS -->
## Start your cluster

:::note
Starting a SuperDuperDB cluster is useful in production and model development
if you want scalable compute, shared access to models for multi-user collaboration,
and monitoring.

If you don't need this, then it is simpler to start in development mode.
:::

In [4]:
# <tab: Experimental Cluster>
!python -m superduperdb local-cluster up

/bin/bash: line 1: python: command not found


<!-- TABS -->
## Connect to SuperDuperDB

:::note
Note that this is only relevant if you are running SuperDuperDB in development mode.
Otherwise refer to "Configure your production system".
:::

In [5]:
# <tab: MongoDB>
from superduperdb import superduper

db = superduper('mongodb://localhost:27017/documents')

2024-Jul-03 22:09:49.10| INFO     | godcreator333| superduperdb.base.build:65   | Data Client is ready. MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, serverselectiontimeoutms=5000)
2024-Jul-03 22:09:49.10| INFO     | godcreator333| superduperdb.base.build:38   | Connecting to Metadata Client with engine:  MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, serverselectiontimeoutms=5000)
2024-Jul-03 22:09:49.10| INFO     | godcreator333| superduperdb.base.build:154  | Connecting to compute client: None
2024-Jul-03 22:09:49.10| INFO     | godcreator333| superduperdb.base.datalayer:87   | Building Data Layer
2024-Jul-03 22:09:50.83| INFO     | godcreator333| superduperdb.base.build:218  | Configuration: 
 +----------------+-------------------------------------+
| Configuration  |                Value                |
+----------------+-------------------------------------+
|  Data Backend  | mongodb://localhos

<!-- TABS -->
## Get useful sample data

In [6]:
# <tab: Video>
import os

data = [f'videos/{x}' for x in os.listdir('./videos')]
sample_datapoint = data[-1]

from superduperdb.ext.pillow import pil_image
chunked_model_datatype = pil_image

In [7]:
datas = [{'x': d} for d in data[:3]]
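
`os.listdir` returns entries in arbitrary order and includes every file in the directory, so `data[-1]` and `data[:3]` may pick different files on different machines. The sketch below shows one way to make the listing deterministic. `list_videos` and its extension list are assumptions for illustration, not part of SuperDuperDB.

```python
import os
import tempfile


def list_videos(directory, extensions=('.mp4', '.mov', '.avi', '.mkv')):
    """Return sorted paths of files in `directory` with a video extension."""
    names = sorted(
        name for name in os.listdir(directory)
        if name.lower().endswith(extensions)
    )
    return [os.path.join(directory, name) for name in names]


# Demonstrate on a throwaway directory that also contains a non-video file.
with tempfile.TemporaryDirectory() as tmp:
    for name in ('2.mp4', '1.mp4', '.DS_Store'):
        open(os.path.join(tmp, name), 'w').close()
    found = [os.path.basename(path) for path in list_videos(tmp)]

print(found)
```

With a helper like this, `data = list_videos('./videos')` would replace the list comprehension above.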

<!-- TABS -->
## Create datatype

SuperDuperDB supports automatic data conversion, so users don’t need to worry about the compatibility of different data formats (`PIL.Image`, `numpy.array`, `pandas.DataFrame`, etc.) with the database.

It also supports custom conversion methods, declared through a `DataType` such as the one below.

In [8]:
# <tab: Video>
from superduperdb import DataType

# Create a DataType with the identifier 'video_on_file'; encodable='file' stores each value as a file in the artifact store
datatype = DataType(
    identifier='video_on_file',
    encodable='file',
)

<!-- TABS -->
## Setup tables or collections

In [9]:
from superduperdb.components.table import Table
from superduperdb import Schema

schema = Schema(identifier="schema", fields={"x": datatype})
table_or_collection = Table("documents", schema=schema)
db.apply(table_or_collection)

2024-Jul-03 22:09:50.84| INFO     | godcreator333| superduperdb.base.document:366  | Building leaf <class 'superduperdb.components.schema.Schema'> with identifier: schema
2024-Jul-03 22:09:50.84| INFO     | godcreator333| superduperdb.base.document:366  | Building leaf <class 'superduperdb.backends.ibis.field_types.FieldType'> with identifier: String
2024-Jul-03 22:09:50.84| INFO     | godcreator333| superduperdb.base.document:366  | Building leaf <class 'superduperdb.components.datatype.DataType'> with identifier: video_on_file


([],
 Table(identifier='documents', uuid='9e9cd311-837b-48fc-b7a0-8a32647768b5', schema=Schema(identifier='schema', uuid='d5b47f1a-bcd3-4642-8992-833c39c2766a', fields={'_fold': FieldType(identifier='String', uuid='0b2b4b14-e637-4f63-ab8b-26fd776c179e'), 'x': DataType(identifier='video_on_file', uuid='26308a0a-412f-4dea-ae8e-700284d89a70', encoder=None, decoder=None, info=None, shape=None, directory=None, encodable='file', bytes_encoding=<BytesEncoding.BYTES: 'Bytes'>, intermediate_type='bytes', media_type=None)}), primary_id='id'))

When data is inserted, every field is matched against the schema for data conversion.

In [10]:
db['documents'].insert(datas).execute()
select = db['documents'].select()

2024-Jul-03 22:09:50.85| INFO     | godcreator333| superduperdb.backends.local.artifacts:108  | Copying file videos/4.mp4 to ./artifact_store/ccdc38e8e3d7942bd7bc64bb13d4478e273bd6d7/4.mp4
2024-Jul-03 22:09:50.86| INFO     | godcreator333| superduperdb.backends.local.artifacts:108  | Copying file videos/1.mp4 to ./artifact_store/8ba47407c81f1ba49d4fee78709d9f8ef4ea6915/1.mp4
2024-Jul-03 22:09:50.86| INFO     | godcreator333| superduperdb.backends.local.artifacts:108  | Copying file videos/2.mp4 to ./artifact_store/a0923badfc46f7809b21c2fdb3fa14c6d23223d0/2.mp4
2024-Jul-03 22:09:50.87| INFO     | godcreator333| superduperdb.backends.local.compute:42   | Submitting job. function:<function callable_job at 0x745b1a9948b0>
2024-Jul-03 22:09:50.87| INFO     | godcreator333| superduperdb.backends.local.artifacts:121  | Loading file ccdc38e8e3d7942bd7bc64bb13d4478e273bd6d7 from ./artifact_store
2024-Jul-03 22:09:50.87| INFO     | godcreator333| superduperdb.backends.local.artifacts:121  | Load

FileNotFoundError: [Errno 2] No such file or directory: './artifact_store/cd833bf885ab66657d3281eab04c320a9641ce94'

<!-- TABS -->
## Apply a chunker for search

:::note
Note that applying a chunker is ***not*** mandatory for search.
If your data is already chunked (e.g. short text snippets or audio) or if you
are searching through something like images, which can't be chunked, then this
won't be necessary.
:::

In [None]:
# <tab: Video>
!pip install opencv-python
import cv2
import tqdm
from PIL import Image
from superduperdb.ext.pillow import pil_image
from superduperdb import model, Schema


@model(
    flatten=True,
    model_update_kwargs={'document_embedded': False},
)
def chunker(video_file):
    # Set the sampling frequency for frames
    sample_freq = 10
    
    # Open the video file using OpenCV
    cap = cv2.VideoCapture(video_file)
    
    # Initialize variables
    frame_count = 0
    fps = cap.get(cv2.CAP_PROP_FPS)
    extracted_frames = []
    progress = tqdm.tqdm()

    # Iterate through video frames
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Timestamp in whole seconds: floor of frame count divided by FPS
        current_timestamp = frame_count // fps
        
        # Sample frames based on the specified frequency
        if frame_count % sample_freq == 0:
            extracted_frames.append({
                'image': Image.fromarray(frame[:,:,::-1]),  # Convert BGR to RGB
                'current_timestamp': current_timestamp,
            })
        frame_count += 1
        progress.update(1)
    
    # Release resources (no OpenCV windows were opened, so none need closing)
    cap.release()
    progress.close()
    
    # Return the list of extracted frames
    return extracted_frames
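
To see which frames the chunker keeps, the sampling rule can be reproduced without OpenCV. This is a minimal sketch of the same arithmetic: `sampled_frames` is a hypothetical helper, and the frame count and FPS are made-up values.

```python
def sampled_frames(total_frames, fps, sample_freq=10):
    """Return (frame_index, timestamp) pairs using the chunker's sampling rule."""
    return [
        (index, index // fps)
        for index in range(total_frames)
        if index % sample_freq == 0
    ]


# OpenCV reports FPS as a float, so the timestamps are floats too.
pairs = sampled_frames(total_frames=61, fps=30.0)
for index, timestamp in pairs:
    print(index, timestamp)
```

At 30 frames per second with `sample_freq = 10`, three frames are kept per second of video. Because of the floor division, all three carry the same whole-second timestamp.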

Now we apply this chunker to the data by wrapping it in a `Listener`:

In [None]:
from superduperdb import Listener

upstream_listener = Listener(
    model=chunker,
    select=select,
    key='x',
    uuid="chunk",
)

db.apply(upstream_listener)

## Build multimodal embedding models

We declare the models' output datatype as a vector so that their embeddings can be stored and used for vector search.

In [None]:
# <tab: MongoDB>
from superduperdb.components.vector_index import vector
# CLIP ViT-B/32 produces 512-dimensional embeddings
output_datatype = vector(shape=(512,))

Then define two models, one for text embedding and one for image embedding.

In [None]:
# <tab: Text-Image>
!pip install git+https://github.com/openai/CLIP.git
import clip
from superduperdb import vector
from superduperdb.ext.torch import TorchModel

# Load the CLIP model and obtain the preprocessing function
model, preprocess = clip.load("ViT-B/32", device='cpu')

# Create a TorchModel for text encoding
compatible_model = TorchModel(
    identifier='clip_text',  # Unique identifier for the model
    object=model,  # CLIP model
    preprocess=lambda x: clip.tokenize(x)[0],  # Tokenize the input text with CLIP
    postprocess=lambda x: x.tolist(),  # Convert the model output to a list
    datatype=output_datatype,  # Vector datatype defined above
    forward_method='encode_text',  # Use the 'encode_text' method for the forward pass
)

# Create a TorchModel for visual encoding
model = TorchModel(
    identifier='clip_image',  # Unique identifier for the model
    object=model.visual,  # Visual part of the CLIP model
    preprocess=preprocess,  # Visual preprocessing using CLIP
    postprocess=lambda x: x.tolist(),  # Convert the output to a list
    datatype=output_datatype,  # Vector datatype defined above
)

Because the two models handle different modalities, we define a separate key for each so that the vector index knows which model embeds which field.

In [None]:
compatible_key = 'text'  # key holding the text to embed at query time
indexing_key = upstream_listener.outputs_key + '.image'  # the `image` field of each chunker output
select = upstream_listener.outputs_select
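
A key built by appending `.image` can be read as a path into a nested document. The sketch below illustrates that reading under stated assumptions: the value `'_outputs.chunk'`, the document shape, and the helper `get_by_dotted_key` are all hypothetical, and the real value of `upstream_listener.outputs_key` depends on the SuperDuperDB version.

```python
def get_by_dotted_key(document, dotted_key):
    """Follow a dot-separated key through nested dictionaries."""
    value = document
    for part in dotted_key.split('.'):
        value = value[part]
    return value


# Hypothetical key names and document shape, for illustration only.
outputs_key = '_outputs.chunk'
indexing_key = outputs_key + '.image'
document = {
    '_outputs': {
        'chunk': {'image': '<PIL image>', 'current_timestamp': 1.0},
    },
}

print(get_by_dotted_key(document, indexing_key))
```

The same lookup with `'.current_timestamp'` appended instead would return the frame's timestamp.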

## Create vector-index

In [None]:
vector_index_name = 'my-vector-index'

In [None]:
# <tab: 2-Modalities>
from superduperdb import VectorIndex, Listener

jobs, _ = db.apply(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,        # the document key the model reads to create embeddings
            select=select,           # a query selecting the data to index and search over
            model=model,             # the model that converts data to embeddings
        ),
        compatible_listener=Listener(
            key=compatible_key,      # the document key the model reads at query time
            model=compatible_model,  # the model that converts queries to embeddings
            active=False,
            select=None,
        )
    )
)

In [None]:
query_table_or_collection = select.table_or_collection

## Perform a vector search

We can run a vector search using a text description:

In [None]:
# <tab: Text>
from superduperdb import Document
item = Document({compatible_key: "The moment of a soccer shot"})

Once we have this search target, we can execute a search as follows.

In [None]:
select = query_table_or_collection.like(item, vector_index=vector_index_name, n=5).select()
results = list(db.execute(select))
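
Conceptually, a search like this embeds the query and returns the `n` stored vectors closest to it. The sketch below is a brute-force illustration with toy 3-dimensional vectors. It is not SuperDuperDB's implementation: cosine similarity is an assumed measure, and `cosine_similarity`, `top_n`, and the frame identifiers are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_n(query_vector, indexed_vectors, n=5):
    """Return the ids of the n stored vectors most similar to the query."""
    scored = [
        (cosine_similarity(query_vector, vector), identifier)
        for identifier, vector in indexed_vectors.items()
    ]
    scored.sort(reverse=True)
    return [identifier for _, identifier in scored[:n]]


# Toy embeddings standing in for CLIP image vectors of sampled frames.
frame_vectors = {
    'frame-0': [1.0, 0.0, 0.0],
    'frame-10': [0.9, 0.1, 0.0],
    'frame-20': [0.0, 1.0, 0.0],
}
text_vector = [1.0, 0.05, 0.0]  # stand-in for the embedded text query

print(top_n(text_vector, frame_vectors, n=2))
```

Text and image vectors are comparable here only because both CLIP encoders map into the same embedding space.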

## Visualize results

In [None]:
from IPython.display import display
for result in results:
    display(Document(result.unpack())[indexing_key])

## Check the system stays updated

You can add new data at any time. Once it is inserted, the listeners and models defined above process it automatically and update the vector index, so each query runs against the latest data.

In [None]:
new_datas = [{'x': data[-1]}]
ids = db['documents'].insert(new_datas).execute()
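
The update-on-insert behaviour can be pictured with a toy in-memory store. This is a conceptual sketch only: `TinyStore` is hypothetical and runs its listeners inline, whereas SuperDuperDB schedules real jobs for its listeners.

```python
class TinyStore:
    """Toy in-memory collection that notifies listeners on insert."""

    def __init__(self):
        self.documents = []
        self.listeners = []

    def add_listener(self, key, function, outputs):
        # `outputs` is a list that collects what the listener computes.
        self.listeners.append((key, function, outputs))

    def insert(self, new_documents):
        self.documents.extend(new_documents)
        # Every registered listener processes the new documents immediately.
        for key, function, outputs in self.listeners:
            outputs.extend(function(doc[key]) for doc in new_documents)


store = TinyStore()
name_lengths = []
store.add_listener('x', len, name_lengths)  # `len` stands in for a model

store.insert([{'x': 'videos/1.mp4'}, {'x': 'videos/2.mp4'}])
store.insert([{'x': 'videos/10.mp4'}])

print(name_lengths)
```

Each insert extends the listener's outputs without any extra call, which is the property the section above relies on.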