Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions daft_lance/_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def create_scalar_index(
fragment_group_size: int | None = None,
num_partitions: int | None = None,
max_concurrency: int | None = None,
segmented: bool = False,
**kwargs: Any,
) -> None:
"""Build a distributed scalar index using Daft's distributed execution.
Expand Down Expand Up @@ -406,6 +407,11 @@ def create_scalar_index(
greater than 1 enable additional parallelism on distributed runners; values <= 1 or None will use the default partitioning.
max_concurrency: Maximum number of concurrent tasks to use for processing fragment batches.
If None, Daft will use its default concurrency setting. Must be a positive integer.
segmented: If True and ``index_type`` is ``"BTREE"``, use the segmented index
workflow where each worker builds a fully independent index segment and the
coordinator commits them atomically via ``commit_existing_index_segments``.
This produces proper ``index_details`` metadata so ``describe_indices()``
works correctly. Defaults to False (legacy partitioned-and-merged flow).
**kwargs: Additional keyword arguments forwarded to ``lance.LanceDataset.create_scalar_index``.

Returns:
Expand Down Expand Up @@ -436,6 +442,11 @@ def create_scalar_index(
... "s3://my-bucket/dataset/", column="price", index_type="BTREE", name="price_idx"
... )

Create a segmented BTREE index (supports describe_indices):
>>> daft_lance.create_scalar_index(
... "s3://my-bucket/dataset/", column="price", index_type="BTREE", segmented=True
... )

Create an index with custom fragment grouping and partitioning:
>>> daft_lance.create_scalar_index(
... "s3://my-bucket/dataset/", column="description", fragment_group_size=8, num_partitions=16
Expand Down Expand Up @@ -470,6 +481,7 @@ def create_scalar_index(
fragment_group_size=fragment_group_size,
num_partitions=num_partitions,
max_concurrency=max_concurrency,
segmented=segmented,
**kwargs,
)

Expand Down
190 changes: 186 additions & 4 deletions daft_lance/lance_scalar_index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import pickle
import uuid
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -58,6 +59,58 @@ def __call__(self, fragment_ids: list[int]) -> bool:
return True


class SegmentedFragmentIndexHandler:
"""Handler for segmented scalar index creation on fragment batches.

Unlike ``FragmentIndexHandler``, which writes partial index files sharing
a single UUID, this handler builds a fully independent index segment per
worker via the low-level ``_ds.create_index`` binding. The returned
``lance.Index`` metadata (including ``index_details``) is serialised
(pickled) so it can cross Daft process/serialisation boundaries. The
coordinator then commits all segments atomically with
``commit_existing_index_segments``.
"""

def __init__(
self,
lance_ds: lance.LanceDataset,
column: str,
index_type: str,
name: str,
replace: bool,
**kwargs: Any,
) -> None:
self.lance_ds = lance_ds
self.column = column
self.index_type = index_type
self.name = name
self.replace = replace
self.kwargs = kwargs

def __call__(self, fragment_ids: list[int]) -> bytes:
"""Build an independent index segment and return its pickled metadata."""
logger.info(
"Building segmented index segment for fragments %s (column=%s, type=%s)",
fragment_ids,
self.column,
self.index_type,
)

# _ds.create_index returns a lance.Index dataclass when fragment_ids
# is provided (uncommitted segment mode).
index_meta: lance.Index = self.lance_ds._ds.create_index( # type: ignore[call-arg]
[self.column],
self.index_type,
name=self.name,
replace=self.replace,
train=True,
storage_options=None,
kwargs={"fragment_ids": fragment_ids, **self.kwargs},
)

return pickle.dumps(index_meta)


def create_scalar_index_internal(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
Expand All @@ -70,6 +123,7 @@ def create_scalar_index_internal(
fragment_group_size: int | None = None,
num_partitions: int | None = None,
max_concurrency: int | None = None,
segmented: bool = False,
**kwargs: Any,
) -> None:
"""Internal implementation of distributed scalar index creation.
Expand All @@ -78,6 +132,13 @@ def create_scalar_index_internal(
merge_index_metadata, then commit). ``FTS`` is normalized to ``INVERTED`` (same Lance
index); see Lance Rust/Python bindings: ``INVERTED`` and ``FTS`` map to the same
inverted full-text index type.

