Multiple source support broken on assertion failure #1295

@georgeh0

Hello, after updating to the newest cocoindex version (from 0.2 to 0.3.7), my flow, which previously worked perfectly, unfortunately breaks. If I import from multiple sources (the sources are defined by configuration), I now run into this exception:
Exception: Import op count does not match export op count

With one source, it works as expected and as before.

Here is the flow:

"""CocoRAG flow definitions for embedding and indexing.

This module defines the core data processing pipeline using CocoIndex framework:
- Sentence transformer embedding integration
- File extension extraction utilities
- Dataflow transformation definitions
- PostgreSQL table management for vector storage
- Integration with CocoIndex's incremental processing system
"""

import logging
import os

import cocoindex
import numpy as np
from numpy.typing import NDArray

from .config import get_config, get_table_name

logger = logging.getLogger(__name__)


@cocoindex.op.function()
def extract_extension(filename: str) -> str:
    """Extract the extension of a filename."""
    return os.path.splitext(filename)[1]


@cocoindex.transform_flow()
def code_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[NDArray[np.float32]]:
    """
    Embed the text using sentence transformer model.
    """
    # Use a simple sentence transformer model without any special args
    return text.transform(cocoindex.functions.SentenceTransformerEmbed(model="sentence-transformers/all-MiniLM-L6-v2"))


@cocoindex.flow_def(name="CodeEmbedding")
def code_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope) -> None:
    """
    Define a flow that embeds files from multiple sources into a vector database.

    Args:
        flow_builder: CocoIndex flow builder
        data_scope: CocoIndex data scope
    """

    # Get the global configuration instance (auto-initialized if needed)
    config = get_config()

    # Create a single collector for all sources but export it only once
    # This ensures we have 1 export operation for the entire flow
    code_embeddings = data_scope.add_collector()

    # Process each source individually
    # !! Works with multiple sources in 0.2 without any problems;
    # !! with 0.3.7, fails with "Exception: Import op count does not match export op count".
    # !! With only one source it works.
    #
    for source_config in config.sources:
        source_name = source_config.get("name", "unnamed_source")
        topic = source_config.get("topic", None)
        # Create a CocoIndex LocalFile source from the configuration
        # This source contains file system connector with path, patterns, and exclusion rules
        source = config.create_source_from_config(source_config)
        # Add the source to the flow with a unique name to avoid conflicts
        data_scope[f"files_{source_name}"] = flow_builder.add_source(source)

        # Process files from this source
        with data_scope[f"files_{source_name}"].row() as file:
            file["extension"] = file["filename"].transform(extract_extension)
            file["chunks"] = file["content"].transform(
                cocoindex.functions.SplitRecursively(),
                language=file["extension"],
                chunk_size=config.chunk_size,
                min_chunk_size=config.min_chunk_size,
                chunk_overlap=config.chunk_overlap,
            )
            with file["chunks"].row() as chunk:
                # Use the code_to_embedding transform flow directly
                chunk["embedding"] = chunk["text"].call(code_to_embedding)
                code_embeddings.collect(
                    source_name=source_name,
                    filename=file["filename"],
                    location=chunk["location"],
                    topic=topic,
                    code=chunk["text"],
                    embedding=chunk["embedding"],
                    start=chunk["start"],
                    end=chunk["end"],
                )

    # Export once for all sources combined
    code_embeddings.export(
        get_table_name(),
        cocoindex.targets.Postgres(),
        primary_key_fields=["source_name", "filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )
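
One thing that may be worth ruling out while debugging: the loop above falls back to "unnamed_source" when a source entry has no "name", so two unnamed entries would write to the same `files_unnamed_source` key. Whether that is related to the exception is not known; the following is only a minimal sketch of a sanity check in plain Python. The helper `find_duplicate_source_names` and the sample entries are hypothetical and not part of the flow or of CocoIndex.

```python
from collections import Counter
from typing import Any


def find_duplicate_source_names(sources: list[dict[str, Any]]) -> list[str]:
    """Return source names that would map to the same `files_<name>` key."""
    # Mirror the flow's fallback: a missing "name" becomes "unnamed_source".
    names = [cfg.get("name", "unnamed_source") for cfg in sources]
    counts = Counter(names)
    return sorted(name for name, n in counts.items() if n > 1)


# Hypothetical configuration entries, for illustration only.
example_sources = [
    {"name": "backend", "topic": "api"},
    {"topic": "docs"},     # no "name": falls back to "unnamed_source"
    {"topic": "scripts"},  # same fallback, so it collides with the entry above
]

duplicates = find_duplicate_source_names(example_sources)
print(duplicates)
```

If the returned list is empty for the real `config.sources`, name collisions can be excluded as a factor.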

Any idea how to make this runnable again?
Thank you again for the great work here!

Originally posted by @petrarca in #841 (comment)
