# Change data capture with MongoDB

In a standalone MongoDB deployment, data must be inserted through the SuperDuperDB `Datalayer`,
or through a client that calls the `Datalayer`, for models to be triggered on the new data. For
use cases with multiple users and stakeholders, or with automated updates written straight to the
database, this is not sufficient. For that reason, SuperDuperDB supports a paradigm known as
change data capture (CDC).

In change data capture, a service is deployed that watches the database for changes and reacts
to them, activating any models that are configured to compute outputs over new data.
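The reaction step can be pictured as a loop over change events. The sketch below is not SuperDuperDB's implementation; it only assumes events shaped like MongoDB change-stream events (`operationType`, `documentKey`, `fullDocument`), which in `pymongo` are typically obtained from `Collection.watch()` on a replica set or sharded cluster. The function `react_to_changes` is hypothetical, and `len` stands in for a real model.

```python
from typing import Callable, Dict, Iterable


def react_to_changes(
    events: Iterable[Dict],
    model: Callable[[str], object],
    key: str,
) -> Dict[object, object]:
    """Apply `model` to `key` of every newly inserted document.

    Returns a mapping from document _id to model output; a real CDC
    service would write these outputs back to the database instead.
    """
    outputs: Dict[object, object] = {}
    for event in events:
        # This sketch only reacts to inserts; updates and deletes are skipped.
        if event.get("operationType") != "insert":
            continue
        doc = event["fullDocument"]
        outputs[event["documentKey"]["_id"]] = model(doc[key])
    return outputs


events = [
    {"operationType": "insert", "documentKey": {"_id": 1},
     "fullDocument": {"_id": 1, "abstract": "hello world"}},
    {"operationType": "delete", "documentKey": {"_id": 2}},
]
print(react_to_changes(events, model=len, key="abstract"))  # {1: 11}
```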

In this notebook, we demonstrate how to use CDC with SuperDuperDB.

In [1]:
import pymongo
import torch
import sys

sys.path.append('../')

from superduperdb.encoders.numpy.array import array
from superduperdb.models.sentence_transformers.wrapper import SentenceTransformer
from superduperdb.datalayer.mongodb.query import Collection
from superduperdb.misc.superduper import superduper
from superduperdb.core.watcher import Watcher
from superduperdb.core.vector_index import VectorIndex
from superduperdb.datalayer.base.cdc import DatabaseWatcher
from superduperdb.core.documents import Document as D
from sentence_transformers import SentenceTransformer as _SentenceTransformer

INFO:numexpr.utils:NumExpr defaulting to 8 threads.


In [2]:
pymongo.MongoClient().drop_database('documents')
pymongo.MongoClient().drop_database('_filesystem:documents')

In [None]:
db = pymongo.MongoClient().documents
db = superduper(db)

collection = Collection(name='documents')

Insert the data into the `documents` collection:

In [3]:
data = [
  {
    "title": "Politics of Armenia",
    "abstract": "The politics of Armenia take place in the framework of the parliamentary representative democratic republic of Armenia, whereby the President of Armenia is the head of state and the Prime Minister of Armenia the head of government, and of a multi-party system. Executive power is exercised by the President and the Government."
  },
  {
    "title": "Foreign relations of Armenia",
    "abstract": "Since its independence, Armenia has maintained a policy of complementarism by trying to have positive and friendly relations with Iran, Russia, and the West, including the United States and the European Union.– \"Armenian Foreign Policy Between Russia, Iran And U."
  },
  {
    "title": "Demographics of American Samoa",
    "abstract": "This article is about the demographics of American Samoa, including population density, ethnicity, education level, health of the populace, economic status, religious affiliations and other aspects of the population. American Samoa is an unincorporated territory of the United States located in the South Pacific Ocean."
  },
  {
    "title": "Analysis",
    "abstract": "Analysis is the process of breaking a complex topic or substance into smaller parts in order to gain a better understanding of it. The technique has been applied in the study of mathematics and logic since before Aristotle (384–322 B."
  }
]

data = [D(d) for d in data]

db.execute(collection.insert_many(data))

INFO:root:found 0 uris


(<pymongo.results.InsertManyResult at 0x7ff01f4d8d60>,
 TaskWorkflow(database=<superduperdb.datalayer.base.database.BaseDatabase object at 0x7ff029927070>, G=<networkx.classes.digraph.DiGraph object at 0x7ff01f4d8be0>))

Create a vector index. It consists of an indexing watcher, which applies a `SentenceTransformer`
model to the `abstract` key of each document in `documents` to produce a vector.

In [4]:
def configure_text_search(model):
    return db.add(VectorIndex(
        identifier='my-index',
        indexing_watcher=Watcher(
            model=model,
            key='abstract',
            select=Collection(name='documents').find()
        ),
    ))

configure_text_search(
    SentenceTransformer(
        identifier="test-st",
        object=_SentenceTransformer('all-MiniLM-L6-v2'),
        encoder= array('float32', shape=(384,))
    )
)

INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:sentence_transformers.SentenceTransformer:Use pytorch device: cpu
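Conceptually, the indexing watcher reads the `abstract` field of each selected document, passes it to the model, and stores a 384-dimensional vector. The sketch below illustrates that flow with a hypothetical hashing-trick embedding (`toy_embed`) in place of `all-MiniLM-L6-v2`; it produces entirely different vectors from the real model and is only meant to show the shape of the computation. Keying results by `title` is also an illustrative assumption.

```python
import hashlib
import math
from typing import Dict, List

DIM = 384  # matches the array('float32', shape=(384,)) encoder used above


def toy_embed(text: str, dim: int = DIM) -> List[float]:
    """Hashing-trick bag-of-words embedding (a stand-in, not a sentence model)."""
    vec = [0.0] * dim
    for token in text.lower().split():
        # md5 gives a hash that is stable across processes, unlike hash()
        digest = hashlib.md5(token.encode("utf-8")).digest()
        vec[int.from_bytes(digest[:4], "little") % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    # L2-normalize so vectors can be compared by dot product
    return [v / norm for v in vec] if norm else vec


def index_key(docs: List[Dict[str, str]], key: str) -> Dict[str, List[float]]:
    """Compute one vector per document from the value stored under `key`.

    Results are keyed by title purely for illustration; a real index
    would use the document's _id.
    """
    return {doc["title"]: toy_embed(doc[key]) for doc in docs}


docs = [{"title": "Analysis",
         "abstract": "Analysis is the process of breaking a complex topic into smaller parts"}]
vectors = index_key(docs, "abstract")
print(len(vectors["Analysis"]))  # 384
```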


Create an instance of `DatabaseWatcher` and start watching the `documents` collection:

In [26]:
database_watcher = DatabaseWatcher(
    db=db,
    identifier='basic-cdc-watcher',
    on=Collection(name='documents')
)
database_watcher.watch()

INFO:root:Database watch service started at 2023-07-21 19:21:35.649003
INFO:root:Started listening database with identity basic-cdc-watcher/documents...
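A watch service like this usually runs in the background so the notebook stays interactive. The following is a hypothetical `ToyWatcher`, not SuperDuperDB's `DatabaseWatcher`; it only mirrors the `watch()` / `is_available()` pattern used here, with a `queue.Queue` standing in for the database's change stream and a background thread consuming it.

```python
import queue
import threading
from typing import Callable, Dict


class ToyWatcher:
    """Hypothetical background watcher; not SuperDuperDB's DatabaseWatcher."""

    def __init__(self, events: "queue.Queue[Dict]", handler: Callable[[Dict], None]):
        self._events = events
        self._handler = handler
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def watch(self) -> None:
        self._thread.start()

    def is_available(self) -> bool:
        return self._thread.is_alive()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                event = self._events.get(timeout=0.1)
            except queue.Empty:
                continue  # wake up periodically to re-check the stop flag
            try:
                self._handler(event)
            finally:
                self._events.task_done()  # lets Queue.join() return


seen = []
events: "queue.Queue[Dict]" = queue.Queue()
watcher = ToyWatcher(events, seen.append)
watcher.watch()
events.put({"operationType": "insert", "documentKey": {"_id": 1}})
events.join()  # block until the watcher has handled the event
print(watcher.is_available(), len(seen))  # True 1
watcher.stop()
print(watcher.is_available())  # False
```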


Check the watcher's status:

In [27]:
database_watcher.is_available()

True

You can also inspect the watcher's info:

In [28]:
database_watcher.info()

{
  "inserts": 0,
  "updates": 0
}


{'inserts': 0, 'updates': 0}
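One way to produce counters of this shape is to tally change events by `operationType`. The helper below is a hypothetical sketch, not how `DatabaseWatcher.info()` is implemented, and SuperDuperDB's own counting may differ; treating MongoDB `replace` events as updates is an assumption.

```python
from collections import Counter
from typing import Dict, Iterable


def summarize(events: Iterable[Dict]) -> Dict[str, int]:
    """Tally change events into an {'inserts', 'updates'} summary."""
    counts = Counter(e["operationType"] for e in events)
    return {
        "inserts": counts["insert"],
        # 'replace' events are counted as updates here (an assumption)
        "updates": counts["update"] + counts["replace"],
    }


events = [{"operationType": "insert"},
          {"operationType": "insert"},
          {"operationType": "update"}]
print(summarize(events))  # {'inserts': 2, 'updates': 1}
```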

Add two documents directly through `pymongo`, bypassing SuperDuperDB, and check the info again:

In [29]:
data = [
    {
        "title": "Politics of India",
        "abstract": "Some despriction 1",
    }, 
    {
        "title": "Politics of Asia",
        "abstract": "some description 2",
    }
]
# Insert straight into MongoDB; the CDC watcher should pick these up
doc = pymongo.MongoClient().documents.documents.insert_many(data)

INFO:root:found 0 uris



Check the watcher info again:

In [10]:
database_watcher.info()

{
  "inserts": 1,
  "updates": 0
}


{'inserts': 1, 'updates': 0}

Check that the vectors have been synced from MongoDB to LanceDB:

In [11]:
from superduperdb.vector_search.lancedb_client import LanceDBClient
from superduperdb import CFG

In [12]:
client = db.vector_database.client

The table identifier has the form `model/key`, here `test-st/abstract`:

In [None]:
table = client.get_table('test-st/abstract')

In [14]:
table.table.to_pandas()

|   | vector | id |
|---|--------|----|
| 0 | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | `__SEEDKEY__` |
| 1 | [-0.10046107, -0.004194115, 0.050496113, -0.01... | 64b9970f87282bb1ebd74a82 |
| 2 | [-0.057723213, 0.09273246, -0.028482832, 0.014... | 64b9970f87282bb1ebd74a83 |
| 3 | [-0.10046107, -0.004194115, 0.050496113, -0.01... | 64b9989b6f03202cfcf0f784 |
| 4 | [-0.057723213, 0.09273246, -0.028482832, 0.014... | 64b9989b6f03202cfcf0f785 |
| 5 | [-0.10046107, -0.004194115, 0.050496113, -0.01... | 64b998fc9740cbc5d6c06555 |
| 6 | [-0.057723213, 0.09273246, -0.028482832, 0.014... | 64b998fc9740cbc5d6c06556 |
| 7 | [-0.10046107, -0.004194115, 0.050496113, -0.01... | 64b99d8b10919213a90f7b50 |
| 8 | [-0.057723213, 0.09273246, -0.028482832, 0.014... | 64b99d8b10919213a90f7b51 |
| 9 | [-0.10046107, -0.004194115, 0.050496113, -0.01... | 64b99d8c10919213a90f7b52 |
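To check the sync programmatically, one option is to compare the document ids in MongoDB with the `id` column of the vector table, ignoring the `__SEEDKEY__` placeholder row. The helpers below are a hypothetical sketch; in practice the two id lists might come from `pymongo.MongoClient().documents.documents.distinct('_id')` and `table.table.to_pandas()['id']`, but that wiring is an assumption. The extra id `ffff...` in the usage example is made up to show a missing vector.

```python
from typing import Iterable, Set

SEED_KEY = "__SEEDKEY__"  # placeholder row visible in the LanceDB table above


def table_name(model: str, key: str) -> str:
    """Vector tables are named '<model>/<key>', e.g. 'test-st/abstract'."""
    return f"{model}/{key}"


def missing_vectors(mongo_ids: Iterable[object], table_ids: Iterable[str]) -> Set[str]:
    """Return document ids present in MongoDB but with no vector row yet."""
    indexed = {i for i in table_ids if i != SEED_KEY}
    # str() lets bson ObjectIds be compared with the string ids in the table
    return {str(i) for i in mongo_ids} - indexed


table_ids = [SEED_KEY, "64b9970f87282bb1ebd74a82", "64b9970f87282bb1ebd74a83"]
mongo_ids = ["64b9970f87282bb1ebd74a82", "64b9970f87282bb1ebd74a83",
             "ffffffffffffffffffffffff"]  # hypothetical un-indexed document
print(table_name("test-st", "abstract"))     # test-st/abstract
print(missing_vectors(mongo_ids, table_ids))  # {'ffffffffffffffffffffffff'}
```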
