# NeMo Curator: Complete Guide to Preprocessing, Embeddings, and Vector Storage

This comprehensive tutorial will guide you through the complete pipeline of preprocessing data, generating embeddings, and storing vectors for retrieval applications using NVIDIA NeMo Curator.

## 🎯 What You'll Learn

1. **Environment Setup** - Configure NeMo Curator with GPU acceleration
2. **Data Loading & Preprocessing** - Clean and prepare raw text data
3. **Document Chunking** - Optimize text segmentation for embeddings
4. **Embedding Generation** - Create high-quality text embeddings at scale
5. **Vector Storage** - Set up efficient vector databases for retrieval
6. **Similarity Search** - Implement and test retrieval systems
7. **Performance Optimization** - Scale and optimize your pipeline

## 🚀 Prerequisites

- NVIDIA GPU with CUDA support (recommended; the cells below fall back to CPU where possible)
- Python 3.8+
- NeMo Curator installed with GPU acceleration
- Basic familiarity with text processing concepts

Let's get started building your preprocessing and embedding pipeline!

## 1. Environment Setup and Dependencies

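First, let's import and configure the libraries needed for the preprocessing and embedding pipeline. This notebook runs NeMo Curator from a source checkout, so the first cell adds that checkout to `sys.path`.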

In [2]:
# Setup for NeMo Curator from source
import os
import sys
import warnings
warnings.filterwarnings('ignore')

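# Add the Curator source checkout to the Python path, since we're running from source.
# Set the CURATOR_PATH environment variable to your checkout; ~/Curator is only a default.
curator_path = os.environ.get("CURATOR_PATH", os.path.expanduser("~/Curator"))
if curator_path not in sys.path:
    sys.path.insert(0, curator_path)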

print("🔧 Setting up imports from source...")

try:
    # Core NeMo Curator imports
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.utils.distributed_utils import get_client, get_num_workers
    from nemo_curator.modules import (
        AddId, 
        ExactDuplicates, 
        FuzzyDuplicates,
        Sequential
    )
    print("✅ Core NeMo Curator imports successful!")
except ImportError as e:
    print(f"❌ Failed to import core NeMo Curator: {e}")

try:
    from nemo_curator.modules.semantic_dedup import (
        SemDedup,
        EmbeddingCreator,
    )
    print("✅ Semantic deduplication modules imported!")
except ImportError as e:
    print(f"⚠️  Semantic dedup modules not available: {e}")

try:
    # Text processing imports
    from nemo_curator.modifiers import UnicodeReformatter
    from nemo_curator.filters import FastTextLangId
    from nemo_curator import Modify, ScoreFilter
    print("✅ Text processing modules imported!")
except ImportError as e:
    print(f"⚠️  Text processing modules not available: {e}")

# Standard libraries
import pandas as pd
import numpy as np
import time
from pathlib import Path
import json
from typing import List, Dict, Any

try:
    from tqdm import tqdm
    print("✅ Progress tracking available!")
except ImportError:
    print("⚠️  tqdm not available, progress bars disabled")
    tqdm = lambda x, **kwargs: x  # No-op fallback; accepts and ignores tqdm options

print("✅ All imports configured!")

# Check GPU availability
try:
    import torch
    if torch.cuda.is_available():
        print(f"✅ CUDA available: {torch.cuda.device_count()} GPU(s)")
        print(f"   Current device: {torch.cuda.get_device_name()}")
    else:
        print("⚠️  CUDA not available, using CPU")
except ImportError:
    print("⚠️  PyTorch not available")

# Check if cuDF is available for GPU acceleration
try:
    import cudf
    print("✅ GPU acceleration (cuDF) available")
    USE_GPU = True
except ImportError:
    print("⚠️  GPU acceleration not available, using CPU backend")
    USE_GPU = False

🔧 Setting up imports from source...
✅ Core NeMo Curator imports successful!
⚠️  Semantic dedup modules not available: cannot import name 'SemDedup' from 'nemo_curator.modules.semantic_dedup' (/Users/productpat/Curator/nemo_curator/modules/semantic_dedup/__init__.py)
✅ Text processing modules imported!
✅ Progress tracking available!
✅ All imports configured!
⚠️  CUDA not available, using CPU
⚠️  GPU acceleration not available, using CPU backend


In [None]:
# Initialize GPU-accelerated Dask client for distributed processing
print("🚀 Initializing distributed computing environment...")

try:
    # Start GPU cluster for semantic deduplication and embeddings
    client = get_client(cluster_type="gpu", set_torch_to_use_rmm=False)
    print(f"✅ GPU cluster initialized with {get_num_workers(client)} workers")
    print(f"Dashboard link: {client.dashboard_link}")
except Exception as e:
    print(f"⚠️  GPU cluster failed, falling back to CPU: {e}")
    # Fallback to CPU cluster (memory_limit applies per worker: 4 x 8GB here)
    client = get_client(cluster_type="cpu", n_workers=4, processes=True, memory_limit="8GB")
    print(f"✅ CPU cluster initialized with {get_num_workers(client)} workers")

client

## 2. Data Loading and Initial Preprocessing

Let's start by loading and validating our raw data. NeMo Curator's `DocumentDataset` can read several input formats, including JSONL (`DocumentDataset.read_json`) and Parquet (`DocumentDataset.read_parquet`); this tutorial uses JSONL.
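Before handing files to `read_json`, it can help to check that every line is valid JSON and carries a non-empty text field, so malformed records surface early rather than as odd rows downstream. The stdlib-only sketch below does that; `validate_jsonl_lines` is a hypothetical helper of our own, not a NeMo Curator API, and the `"text"` field name is an assumption matching the sample data.

```python
import json
from typing import Dict, Iterable, List, Tuple


def validate_jsonl_lines(lines: Iterable[str], text_field: str = "text") -> Tuple[List[Dict], List[str]]:
    """Parse JSONL lines, keeping records whose text field is a non-empty string.

    Returns (valid_records, error_messages). Blank lines are skipped silently.
    """
    records: List[Dict] = []
    errors: List[str] = []
    for lineno, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. a trailing newline
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            errors.append(f"line {lineno}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(record, dict):
            errors.append(f"line {lineno}: expected a JSON object")
            continue
        text = record.get(text_field)
        if not isinstance(text, str) or not text.strip():
            errors.append(f"line {lineno}: missing or empty '{text_field}'")
            continue
        records.append(record)
    return records, errors


sample = [
    '{"text": "Machine learning automates model building.", "source": "ML_handbook"}',
    '{"text": "", "source": "empty_doc"}',
    '{not valid json}',
    '',
]
valid, problems = validate_jsonl_lines(sample)
print(len(valid), problems)
```

In the notebook you could run it on `open(sample_file)` before loading and stop if `problems` is non-empty.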

In [None]:
# Create sample data directory and files for this tutorial
data_dir = Path("tutorial_data")
data_dir.mkdir(exist_ok=True)

# Create sample documents for our tutorial
sample_documents = [
    {
        "text": "Natural language processing (NLP) is a subfield of artificial intelligence that focuses on the interaction between computers and human language. It involves developing algorithms and models that can understand, interpret, and generate human language.",
        "source": "AI_encyclopedia",
        "category": "technology"
    },
    {
        "text": "Machine learning is a method of data analysis that automates analytical model building. It uses algorithms that iteratively learn from data, allowing computers to find hidden insights without being explicitly programmed where to look.",
        "source": "ML_handbook", 
        "category": "technology"
    },
    {
        "text": "Climate change refers to long-term shifts in global or regional climate patterns. The primary cause is increased levels of greenhouse gases produced by human activities, particularly the burning of fossil fuels.",
        "source": "climate_report",
        "category": "environment"
    },
    {
        "text": "Quantum computing leverages quantum mechanical phenomena like superposition and entanglement to process information in fundamentally different ways than classical computers. This enables solving certain complex problems exponentially faster.",
        "source": "quantum_physics",
        "category": "technology"
    },
    {
        "text": "Biodiversity refers to the variety of life on Earth, including the variety of species, ecosystems, and genetic diversity within species. It is essential for ecosystem stability and human well-being.",
        "source": "biology_textbook",
        "category": "environment"
    }
]

# Save sample data as JSONL
sample_file = data_dir / "sample_documents.jsonl"
with open(sample_file, 'w') as f:
    for doc in sample_documents:
        f.write(json.dumps(doc) + '\n')

print(f"✅ Created sample data: {sample_file}")
print(f"📄 Number of documents: {len(sample_documents)}")

# Display first document
print(f"📋 Sample document:")
print(json.dumps(sample_documents[0], indent=2))

In [None]:
# Load data using NeMo Curator's DocumentDataset
print("📚 Loading data with NeMo Curator...")

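# Method 1: Load from JSONL files. Pass the file path explicitly rather than
# data_dir, because later steps write Parquet and index files into data_dir.
backend = "cudf" if USE_GPU else "pandas"
dataset = DocumentDataset.read_json([str(sample_file)], backend=backend)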

print(f"✅ Dataset loaded successfully!")
print(f"📊 Dataset info:")
print(f"   - Number of partitions: {dataset.df.npartitions}")
print(f"   - Backend: {backend}")
print(f"   - Columns: {list(dataset.df.columns)}")

# Display basic statistics
df_sample = dataset.df.head()
print(f"\n📋 First few documents:")
print(df_sample)

## 3. Text Cleaning and Normalization

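Before generating embeddings, we clean and normalize the text. Typical steps are Unicode normalization (repairing mis-encoded or inconsistent characters), removing unwanted characters, and filtering documents by language or quality. The cells below add document IDs and apply Unicode normalization with `UnicodeReformatter`.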
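To make these steps concrete, here is a stdlib-only sketch of what cleaning and a simple quality filter might look like. It is a stand-in, not what NeMo Curator does: as we understand it, `UnicodeReformatter` wraps the `ftfy` library and `FastTextLangId` scores language with a fastText model. The functions `clean_text` and `passes_quality_filter` and their thresholds are hypothetical choices for illustration.

```python
import re
import unicodedata

# C0/C1 control characters, excluding tab, newline and carriage return.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")


def clean_text(text: str) -> str:
    """NFKC-normalize, drop control characters, and collapse whitespace."""
    text = unicodedata.normalize("NFKC", text)  # e.g. ligature 'ﬁ' -> 'fi', NBSP -> space
    text = _CONTROL_CHARS.sub("", text)
    return " ".join(text.split())  # collapse runs of whitespace to single spaces


def passes_quality_filter(text: str, min_words: int = 5, min_alpha_ratio: float = 0.6) -> bool:
    """Hypothetical heuristic: enough words, and mostly alphabetic characters."""
    words = text.split()
    if len(words) < min_words:
        return False
    non_space = [c for c in text if not c.isspace()]
    if not non_space:
        return False
    alpha = sum(c.isalpha() for c in non_space)
    return alpha / len(non_space) >= min_alpha_ratio


raw = "The\u00a0\ufb01rst  step\x07 is normalization."
cleaned = clean_text(raw)
print(cleaned)
print(passes_quality_filter(cleaned))
print(passes_quality_filter("$$$ 123 456 789 000 ###"))
```

In NeMo Curator, a function like `clean_text` would correspond to a modifier applied with `Modify`, and a filter like `passes_quality_filter` to a filter applied with `ScoreFilter`.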

In [None]:
# Add unique document IDs first
print("🏷️  Adding unique document IDs...")
add_id = AddId(id_field="doc_id", id_prefix="doc", start_index=0)
dataset_with_ids = add_id(dataset)

print(f"✅ Added IDs. Sample with ID:")
print(dataset_with_ids.df.head(2))  # head() already returns a computed DataFrame

# Unicode normalization to handle various text encodings
print("\n🧹 Applying Unicode normalization...")
unicode_normalizer = UnicodeReformatter()
normalize_step = Modify(unicode_normalizer)
normalized_dataset = normalize_step(dataset_with_ids)

print("✅ Unicode normalization complete")

# Compare the first document before and after normalization
# (head() already returns a computed DataFrame, so no .compute() is needed)
original_sample = dataset_with_ids.df.head(1)['text'].iloc[0]
normalized_sample = normalized_dataset.df.head(1)['text'].iloc[0]

print(f"\n📝 Text normalization example:")
print(f"Original: {original_sample[:100]}...")
print(f"Normalized: {normalized_sample[:100]}...")

print(f"\n📊 Dataset size after normalization: {len(normalized_dataset.df)} documents")

## 4. Document Chunking and Segmentation

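For effective embedding generation, we split long documents into smaller, semantically coherent chunks. Most transformer embedding models truncate inputs beyond a fixed token limit, and a vector for a short passage is more specific than one for a whole document, so chunking lets a retrieval system return the particular passage that answers a query.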
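The `DocumentSplitter` cell below splits on a single separator, giving one segment per sentence with no control over segment size. A common alternative is to pack whole sentences into chunks under a character budget, with a small overlap between neighbouring chunks so that context spanning a boundary appears in both. The sketch below is a hypothetical pure-Python helper (`chunk_sentences`, its regex-based sentence split, and its defaults are our assumptions, not a NeMo Curator API).

```python
import re
from typing import List


def chunk_sentences(text: str, max_chars: int = 200, overlap: int = 1) -> List[str]:
    """Greedily pack sentences into chunks of at most max_chars characters.

    Consecutive chunks share the last `overlap` sentences when that still fits
    the budget. A single sentence longer than max_chars becomes its own chunk.
    """
    if overlap < 0:
        raise ValueError("overlap must be non-negative")
    # Split after '.', '!' or '?' followed by whitespace; punctuation is kept.
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    chunks: List[str] = []
    current: List[str] = []
    for sentence in sentences:
        if current and len(" ".join(current + [sentence])) > max_chars:
            chunks.append(" ".join(current))
            # Carry the tail of the finished chunk into the next one as overlap.
            current = current[-overlap:] if overlap else []
            # Drop the overlap if it would push the new chunk over budget.
            if current and len(" ".join(current + [sentence])) > max_chars:
                current = []
        current.append(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks


doc = "Alpha one. Beta two. Gamma three. Delta four."
chunks = chunk_sentences(doc, max_chars=25, overlap=1)
print(chunks)
```

Overlap trades storage (some sentences are embedded twice) for better recall at chunk boundaries; `overlap=0` gives disjoint chunks.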

In [None]:
# Import document splitting modules
from nemo_curator.modules import DocumentSplitter, DocumentJoiner

print("✂️  Setting up document chunking pipeline...")

# For this example, let's create some longer documents to demonstrate chunking
longer_docs = [
    {
        "text": "Natural language processing (NLP) is a subfield of artificial intelligence. It focuses on interaction between computers and human language. NLP involves developing algorithms that can understand text. These algorithms can interpret human language patterns. They can also generate human-like text responses. Modern NLP uses deep learning techniques. Transformers have revolutionized the field. Applications include translation, summarization, and chatbots.",
        "doc_id": "doc_long_1",
        "source": "AI_encyclopedia"
    },
    {
        "text": "Machine learning automates analytical model building. It uses algorithms that learn from data iteratively. Computers find hidden insights automatically. No explicit programming is needed for pattern discovery. Supervised learning uses labeled training data. Unsupervised learning finds patterns in unlabeled data. Reinforcement learning learns through trial and error. Deep learning uses neural networks with multiple layers.",
        "doc_id": "doc_long_2", 
        "source": "ML_handbook"
    }
]

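# Create a new dataset with longer documents for chunking demonstration.
# from_pandas builds a pandas-backed dataset; move it to cuDF when the GPU backend is in use.
longer_df = pd.DataFrame(longer_docs)
chunking_dataset = DocumentDataset.from_pandas(longer_df, npartitions=1)
if USE_GPU:
    chunking_dataset = DocumentDataset(chunking_dataset.df.to_backend("cudf"))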

print(f"📄 Created dataset with {len(longer_docs)} longer documents for chunking")
print(f"📝 Sample document length: {len(longer_docs[0]['text'])} characters")

# Split documents by sentences (using period as separator)
print("\n🔪 Splitting documents into smaller chunks...")
splitter = DocumentSplitter(
    separator=".", 
    text_field="text",
    segment_id_field="segment_id"
)

chunked_dataset = splitter(chunking_dataset)
print(f"✅ Document splitting complete")

# Show the results
chunked_sample = chunked_dataset.df.compute()
print(f"\n📊 Chunking results:")
print(f"   - Original documents: {len(longer_docs)}")
print(f"   - Total chunks created: {len(chunked_sample)}")
print(f"\n📋 Sample chunks:")
print(chunked_sample[['doc_id', 'segment_id', 'text']].head())

## 5. Embedding Model Selection and Loading

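Now we'll configure and load an embedding model to convert our text into dense vector representations. We use a sentence-transformers model and first load it directly to confirm it works; the same Hugging Face model ID is what we pass to NeMo Curator's `EmbeddingCreator` in the next step.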
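As we understand its model card, all-MiniLM-L6-v2 turns a sentence into one vector by averaging (mean-pooling) its token embeddings, ignoring padding positions, and then L2-normalizing the result. The toy pure-Python sketch below shows those two steps on 2-dimensional "token embeddings"; the numbers are made up and the functions are illustrative, not part of sentence-transformers or NeMo Curator.

```python
import math
from typing import List, Sequence


def mean_pool(token_embeddings: Sequence[Sequence[float]], attention_mask: Sequence[int]) -> List[float]:
    """Average the token vectors whose attention-mask entry is 1 (padding excluded)."""
    dim = len(token_embeddings[0])
    total = [0.0] * dim
    count = 0
    for vector, keep in zip(token_embeddings, attention_mask):
        if keep:
            for i, value in enumerate(vector):
                total[i] += value
            count += 1
    if count == 0:
        raise ValueError("attention mask selects no tokens")
    return [v / count for v in total]


def l2_normalize(vector: Sequence[float]) -> List[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(v * v for v in vector))
    return [v / norm for v in vector] if norm > 0 else list(vector)


# Three "tokens" in a toy 2-d space; the last one is padding and must not count.
tokens = [[1.0, 3.0], [3.0, 1.0], [100.0, 100.0]]
mask = [1, 1, 0]
pooled = mean_pool(tokens, mask)
unit = l2_normalize(pooled)
print(pooled, unit)
```

Because the padding token is masked out, `pooled` is `[2.0, 2.0]` rather than being dragged towards the large padding vector.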

In [None]:
# Configure embedding model settings
print("🤖 Configuring embedding model...")

# Model configuration - using a lightweight, high-quality sentence transformer
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_BATCH_SIZE = 128  # Adjust based on your GPU memory
EMBEDDING_DIM = 384  # Dimension of the all-MiniLM-L6-v2 model

print(f"📋 Embedding Configuration:")
print(f"   - Model: {EMBEDDING_MODEL}")
print(f"   - Batch size: {EMBEDDING_BATCH_SIZE}")
print(f"   - Embedding dimension: {EMBEDDING_DIM}")
print(f"   - GPU acceleration: {USE_GPU}")

# Test embedding model loading
try:
    from sentence_transformers import SentenceTransformer
    
    print(f"\n🔄 Loading embedding model...")
    test_model = SentenceTransformer(EMBEDDING_MODEL)
    
    # Test with a sample text
    test_text = "This is a test sentence for embedding generation."
    test_embedding = test_model.encode([test_text])
    
    print(f"✅ Model loaded successfully!")
    print(f"📊 Test embedding shape: {test_embedding.shape}")
    print(f"🔢 Sample embedding values: {test_embedding[0][:5]}...")
    
    # Clean up test model
    del test_model
    
except Exception as e:
    print(f"❌ Error loading embedding model: {e}")
    print("Please ensure sentence-transformers is installed: pip install sentence-transformers")

## 6. Batch Embedding Generation

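Now we'll use NeMo Curator's `EmbeddingCreator` to generate embeddings at scale. It is designed to run the model in batches across the Dask cluster's GPU workers and can write the embeddings to disk. It needs a GPU; when one is unavailable (as in the CPU-only run above), the cell below falls back to encoding with sentence-transformers directly.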
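Whichever path runs, the core loop is the same: slice the inputs into fixed-size batches, encode each batch, and concatenate the results in input order so that vector *i* still belongs to document *i*. The sketch mirrors the fallback loop in the cell below; `batched`, `embed_in_batches`, and `toy_encode` are hypothetical names, and `toy_encode` only stands in for `SentenceTransformer.encode` (which also accepts its own `batch_size` argument).

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size items; the last may be shorter."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_in_batches(
    texts: Sequence[str],
    encode: Callable[[Sequence[str]], List[List[float]]],
    batch_size: int,
) -> List[List[float]]:
    """Run `encode` batch by batch and concatenate the vectors in input order."""
    embeddings: List[List[float]] = []
    for batch in batched(texts, batch_size):
        out = encode(batch)
        if len(out) != len(batch):
            raise RuntimeError("encoder returned a different number of vectors than inputs")
        embeddings.extend(out)
    return embeddings


def toy_encode(batch: Sequence[str]) -> List[List[float]]:
    """Toy 2-d 'embedding': (character count, space count) per text."""
    return [[float(len(t)), float(t.count(" "))] for t in batch]


toy_texts = ["a b", "ccc", "dd ee ff", "g", "hh"]
toy_vectors = embed_in_batches(toy_texts, toy_encode, batch_size=2)
print(toy_vectors)
```

Batch size mainly trades memory for throughput; it should not change the embeddings themselves.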

In [None]:
# Set up embedding generation using NeMo Curator's EmbeddingCreator
print("🚀 Starting distributed embedding generation...")

# Create output directory for embeddings
embeddings_dir = data_dir / "embeddings"
embeddings_dir.mkdir(exist_ok=True)

try:
    # Initialize the EmbeddingCreator (argument names as in recent NeMo Curator
    # releases; check the signature of your installed version)
    embedding_creator = EmbeddingCreator(
        embedding_model_name_or_path=EMBEDDING_MODEL,
        embedding_batch_size=EMBEDDING_BATCH_SIZE,
        embedding_output_dir=str(embeddings_dir),
        input_column="text",
        embedding_column="embeddings",
        write_embeddings_to_disk=True,
    )
    
    print("✅ EmbeddingCreator initialized")
    print("   - Input field: text")
    print("   - Output field: embeddings")
    print(f"   - Batch size: {EMBEDDING_BATCH_SIZE}")
    
    # Embed the normalized documents (not the chunks from Section 4): they carry
    # the source/category metadata used when building the vector stores below
    print(f"\n🔄 Generating embeddings for {len(normalized_dataset.df)} documents...")
    
    # Generate embeddings
    start_time = time.time()
    dataset_with_embeddings = embedding_creator(normalized_dataset)
    embedding_time = time.time() - start_time
    
    print(f"✅ Embedding generation complete!")
    print(f"⏱️  Time taken: {embedding_time:.2f} seconds")
    
    # Inspect the results (head() already returns a computed DataFrame)
    embeddings_sample = dataset_with_embeddings.df.head(3)
    print(f"\n📊 Results preview:")
    print(f"   - Total documents with embeddings: {len(dataset_with_embeddings.df)}")
    print(f"   - Columns: {list(embeddings_sample.columns)}")
    
    # Check embedding shape
    if 'embeddings' in embeddings_sample.columns:
        sample_embedding = embeddings_sample['embeddings'].iloc[0]
        print(f"   - Embedding shape: {np.array(sample_embedding).shape}")
        print(f"   - Sample embedding values: {np.array(sample_embedding)[:5]}...")
    
except Exception as e:
    print(f"❌ Error during embedding generation: {e}")
    print("📝 Note: EmbeddingCreator requires GPU setup. Using alternative approach...")
    
    # Alternative: Generate embeddings manually for tutorial purposes
    from sentence_transformers import SentenceTransformer
    
    model = SentenceTransformer(EMBEDDING_MODEL)
    
    # Bring the data to pandas on the client (cuDF Series do not support
    # .tolist(), and list-valued columns are simpler to assign in pandas)
    df_with_embeddings = normalized_dataset.df.compute()
    if USE_GPU:
        df_with_embeddings = df_with_embeddings.to_pandas()
    text_data = df_with_embeddings['text'].tolist()
    print(f"🔄 Generating embeddings for {len(text_data)} texts using fallback method...")
    
    # Generate embeddings in batches
    all_embeddings = []
    for i in range(0, len(text_data), EMBEDDING_BATCH_SIZE):
        batch_texts = text_data[i:i+EMBEDDING_BATCH_SIZE]
        batch_embeddings = model.encode(batch_texts)
        all_embeddings.extend(batch_embeddings.tolist())
    
    # Add embeddings to dataset
    df_with_embeddings['embeddings'] = all_embeddings
    dataset_with_embeddings = DocumentDataset.from_pandas(df_with_embeddings, npartitions=1)
    
    print(f"✅ Fallback embedding generation complete!")
    print(f"📊 Generated {len(all_embeddings)} embeddings")

## 7. Vector Storage Setup and Configuration

With our embeddings generated, we need to store them in a vector index for efficient similarity search and retrieval. We'll demonstrate two options: FAISS, a library for in-memory similarity search, and ChromaDB, an embedded vector database that also stores the documents and their metadata.
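The FAISS cell below uses `IndexFlatL2`, which performs exact (brute-force) search: every query is compared against every stored vector, and, following FAISS's convention as we understand it, distances are reported as squared L2. The pure-Python class below mimics that behaviour at toy scale to show what "flat" means; `FlatL2Index` is our own illustrative class that only loosely imitates FAISS's `add`/`search`/`ntotal`, and unlike FAISS it takes a single query and returns a list of `(distance, position)` pairs instead of arrays.

```python
import heapq
from typing import List, Sequence, Tuple


class FlatL2Index:
    """Brute-force exact nearest-neighbour search by squared L2 distance."""

    def __init__(self, dim: int) -> None:
        self.dim = dim
        self.vectors: List[List[float]] = []

    @property
    def ntotal(self) -> int:
        return len(self.vectors)

    def add(self, vectors: Sequence[Sequence[float]]) -> None:
        for v in vectors:
            if len(v) != self.dim:
                raise ValueError(f"expected dimension {self.dim}, got {len(v)}")
            self.vectors.append(list(v))

    def search(self, query: Sequence[float], k: int) -> List[Tuple[float, int]]:
        """Return up to k (squared_distance, position) pairs, closest first."""
        scored = (
            (sum((q - x) ** 2 for q, x in zip(query, v)), pos)
            for pos, v in enumerate(self.vectors)
        )
        return heapq.nsmallest(k, scored)


toy_index = FlatL2Index(dim=2)
toy_index.add([[0.0, 0.0], [1.0, 1.0], [3.0, 4.0]])
result = toy_index.search([0.9, 1.2], k=2)
print(toy_index.ntotal, result)
```

Exact search costs O(n x d) per query, which is fine for thousands of vectors; for much larger collections FAISS offers approximate index types that trade a little recall for speed.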

In [None]:
# Set up vector storage using FAISS
print("🗄️  Setting up vector storage with FAISS...")

try:
    import faiss
    faiss_available = True
    print("✅ FAISS available")
except ImportError:
    faiss_available = False
    print("⚠️  FAISS not available. Install with: pip install faiss-gpu (for GPU) or pip install faiss-cpu")

if faiss_available:
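    # Extract embeddings and metadata; convert cuDF results to pandas first,
    # since cuDF columns do not support .tolist()
    df_with_emb = dataset_with_embeddings.df.compute()
    if hasattr(df_with_emb, "to_pandas"):
        df_with_emb = df_with_emb.to_pandas()
    embeddings_array = np.vstack(df_with_emb['embeddings'].tolist()).astype('float32')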
    
    print(f"📊 Preparing FAISS index:")
    print(f"   - Number of vectors: {embeddings_array.shape[0]}")
    print(f"   - Vector dimension: {embeddings_array.shape[1]}")
    
    # Create FAISS index
    dimension = embeddings_array.shape[1]
    
    # Use L2 distance for similarity (can also use IP for inner product)
    index = faiss.IndexFlatL2(dimension)
    
    # Optionally use GPU acceleration if available
    on_gpu = False
    if USE_GPU and hasattr(faiss, 'StandardGpuResources'):
        try:
            res = faiss.StandardGpuResources()
            index = faiss.index_cpu_to_gpu(res, 0, index)
            on_gpu = True
            print("🚀 Using GPU-accelerated FAISS index")
        except Exception as e:
            print(f"⚠️  GPU FAISS failed, using CPU index: {e}")
    
    # Add vectors to index
    print("🔄 Adding vectors to FAISS index...")
    index.add(embeddings_array)
    
    print(f"✅ FAISS index created successfully!")
    print(f"   - Index type: {type(index)}")
    print(f"   - Total vectors: {index.ntotal}")
    print(f"   - Is trained: {index.is_trained}")
    
    # Save the index to disk; a GPU index cannot be written directly,
    # so copy it back to the CPU first
    faiss_index_path = data_dir / "faiss_index.bin"
    cpu_index = faiss.index_gpu_to_cpu(index) if on_gpu else index
    faiss.write_index(cpu_index, str(faiss_index_path))
    
    print(f"💾 FAISS index saved to: {faiss_index_path}")
    
    # Save metadata mapping
    metadata_mapping = df_with_emb[['doc_id', 'text', 'source', 'category']].reset_index(drop=True)
    metadata_path = data_dir / "metadata_mapping.parquet"
    metadata_mapping.to_parquet(metadata_path)
    
    print(f"💾 Metadata mapping saved to: {metadata_path}")
    
else:
    print("⚠️  Skipping FAISS setup due to missing dependency")
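The comment on `IndexFlatL2` notes that inner product (IP) is an alternative. If you want cosine similarity, a common approach is to L2-normalize all vectors and search by inner product (in FAISS, `faiss.normalize_L2` followed by an `IndexFlatIP`). For unit vectors the two metrics rank results identically, because ||q - d||² = 2 - 2(q · d). The pure-Python sketch below checks that identity on made-up 2-d vectors; all names in it are illustrative.

```python
import math
from typing import List, Sequence


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a non-zero vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def sq_l2(a: Sequence[float], b: Sequence[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


toy_docs = [normalize(v) for v in ([1.0, 0.0], [1.0, 1.0], [0.0, 2.0])]
toy_query = normalize([2.0, 1.0])

# Highest inner product first vs. smallest squared L2 distance first.
by_ip = sorted(range(len(toy_docs)), key=lambda i: -dot(toy_query, toy_docs[i]))
by_l2 = sorted(range(len(toy_docs)), key=lambda i: sq_l2(toy_query, toy_docs[i]))

# For unit vectors, ||q - d||^2 == 2 - 2 * (q . d), so both orderings agree.
for d in toy_docs:
    assert math.isclose(sq_l2(toy_query, d), 2 - 2 * dot(toy_query, d))
print(by_ip, by_l2)
```

If your embeddings are already unit-length (some sentence-transformers models normalize their output), `IndexFlatL2` and a normalized `IndexFlatIP` return the same neighbours; only the score scale differs.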

In [None]:
# Alternative: Set up ChromaDB vector database
print("\n🔮 Setting up alternative vector storage with ChromaDB...")

try:
    import chromadb
    from chromadb.config import Settings
    
    # Create a persistent ChromaDB client. In chromadb 0.4 and later,
    # chromadb.Client(...) is, as far as we know, in-memory even when
    # persist_directory is set; PersistentClient writes data to disk.
    chroma_client = chromadb.PersistentClient(
        path=str(data_dir / "chroma_db"),
        settings=Settings(anonymized_telemetry=False),
    )
    
    # Create the collection, or reuse it if it already exists
    collection_name = "tutorial_embeddings"
    collection = chroma_client.get_or_create_collection(name=collection_name)
    print(f"✅ Using ChromaDB collection: {collection_name}")
    
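    # Prepare data for ChromaDB (convert cuDF results to pandas first)
    df_with_emb = dataset_with_embeddings.df.compute()
    if hasattr(df_with_emb, "to_pandas"):
        df_with_emb = df_with_emb.to_pandas()
    
    # ChromaDB expects parallel lists of documents, embeddings, metadata, and string IDs
    documents = df_with_emb['text'].tolist()
    embeddings = [emb.tolist() if isinstance(emb, np.ndarray) else emb for emb in df_with_emb['embeddings']]
    # Use the document IDs as ChromaDB IDs so results trace back to their source documents
    ids = df_with_emb['doc_id'].astype(str).tolist()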
    metadatas = [
        {
            "doc_id": row['doc_id'],
            "source": row['source'],
            "category": row['category']
        }
        for _, row in df_with_emb.iterrows()
    ]
    
    print(f"🔄 Adding {len(documents)} documents to ChromaDB...")
    
    # Add to ChromaDB collection (upsert, so re-running the cell updates
    # existing IDs instead of failing or skipping them)
    collection.upsert(
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas,
        ids=ids
    )
    
    print(f"✅ ChromaDB setup complete!")
    print(f"   - Collection: {collection_name}")
    print(f"   - Documents stored: {collection.count()}")
    print(f"   - Persist directory: {data_dir / 'chroma_db'}")
    
    chromadb_available = True
    
except ImportError:
    print("⚠️  ChromaDB not available. Install with: pip install chromadb")
    chromadb_available = False
except Exception as e:
    print(f"❌ Error setting up ChromaDB: {e}")
    chromadb_available = False
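Because every ChromaDB record carries a metadata dict, queries can be restricted by metadata; ChromaDB exposes this through the `where` argument of `collection.query`, for example `collection.query(query_embeddings=[...], n_results=3, where={"category": "technology"})`. The pure-Python sketch below shows the semantics of such an equality filter combined with exact nearest-neighbour search. It is not how ChromaDB implements filtering internally (as we understand it, ChromaDB uses an approximate HNSW index); `filtered_search` and the toy data are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple


def filtered_search(
    query: Sequence[float],
    vectors: Sequence[Sequence[float]],
    metadatas: Sequence[Dict[str, str]],
    where: Dict[str, str],
    k: int,
) -> List[Tuple[int, float]]:
    """Exact search over records whose metadata matches every key/value in `where`.

    Returns (position, squared_l2_distance) pairs, closest first.
    """
    hits: List[Tuple[float, int]] = []
    for pos, (vec, meta) in enumerate(zip(vectors, metadatas)):
        if all(meta.get(key) == value for key, value in where.items()):
            dist = sum((q - x) ** 2 for q, x in zip(query, vec))
            hits.append((dist, pos))
    hits.sort()
    return [(pos, dist) for dist, pos in hits[:k]]


toy_vectors = [[0.0, 1.0], [0.1, 0.9], [1.0, 0.0]]
toy_metadatas = [
    {"category": "environment"},
    {"category": "technology"},
    {"category": "technology"},
]
res = filtered_search([0.0, 1.0], toy_vectors, toy_metadatas, where={"category": "technology"}, k=2)
print(res)
```

Note that the closest vector overall (position 0) is excluded because its category does not match; the filter is applied before ranking.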

## 8. Indexing and Metadata Management

Effective metadata management is crucial for retrieval systems. We need to efficiently store and retrieve both the vector embeddings and their associated metadata (document IDs, sources, categories, etc.).
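In this notebook, the FAISS index and `metadata_mapping.parquet` are linked only by row position: the i-th vector added to the index corresponds to row i of the mapping, which is why the mapping is saved with `reset_index(drop=True)`. A search therefore returns positions that must be joined back to metadata. A minimal pure-Python sketch of that join is below; `attach_metadata` and the sample rows are hypothetical, and it assumes FAISS's convention of padding results with a label of -1 when fewer than k neighbours exist.

```python
from typing import Dict, List, Sequence


def attach_metadata(
    positions: Sequence[int],
    distances: Sequence[float],
    metadata_rows: Sequence[Dict[str, str]],
) -> List[Dict[str, object]]:
    """Join search results (row positions) back to their metadata rows.

    Negative positions (FAISS pads missing results with -1) are skipped.
    """
    results: List[Dict[str, object]] = []
    for pos, dist in zip(positions, distances):
        if pos < 0:
            continue
        row: Dict[str, object] = dict(metadata_rows[pos])  # copy; don't mutate the table
        row["distance"] = dist
        results.append(row)
    return results


metadata_rows = [
    {"doc_id": "doc_a", "source": "AI_encyclopedia"},
    {"doc_id": "doc_b", "source": "climate_report"},
]
# Positions/distances as a search might return them; the last entry is padding.
res = attach_metadata([1, 0, -1], [0.2, 0.7, 1e30], metadata_rows)
print(res)
```

With the real index, `positions` and `distances` would be the first row of the `I` and `D` arrays returned by `index.search`, and `metadata_rows` the records of the saved mapping.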