In [1]:
!pip install pymongo



# Realtime MongoDB ➜ Pinecone Vector Sync (Change Streams) Demo

This notebook demonstrates a minimal realtime synchronization pipeline:

- Watch a MongoDB collection with a change stream
- Embed incoming / updated documents using a SentenceTransformer
- Upsert vectors into a Pinecone index (and delete on removals)
- Keep the semantic search index current with very little code

Security: all secrets (MongoDB credentials, Pinecone API key) have been replaced with placeholders. Supply them at runtime through environment variables or a secret manager instead of hard-coding them in the notebook.
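A minimal sketch of the environment-variable approach, assuming the hypothetical variable names `MONGODB_USERNAME`, `MONGODB_PASSWORD` and `PINECONE_API_KEY` (the notebook itself does not name any). It reads every secret in one place and fails fast with a single error that lists everything missing, so a placeholder never reaches `MongoClient` or `Pinecone`.

```python
import os
from typing import Dict, Mapping, Tuple

# Hypothetical variable names; rename them to match your deployment.
REQUIRED_SECRETS: Tuple[str, ...] = ("MONGODB_USERNAME", "MONGODB_PASSWORD", "PINECONE_API_KEY")


def load_secrets(env: Mapping[str, str] = os.environ,
                 required: Tuple[str, ...] = REQUIRED_SECRETS) -> Dict[str, str]:
    """Return every required secret, or raise one error naming all missing/empty variables."""
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in required}


# Usage in the notebook (after exporting the variables in the shell that starts Jupyter):
# secrets = load_secrets()
# PINECONE_API_KEY = secrets["PINECONE_API_KEY"]
```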

In [2]:
!pip install transformers



In [None]:
import os
from urllib.parse import quote_plus

from pymongo.mongo_client import MongoClient

# Supply credentials via environment variables or a secret store; never hard-code them.
# The variable names below are illustrative placeholders.
username = quote_plus(os.environ.get("MONGODB_USERNAME", "<your-username>"))  # MongoDB username
password = quote_plus(os.environ.get("MONGODB_PASSWORD", "<your-password>"))  # MongoDB password

uri = f"mongodb+srv://{username}:{password}@cluster0.eh7hb.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

client = MongoClient(uri)
try:
    # Round-trip to the server to confirm that credentials and network access work.
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [4]:
!pip install pinecone



In [None]:
import os

# Read the key from the environment; the variable name is an illustrative placeholder.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "<your-pinecone-api-key>")

In [6]:
#!pip install pinecone --upgrade

In [7]:
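from pinecone import Pinecone

pc = Pinecone(api_key=PINECONE_API_KEY)
# Connect to an existing index. all-MiniLM-L6-v2 produces 384-dimensional vectors,
# so the index must have been created with dimension 384.
index = pc.Index("defult")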

In [8]:
index

<pinecone.db_data.index.Index at 0x7e5a1fb30510>

In [9]:
db = client["mytestdb"]

In [10]:
collection = db["mytestcollection"]

In [11]:
!pip install sentence_transformers


In [None]:
from sentence_transformers import SentenceTransformer, util

In [None]:
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [14]:
# Open a change stream; updateLookup attaches the current full document to update events.
# The `with` block closes the cursor when the loop is interrupted (e.g. KeyboardInterrupt).
with collection.watch(full_document='updateLookup') as cursor:
    print("Change stream is now open.")
    while True:
        change = next(cursor)
        op = change['operationType']

        # Insert: embed the new document's `fullplot` text and upsert it under its MongoDB _id.
        if op == 'insert':
            document = change['fullDocument']
            text = document.get('fullplot')
            if text:
                # encode() returns a NumPy array; Pinecone expects a plain list of floats.
                vector = embedding_model.encode(text).tolist()
                index.upsert(vectors=[(str(document['_id']), vector)])

        # Update: re-embed only when the `fullplot` field itself was changed.
        elif op == 'update':
            updated_fields = change['updateDescription']['updatedFields']
            if updated_fields.get('fullplot'):
                vector = embedding_model.encode(updated_fields['fullplot']).tolist()
                # documentKey is always present, even if fullDocument is None
                # because the document was deleted before the lookup ran.
                index.upsert(vectors=[(str(change['documentKey']['_id']), vector)])

        # Delete: remove the vector whose ID matches the deleted document's _id.
        elif op == 'delete':
            index.delete(ids=[str(change['documentKey']['_id'])])

Change stream is now open.


KeyboardInterrupt: 

## 🔄 MongoDB Change Stream to Pinecone Sync

This script listens to real-time changes in a MongoDB collection and keeps a Pinecone vector index in sync. It handles **inserts**, **updates**, and **deletes**, converting text fields to embeddings and syncing them with Pinecone.

---

### 🚀 How It Works:

1. **Start Change Stream:**
   - Opens a change stream on the MongoDB collection with `full_document='updateLookup'` to fetch full updated documents.
   - Prints a confirmation message once the stream is open.

2. **Listen for Changes (Loop):**
   - Enters an infinite `while True` loop.
   - Uses `next(cursor)` to fetch each new change event.

3. **On Insert:**
   - When a new document is inserted:
     - Extracts the document and encodes the `fullplot` field into a vector using `embedding_model`.
     - Converts the vector to a list of floats.
     - Prepares a tuple `(document_id, vector)` for Pinecone.
     - Upserts this data into the Pinecone index.

4. **On Update:**
   - If a document is updated and the `fullplot` field is among the changes:
     - Re-encodes the updated `fullplot` text.
     - Upserts the updated vector into Pinecone using the document’s ID.

5. **On Delete:**
   - If a document is deleted from MongoDB:
     - Removes its corresponding vector from Pinecone using its ID.
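Step 4 only checks whether `fullplot` appears with a truthy value in `updatedFields`. A slightly more defensive check is sketched below as a hypothetical helper, `field_changed`: it also treats nested paths (MongoDB reports nested updates with dotted keys such as `meta.title`) and entries in `removedFields` as changes to the field. This is an illustrative variant, not the notebook's own logic.

```python
from typing import Any, Mapping


def field_changed(update_description: Mapping[str, Any], field: str) -> bool:
    """Return True if an update event set or removed `field` or any path nested under it."""
    updated = update_description.get("updatedFields") or {}
    removed = update_description.get("removedFields") or []
    prefix = field + "."
    # updatedFields keys and removedFields entries are field paths like "fullplot" or "meta.title".
    return any(path == field or path.startswith(prefix) for path in [*updated, *removed])
```

In the loop, `field_changed(change['updateDescription'], 'fullplot')` could replace the truthiness check. When the field was removed there is no longer any text to embed, so deleting the vector is probably the more appropriate action than an upsert.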

---

### ✅ Summary

| MongoDB Operation | Pinecone Action                                 |
|-------------------|--------------------------------------------------|
| `insert`          | Encode `fullplot` ➝ Vector ➝ Upsert             |
| `update`          | If `fullplot` updated ➝ Re-encode ➝ Upsert      |
| `delete`          | Delete vector using document ID                 |

This keeps the Pinecone index in **near real-time sync** with MongoDB, which suits dynamic semantic search or recommendation systems.
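The mapping in the table can be isolated in a pure function, which makes it testable without a live cluster or index. The sketch below uses a hypothetical `plan_action` helper; unlike the notebook's loop, it reads the text for updates from `fullDocument` (available because of `updateLookup`), which may reflect a version newer than the event itself.

```python
from typing import Any, Mapping, Optional, Tuple

# (action, vector id, text to embed); text is None for deletes
Action = Tuple[str, str, Optional[str]]


def plan_action(change: Mapping[str, Any], field: str = "fullplot") -> Optional[Action]:
    """Map one MongoDB change event to the Pinecone action in the table, or None to skip it."""
    op = change.get("operationType")
    if op not in ("insert", "update", "delete"):
        return None  # replace, drop, invalidate, ... are outside the table above
    doc_id = str(change["documentKey"]["_id"])  # same string form the loop uses as the vector id
    if op == "delete":
        return ("delete", doc_id, None)
    if op == "update":
        updated = (change.get("updateDescription") or {}).get("updatedFields") or {}
        if field not in updated:
            return None  # the embedded field is unchanged, so the stored vector is still valid
    text = (change.get("fullDocument") or {}).get(field)
    return ("upsert", doc_id, text) if text else None  # nothing to embed: skip
```

The loop body then reduces to `embedding_model.encode(text).tolist()` plus `index.upsert(...)` for upserts and `index.delete(ids=[doc_id])` for deletes.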


# About the Author

<div style="background-color: #f8f9fa; border-left: 5px solid #28a745; padding: 20px; margin-bottom: 20px; border-radius: 5px;">
  <h2 style="color: #28a745; margin-top: 0; font-family: 'Poppins', sans-serif;"> Muhammad Atif Latif</h2>
  <p style="font-size: 16px; color: #495057;">Data Scientist & Machine Learning Engineer</p>
  
  <p style="font-size: 15px; color: #6c757d; margin-top: 15px;">
    Passionate about building AI solutions that solve real-world problems. Specialized in machine learning,
    deep learning, and data analytics with experience implementing production-ready models.
  </p>
</div>

## Connect With Me

<div style="display: flex; flex-wrap: wrap; gap: 10px; margin-top: 15px;">
  <a href="https://github.com/m-Atif-Latif" target="_blank">
    <img src="https://img.shields.io/badge/GitHub-Follow-212121?style=for-the-badge&logo=github" alt="GitHub">
  </a>
  <a href="https://www.kaggle.com/matiflatif" target="_blank">
    <img src="https://img.shields.io/badge/Kaggle-Profile-20BEFF?style=for-the-badge&logo=kaggle" alt="Kaggle">
  </a>
  <a href="https://www.linkedin.com/in/muhammad-atif-latif-13a171318" target="_blank">
    <img src="https://img.shields.io/badge/LinkedIn-Connect-0077B5?style=for-the-badge&logo=linkedin" alt="LinkedIn">
  </a>
  <a href="https://x.com/mianatif5867" target="_blank">
    <img src="https://img.shields.io/badge/Twitter-Follow-1DA1F2?style=for-the-badge&logo=twitter" alt="Twitter">
  </a>
  <a href="https://www.instagram.com/its_atif_ai/" target="_blank">
    <img src="https://img.shields.io/badge/Instagram-Follow-E4405F?style=for-the-badge&logo=instagram" alt="Instagram">
  </a>
  <a href="mailto:muhammadatiflatif67@gmail.com">
    <img src="https://img.shields.io/badge/Email-Contact-D14836?style=for-the-badge&logo=gmail" alt="Email">
  </a>
</div>

---

## Notebook Summary

Workflow Recap:
1. Installed required packages (`pymongo`, `pinecone`, `sentence_transformers`, `transformers`).
2. Established a (placeholder) MongoDB connection to watch a target collection.
3. Initialized a Pinecone index client for vector storage.
4. Loaded a compact embedding model (`all-MiniLM-L6-v2`) for fast real-time encoding.
5. Opened a MongoDB change stream and continuously:
   - Insert: Encoded new document `fullplot` text ➜ upsert vector.
   - Update: Re-embedded modified `fullplot` ➜ upsert.
   - Delete: Removed vector by document ID.
6. Provided an explanatory markdown table clarifying each operation mapping.

Why it Matters:
Real-time sync lets semantic search and recommendation systems stay fresh without full batch re-indexing. The pattern can be extended to multi-tenant or sharded deployments, for example by running several watchers in parallel or by placing a queue between the watcher and the embedding workers.
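The notebook does not show how a queue would fit in, so the following is only one possible sketch: a hypothetical `run_pipeline` helper that decouples the watcher (producer) from the embedding/upsert step (consumer) with a bounded `queue.Queue`, so a slow embedding model throttles the watcher instead of growing memory without limit.

```python
import queue
import threading
from typing import Any, Callable, Iterable, List, Tuple

_STOP = object()  # sentinel telling the worker thread to exit


def run_pipeline(events: Iterable[Any], handle: Callable[[Any], None],
                 maxsize: int = 1000) -> List[Tuple[Any, Exception]]:
    """Pass events through a bounded queue to one worker thread; return (event, error) pairs."""
    q = queue.Queue(maxsize=maxsize)
    errors: List[Tuple[Any, Exception]] = []

    def worker() -> None:
        while True:
            item = q.get()
            if item is _STOP:
                return
            try:
                handle(item)  # e.g. embed + upsert/delete for one change event
            except Exception as exc:  # record and continue so the queue never stalls
                errors.append((item, exc))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    for event in events:  # in the notebook this would be the change-stream cursor
        q.put(event)      # blocks while the queue is full, throttling the watcher
    q.put(_STOP)
    thread.join()
    return errors
```

A single worker preserves event order. With several workers, events for the same `_id` could be applied out of order (a delete overtaking an earlier upsert, for instance), so parallel workers would need to partition events by document ID.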

Hardening Ideas:
- Add exponential backoff & reconnection logic for transient network failures.
- Batch upserts for throughput (micro-batching).
- Add observability: metrics (latency, upsert counts), structured logs.
- Add filtering to limit which fields trigger re-embedding.
- Persist an embedding version to support model upgrades.
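Two of these ideas, micro-batching and exponential backoff, combine naturally. The sketch below is a hypothetical `MicroBatcher` (not part of the notebook or of the Pinecone SDK) that buffers `(id, vector)` tuples and sends them in one call, retrying with jittered exponential backoff; `upsert` is any callable, for example `lambda batch: index.upsert(vectors=batch)`.

```python
import random
import time
from typing import Callable, List, Tuple

Vector = Tuple[str, List[float]]


def backoff_delays(retries: int, base: float = 0.5, cap: float = 30.0) -> List[float]:
    """Full-jitter exponential backoff: wait n is uniform(0, min(cap, base * 2**n))."""
    return [random.uniform(0, min(cap, base * 2 ** n)) for n in range(retries)]


class MicroBatcher:
    """Buffer upserts and flush them in one call when the batch is full (hypothetical helper)."""

    def __init__(self, upsert: Callable[[List[Vector]], None], batch_size: int = 100,
                 retries: int = 5, sleep: Callable[[float], None] = time.sleep) -> None:
        self.upsert = upsert
        self.batch_size = batch_size
        self.retries = retries
        self.sleep = sleep
        self.buffer: List[Vector] = []

    def add(self, vector_id: str, values: List[float]) -> None:
        self.buffer.append((vector_id, values))
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        # retries + 1 attempts in total; None marks the final attempt (no sleep after it).
        for delay in backoff_delays(self.retries) + [None]:
            try:
                self.upsert(batch)
                return
            except Exception:
                if delay is None:
                    self.buffer = batch + self.buffer  # keep the data for a later flush
                    raise
                self.sleep(delay)
```

Call `flush()` before any `index.delete(...)`, otherwise a buffered upsert could recreate a vector that was deleted after it. With a blocking `next(cursor)`, a partly filled batch is only sent once it fills; PyMongo's `ChangeStream.try_next()` returns `None` when no event is available, which gives the loop a chance to flush on a timer.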

Crafted and documented by **Muhammad Atif Latif**, delivering practical, production-minded GenAI data infrastructure. Star the repo and connect via the links above for more real-time vector and RAG engineering content.