In [1]:
# Install required packages
%pip install chromadb
%pip install langchain-google-genai
%pip install langchain-chroma
%pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## 1. Setup and Imports

In [2]:
import chromadb
from chromadb.config import Settings
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from dotenv import load_dotenv
import os
import re
import json
from typing import List, Dict
from datetime import datetime

# Load environment variables (GOOGLE_API_KEY)
load_dotenv()

print("‚úÖ All imports loaded successfully")

‚úÖ All imports loaded successfully


## 2. Initialize ChromaDB (Unified Database)

In [3]:
# Initialize ChromaDB with persistent storage
DB_PATH = "./unified_chroma_db"

# Create directory if it doesn't exist
os.makedirs(DB_PATH, exist_ok=True)

# Create persistent client
chroma_client = chromadb.PersistentClient(
    path=DB_PATH,
    settings=Settings(
        anonymized_telemetry=False,
        allow_reset=True
    )
)

print(f"‚úÖ ChromaDB initialized at: {DB_PATH}")
print(f"üìä Existing collections: {[col.name for col in chroma_client.list_collections()]}")

‚úÖ ChromaDB initialized at: ./unified_chroma_db
üìä Existing collections: []


## 3. Initialize Embedding Model (Google Gemini)

**Critical:** Using the same embedding model as the document pipeline ensures semantic compatibility.

In [4]:
# Initialize Google Gemini embedding model (same as document pipeline)
embedding_model = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    task_type="retrieval_document"  # Use for storing embeddings
)

print(f"‚úÖ Embedding model loaded: Google Gemini text-embedding-004")
print(f"üìê Task type: retrieval_document")

‚úÖ Embedding model loaded: Google Gemini text-embedding-004
üìê Task type: retrieval_document


## 4. VTT Parser Functions

Parse WebVTT format and create speaker-turn chunks with configurable overlap.

In [5]:
def parse_vtt_to_turns(vtt_text: str) -> List[Dict]:
    """
    Parse VTT transcript into individual speaker turns.
    
    Args:
        vtt_text: Raw VTT transcript text
        
    Returns:
        List of dictionaries containing timestamp, speaker, and text
    """
    # Pattern to match: timestamp --> timestamp\nSpeaker: Text
    pattern = re.compile(
        r'(\d{2}:\d{2}:\d{2}\.\d{3})\s*-->\s*\d{2}:\d{2}:\d{2}\.\d{3}\s*\n'
        r'(.*?):\s*(.*?)(?=\n\n|\n\d|\Z)',
        re.DOTALL
    )
    
    turns = []
    matches = pattern.findall(vtt_text)
    
    for timestamp, speaker, text in matches:
        turns.append({
            "timestamp": timestamp,
            "speaker": speaker.strip(),
            "text": text.strip()
        })
    
    return turns


def create_speaker_turn_chunks(
    vtt_text: str,
    meeting_name: str,
    meeting_date: str,
    project_name: str = "default_project",
    turns_per_chunk: int = 8,
    overlap: int = 3
) -> List[Dict]:
    """
    Create overlapping chunks from VTT transcript based on speaker turns.
    
    Args:
        vtt_text: Raw VTT transcript text
        meeting_name: Name/title of the meeting
        meeting_date: Date of meeting (YYYY-MM-DD format)
        project_name: Name of the project this meeting belongs to
        turns_per_chunk: Number of speaker turns per chunk
        overlap: Number of overlapping turns between chunks
        
    Returns:
        List of chunk dictionaries with text, enhanced_content, and metadata
    """
    print(f"üéôÔ∏è  Parsing VTT transcript: {meeting_name}")
    
    # Parse VTT into speaker turns
    turns = parse_vtt_to_turns(vtt_text)
    
    if not turns:
        print("‚ö†Ô∏è  No speaker turns found in transcript")
        return []
    
    print(f"‚úÖ Parsed {len(turns)} speaker turns")
    
    # Create overlapping chunks
    chunks = []
    step = max(1, turns_per_chunk - overlap)  # Ensure step is at least 1
    
    for i in range(0, len(turns), step):
        window = turns[i : i + turns_per_chunk]
        
        # Skip very small chunks at the end
        if len(window) < 2:
            break
        
        # Extract metadata
        speakers_list = list(set(t['speaker'] for t in window))
        start_time = window[0]['timestamp']
        end_time = window[-1]['timestamp']
        
        # Combine turn contents
        combined_text = "\n".join([
            f"{t['speaker']}: {t['text']}" for t in window
        ])
        
        # Create enhanced content for better searchability
        enhanced_text = f"""Meeting: {meeting_name}
        Project: {project_name}
        Date: {meeting_date}
        Time Range: {start_time} - {end_time}
        Speakers: {', '.join(speakers_list)}

        Transcript:
        {combined_text}"""
        
        chunks.append({
            "text": combined_text,
            "enhanced_content": enhanced_text,
            "metadata": {
                "source_type": "meeting_transcript",
                "project_name": project_name,
                "meeting_name": meeting_name,
                "meeting_date": meeting_date,
                "start_time": start_time,
                "end_time": end_time,
                "speakers_in_chunk": json.dumps(speakers_list),  # Serialize list for ChromaDB
                "turn_count": len(window),
                "chunk_index": len(chunks)
            }
        })
    
    print(f"‚úÖ Created {len(chunks)} chunks (turns_per_chunk={turns_per_chunk}, overlap={overlap})")
    return chunks