When ``segmented=True`` and ``index_type`` is ``BTREE``, a cleaner segmented workflow
is used instead: each worker builds a fully independent index segment via the low-level
``_ds.create_index`` binding (which returns ``lance.Index`` metadata including
``index_details``), and the coordinator commits them atomically with
``commit_existing_index_segments``. This resolves a known issue where ``index_details``
was left empty in the legacy path, preventing ``describe_indices()`` from working.
"""
if not column:
raise ValueError("Column name cannot be empty")
Expand Down Expand Up @@ -166,17 +227,138 @@ def create_scalar_index_internal(
logger.info("No fragments found for dataset at %s; skipping scalar index creation.", uri)
return

# Generate unique index ID
index_id = str(uuid.uuid4())

logger.info(
"Starting distributed scalar index creation: column=%s, type=%s, name=%s, fragment_group_size=%s, max_concurrency=%s",
"Starting distributed scalar index creation: column=%s, type=%s, name=%s, fragment_group_size=%s, max_concurrency=%s, segmented=%s",
column,
index_type,
name,
fragment_group_size,
max_concurrency,
segmented,
)

# Choose between the segmented workflow and the legacy partitioned-and-merged
# workflow. Segmented mode produces proper ``index_details`` so
# ``describe_indices()`` works correctly.
if segmented and index_type == "BTREE":
_create_segmented_index(
lance_ds=lance_ds,
uri=uri,
column=column,
index_type=index_type,
name=name,
replace=replace,
storage_options=storage_options,
fragment_data=fragment_data,
fragment_ids_to_use=fragment_ids_to_use,
num_partitions=num_partitions,
max_concurrency=max_concurrency,
**kwargs,
)
else:
_create_partitioned_index(
lance_ds=lance_ds,
uri=uri,
column=column,
index_type=index_type,
name=name,
replace=replace,
storage_options=storage_options,
fragment_data=fragment_data,
fragment_ids_to_use=fragment_ids_to_use,
num_partitions=num_partitions,
max_concurrency=max_concurrency,
**kwargs,
)


def _create_segmented_index(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
*,
column: str,
index_type: str,
name: str,
replace: bool,
storage_options: dict[str, Any] | None,
fragment_data: list[dict[str, list[int]]],
fragment_ids_to_use: list[int],
num_partitions: int | None,
max_concurrency: int | None,
**kwargs: Any,
) -> None:
"""Segmented index workflow: each worker builds an independent segment.

Workers call the low-level ``_ds.create_index`` binding (which returns
``lance.Index`` metadata with ``index_details`` populated), pickle the
result so it can traverse Daft serialisation boundaries, and return it.
The coordinator unpickles all segments and commits them atomically via
``commit_existing_index_segments``.
"""
handler_cls = daft.cls(
SegmentedFragmentIndexHandler,
max_concurrency=max_concurrency,
)
handler = handler_cls(
lance_ds=lance_ds,
column=column,
index_type=index_type,
name=name,
replace=replace,
**kwargs,
)

with execution_config_ctx(maintain_order=False):
if num_partitions is not None and num_partitions > 1:
df = from_pylist(fragment_data).repartition(num_partitions)
else:
df = from_pylist(fragment_data)

df = df.select(handler(df["fragment_ids"]).alias("index_meta"))
collected = df.collect()

# Deserialise the Index metadata returned by each worker.
index_metas: list[lance.Index | lance.indices.IndexSegment] = [
pickle.loads(raw) for raw in collected.to_pydict()["index_meta"]
]

logger.info(
"Collected %d index segments; committing as segmented index %s",
len(index_metas),
name,
)

# Reload dataset to pick up the latest version (segment files were written
# by workers against the version that was current at their invocation time).
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)
lance_ds.commit_existing_index_segments(name, column, index_metas)

logger.info("Segmented index %s committed successfully", name)


def _create_partitioned_index(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
*,
column: str,
index_type: str,
name: str,
replace: bool,
storage_options: dict[str, Any] | None,
fragment_data: list[dict[str, list[int]]],
fragment_ids_to_use: list[int],
num_partitions: int | None,
max_concurrency: int | None,
**kwargs: Any,
) -> None:
"""Legacy partitioned-and-merged index workflow.

Workers build partial index files sharing the same UUID, then the
coordinator merges them with ``merge_index_metadata`` and commits via a
manual ``CreateIndex`` transaction.
"""
# Generate unique index ID (shared across all partitions)
index_id = str(uuid.uuid4())

handler_cls = daft.cls(
FragmentIndexHandler,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ readme = "README.md"
dependencies = [
"lance-namespace>=0.6.0",
"lance-namespace-urllib3-client>=0.6.0",
"pylance>=6.0.0"
"pylance>=7.0.0"
]

[dependency-groups]
Expand Down
Loading
Loading