print("‚úÖ VTT parser functions loaded")

‚úÖ VTT parser functions loaded


## 5. ChromaDB Storage Functions

In [6]:
def get_or_create_project_collection(project_name: str):
    """
    Get or create a ChromaDB collection for a specific project.
    Each project has its own collection for documents and transcripts.
    
    Args:
        project_name: Name of the project
        
    Returns:
        ChromaDB collection object
    """
    collection_name = f"project_{project_name.lower().replace(' ', '_')}"
    
    try:
        collection = chroma_client.get_collection(name=collection_name)
        print(f"‚úÖ Using existing collection: {collection_name}")
    except:
        collection = chroma_client.create_collection(
            name=collection_name,
            metadata={"description": f"Unified document and transcript chunks for {project_name}"}
        )
        print(f"‚úÖ Created new collection: {collection_name}")
    
    return collection


def store_transcript_chunks(
    chunks: List[Dict],
    project_name: str = "default_project"
):
    """
    Store transcript chunks in ChromaDB with Google Gemini embeddings.
    
    Args:
        chunks: List of chunk dictionaries from create_speaker_turn_chunks
        project_name: Name of the project collection
    """
    if not chunks:
        print("‚ö†Ô∏è  No chunks to store")
        return
    
    print(f"üíæ Storing {len(chunks)} transcript chunks...")
    
    # Get or create collection for this project
    collection = get_or_create_project_collection(project_name)
    
    # Prepare data for batch insertion
    ids = []
    documents = []
    metadatas = []
    
    for chunk in chunks:
        # Generate unique ID
        chunk_id = (
            f"transcript_{chunk['metadata']['meeting_name']}_"
            f"{chunk['metadata']['chunk_index']}"
        ).replace(" ", "_").lower()
        
        ids.append(chunk_id)
        documents.append(chunk['enhanced_content'])
        metadatas.append(chunk['metadata'])
    
    # Generate embeddings in batch (more efficient with Gemini API)
    print("üîÆ Generating embeddings via Google Gemini API...")
    batch_embeddings = embedding_model.embed_documents(documents)
    
    # Batch insert into ChromaDB
    collection.add(
        ids=ids,
        embeddings=batch_embeddings,
        documents=documents,
        metadatas=metadatas
    )
    
    print(f"‚úÖ Successfully stored {len(chunks)} transcript chunks")
    print(f"üìä Collection '{collection.name}' now has {collection.count()} total chunks")


print("‚úÖ Storage functions loaded")

‚úÖ Storage functions loaded


## 6. Main Ingestion Function

Complete pipeline to load VTT file and store in ChromaDB.

In [7]:
def ingest_transcript(
    vtt_file_path: str,
    meeting_name: str,
    meeting_date: str,
    project_name: str = "default_project",
    turns_per_chunk: int = 8,
    overlap: int = 3
) -> List[Dict]:
    """
    Complete ingestion pipeline: Load VTT file, chunk, and store in ChromaDB.
    
    Args:
        vtt_file_path: Path to the VTT transcript file
        meeting_name: Name/title of the meeting
        meeting_date: Date of meeting (YYYY-MM-DD format)
        project_name: Name of the project
        turns_per_chunk: Number of speaker turns per chunk
        overlap: Number of overlapping turns between chunks
        
    Returns:
        List of created chunks
    """
    print("=" * 80)
    print("üöÄ STARTING TRANSCRIPT INGESTION PIPELINE")
    print("=" * 80)
    
    # Step 1: Load VTT file
    print(f"\nüìÇ Loading VTT file: {vtt_file_path}")
    
    if not os.path.exists(vtt_file_path):
        print(f"‚ùå Error: File not found at {vtt_file_path}")
        return []
    
    with open(vtt_file_path, 'r', encoding='utf-8') as f:
        vtt_text = f.read()
    
    print(f"‚úÖ Loaded {len(vtt_text)} characters from file")
    
    # Step 2: Create chunks
    print(f"\nüî® Creating chunks...")
    chunks = create_speaker_turn_chunks(
        vtt_text=vtt_text,
        meeting_name=meeting_name,
        meeting_date=meeting_date,
        project_name=project_name,
        turns_per_chunk=turns_per_chunk,
        overlap=overlap
    )
    
    if not chunks:
        print("‚ùå No chunks created")
        return []
    
    # Step 3: Store in database
    print(f"\nüíæ Storing chunks in database...")
    store_transcript_chunks(chunks, project_name=project_name)
    
    print("\n" + "=" * 80)
    print("‚úÖ TRANSCRIPT INGESTION COMPLETE!")
    print("=" * 80)
    
    return chunks


print("‚úÖ Main ingestion function loaded")

‚úÖ Main ingestion function loaded


## 7. Ingest Your Transcript

**Update the file path below to your actual VTT transcript file.**

In [8]:
# === CONFIGURE YOUR TRANSCRIPT HERE ===

# Path to your VTT transcript file
VTT_FILE_PATH = "./test_transcript.vtt"  # Update this path

# Meeting metadata
MEETING_NAME = "transformer_meeting"
MEETING_DATE = "2026-01-11"  # YYYY-MM-DD format
PROJECT_NAME = "attention_transformer_project"  # Should match your document pipeline project

# Chunking parameters
TURNS_PER_CHUNK = 8  # Number of speaker turns per chunk
OVERLAP = 3  # Number of overlapping turns between chunks

# Run the ingestion pipeline
chunks = ingest_transcript(
    vtt_file_path=VTT_FILE_PATH,
    meeting_name=MEETING_NAME,
    meeting_date=MEETING_DATE,
    project_name=PROJECT_NAME,
    turns_per_chunk=TURNS_PER_CHUNK,
    overlap=OVERLAP
)

# Display summary
if chunks:
    print(f"\nüìä INGESTION SUMMARY:")
    print(f"   ‚Ä¢ Total chunks created: {len(chunks)}")
    print(f"   ‚Ä¢ Meeting: {MEETING_NAME}")
    print(f"   ‚Ä¢ Date: {MEETING_DATE}")
    print(f"   ‚Ä¢ Project: {PROJECT_NAME}")
    print(f"   ‚Ä¢ Speakers: {set([s for chunk in chunks for s in json.loads(chunk['metadata']['speakers_in_chunk'])])}")

üöÄ STARTING TRANSCRIPT INGESTION PIPELINE

üìÇ Loading VTT file: ./test_transcript.vtt
‚úÖ Loaded 2762 characters from file

üî® Creating chunks...
üéôÔ∏è  Parsing VTT transcript: transformer_meeting
‚úÖ Parsed 15 speaker turns
‚úÖ Created 3 chunks (turns_per_chunk=8, overlap=3)

üíæ Storing chunks in database...
üíæ Storing 3 transcript chunks...
‚úÖ Created new collection: project_attention_transformer_project
üîÆ Generating embeddings via Google Gemini API...
‚úÖ Successfully stored 3 transcript chunks
üìä Collection 'project_attention_transformer_project' now has 3 total chunks

‚úÖ TRANSCRIPT INGESTION COMPLETE!

üìä INGESTION SUMMARY:
   ‚Ä¢ Total chunks created: 3
   ‚Ä¢ Meeting: transformer_meeting
   ‚Ä¢ Date: 2026-01-11
   ‚Ä¢ Project: attention_transformer_project
   ‚Ä¢ Speakers: {'Ashish', 'Illia', 'Niki', 'Noam', 'Jakob'}


## 8. Verify Ingestion

Check what was stored in the database.

In [9]:
def verify_ingestion(project_name: str):
    """Verify transcript ingestion by checking the collection"""
    collection_name = f"project_{project_name.lower().replace(' ', '_')}"
    
    try:
        collection = chroma_client.get_collection(name=collection_name)
    except:
        print(f"‚ùå Collection '{collection_name}' not found")
        return
    
    # Get collection stats
    total_count = collection.count()
    all_data = collection.get(include=['metadatas'])
    metadatas = all_data['metadatas']
    
    # Count by source type
    transcript_count = sum(1 for m in metadatas if m.get('source_type') == 'meeting_transcript')
    document_count = sum(1 for m in metadatas if m.get('source_type') == 'document')
    
    # Get unique meetings
    unique_meetings = set(
        m.get('meeting_name') 
        for m in metadatas 
        if m.get('source_type') == 'meeting_transcript'
    )
    
    print("\n" + "=" * 80)
    print(f"üìä DATABASE VERIFICATION: {collection_name}")
    print("=" * 80)
    print(f"Total chunks: {total_count}")
    print(f"  üìÑ Document chunks: {document_count}")
    print(f"  üéôÔ∏è  Transcript chunks: {transcript_count}")
    print(f"\nMeetings stored: {len(unique_meetings)}")
    
    if unique_meetings:
        for meeting in sorted(unique_meetings):
            meeting_chunks = sum(
                1 for m in metadatas 
                if m.get('meeting_name') == meeting
            )
            print(f"  ‚Ä¢ {meeting}: {meeting_chunks} chunks")
    
    print("=" * 80)


# Verify the ingestion
verify_ingestion(PROJECT_NAME)


üìä DATABASE VERIFICATION: project_attention_transformer_project
Total chunks: 3
  üìÑ Document chunks: 0
  üéôÔ∏è  Transcript chunks: 3

Meetings stored: 1
  ‚Ä¢ transformer_meeting: 3 chunks


## 9. Preview Sample Chunks

View what was stored in the database.

In [10]:
def preview_chunks(chunks: List[Dict], num_chunks: int = 2):
    """Preview the first few chunks"""
    print("\n" + "=" * 80)
    print("üì¶ CHUNK PREVIEW")
    print("=" * 80)
    
    for i, chunk in enumerate(chunks[:num_chunks]):
        print(f"\n--- CHUNK {i + 1} ---")
        print(f"Meeting: {chunk['metadata']['meeting_name']}")
        print(f"Date: {chunk['metadata']['meeting_date']}")
        print(f"Time: {chunk['metadata']['start_time']} - {chunk['metadata']['end_time']}")
        print(f"Speakers: {json.loads(chunk['metadata']['speakers_in_chunk'])}")
        print(f"Turn count: {chunk['metadata']['turn_count']}")
        print(f"\nContent preview:")
        print(chunk['text'][:300] + "..." if len(chunk['text']) > 300 else chunk['text'])
        print("=" * 80)


# Preview the chunks
if chunks:
    preview_chunks(chunks, num_chunks=2)


üì¶ CHUNK PREVIEW

--- CHUNK 1 ---
Meeting: transformer_meeting
Date: 2026-01-11
Time: 00:00:01.120 - 00:01:25.500
Speakers: ['Ashish', 'Illia', 'Niki', 'Noam', 'Jakob']
Turn count: 8

Content preview:
Ashish: We need to finalize the "Why Self-Attention" section. We're arguing that it's superior to recurrent and convolutional layers for long-range dependencies.
Noam: Right, because the maximum path length between any two positions is a constant O(1) in the Transformer, whereas it's O(n) for recurr...

--- CHUNK 2 ---
Meeting: transformer_meeting
Date: 2026-01-11
Time: 00:01:02.500 - 00:02:32.100
Speakers: ['Illia', 'Ashish', 'Niki', 'Noam', 'Jakob']
Turn count: 8

Content preview:
Illia: Let's discuss the encoder-decoder structure. The encoder maps input $x$ to continuous representations $z$, then the decoder generates output $y$ auto-regressively.
Jakob: I think we should emphasize that each encoder layer has two sub-layers: multi-head self-attention and a position-wise feed...


## Summary

This notebook provides a complete transcript ingestion pipeline:

‚úÖ **Same Embedding Model**: Uses Google Gemini (same as document pipeline)  
‚úÖ **Unified Database**: Stores in `unified_chroma_db` alongside documents  
‚úÖ **Project Collections**: Uses `project_{name}` collections for organization  
‚úÖ **Speaker-Turn Chunking**: Intelligent chunking with overlap  
‚úÖ **Rich Metadata**: Tracks meetings, speakers, timestamps  
‚úÖ **Batch Processing**: Handle multiple transcripts efficiently  

**Next Steps:**
1. Update `VTT_FILE_PATH` to your actual transcript file
2. Run the ingestion
3. Query both documents and transcripts together using your existing RAG pipeline