Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion docs/reference/DEDUPE.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,60 @@ result = await dedupe(
print(result.data.head())
```

## Strategies

Control what happens after clusters are identified using the `strategy` parameter:

### `select` (default)

Picks the best representative from each cluster. Three columns are added:

- `equivalence_class_id` — rows with the same ID are duplicates of each other
- `equivalence_class_name` — human-readable label for the cluster
- `selected` — True for the canonical record in each cluster

```python
result = await dedupe(
input=crm_data,
equivalence_relation="Same legal entity",
strategy="select",
strategy_prompt="Prefer the record with the most complete contact information",
)
deduped = result.data[result.data["selected"] == True]
```

### `identify`

Cluster only — no selection or combining. Useful when you want to review clusters before deciding what to do.

- `equivalence_class_id` — rows with the same ID are duplicates of each other
- `equivalence_class_name` — human-readable label for the cluster

```python
result = await dedupe(
input=crm_data,
equivalence_relation="Same legal entity",
strategy="identify",
)
```

### `combine`

Synthesizes a single combined row per cluster, merging the best information from all duplicates. Original rows are marked `selected=False`, and new combined rows are added with `selected=True`.

```python
result = await dedupe(
input=crm_data,
equivalence_relation="Same legal entity",
strategy="combine",
strategy_prompt="For each field, keep the most recent and complete value",
)
combined = result.data[result.data["selected"] == True]
```

## What you get back

Three columns added to your data:
Three columns added to your data (when using `select` or `combine` strategy):

- `equivalence_class_id` — rows with the same ID are duplicates of each other
- `equivalence_class_name` — human-readable label for the cluster ("Alexandra Butoi", "Naomi Saphra", etc.)
Expand Down Expand Up @@ -73,6 +124,8 @@ Output (selected rows only):
|------|------|-------------|
| `input` | DataFrame | Data with potential duplicates |
| `equivalence_relation` | str | What makes two rows duplicates |
| `strategy` | str | `"identify"`, `"select"` (default), or `"combine"` |
| `strategy_prompt` | str | Optional instructions for selection or combining |
| `session` | Session | Optional, auto-created if omitted |

## Performance
Expand Down
10 changes: 8 additions & 2 deletions src/everyrow/generated/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from .agent_map_operation import AgentMapOperation
from .agent_map_operation_input_type_1_item import AgentMapOperationInputType1Item
from .agent_map_operation_input_type_2 import AgentMapOperationInputType2
from .agent_map_operation_response_schema_type_0 import AgentMapOperationResponseSchemaType0
from .agent_map_operation_response_schema_type_0 import (
AgentMapOperationResponseSchemaType0,
)
from .billing_response import BillingResponse
from .create_artifact_request import CreateArtifactRequest
from .create_artifact_request_data_type_0_item import CreateArtifactRequestDataType0Item
Expand All @@ -13,6 +15,7 @@
from .dedupe_operation import DedupeOperation
from .dedupe_operation_input_type_1_item import DedupeOperationInputType1Item
from .dedupe_operation_input_type_2 import DedupeOperationInputType2
from .dedupe_operation_strategy import DedupeOperationStrategy
from .error_response import ErrorResponse
from .error_response_details_type_0 import ErrorResponseDetailsType0
from .http_validation_error import HTTPValidationError
Expand All @@ -39,7 +42,9 @@
from .single_agent_operation import SingleAgentOperation
from .single_agent_operation_input_type_1_item import SingleAgentOperationInputType1Item
from .single_agent_operation_input_type_2 import SingleAgentOperationInputType2
from .single_agent_operation_response_schema_type_0 import SingleAgentOperationResponseSchemaType0
from .single_agent_operation_response_schema_type_0 import (
SingleAgentOperationResponseSchemaType0,
)
from .task_result_response import TaskResultResponse
from .task_result_response_data_type_0_item import TaskResultResponseDataType0Item
from .task_result_response_data_type_1 import TaskResultResponseDataType1
Expand All @@ -61,6 +66,7 @@
"DedupeOperation",
"DedupeOperationInputType1Item",
"DedupeOperationInputType2",
"DedupeOperationStrategy",
"ErrorResponse",
"ErrorResponseDetailsType0",
"HTTPValidationError",
Expand Down
40 changes: 40 additions & 0 deletions src/everyrow/generated/models/dedupe_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from attrs import define as _attrs_define
from attrs import field as _attrs_field

from ..models.dedupe_operation_strategy import DedupeOperationStrategy
from ..types import UNSET, Unset

if TYPE_CHECKING:
Expand All @@ -26,11 +27,18 @@ class DedupeOperation:
list of JSON objects
equivalence_relation (str): Description of what makes two rows equivalent/duplicates
session_id (None | Unset | UUID): Session ID. If not provided, a new session is auto-created for this task.
strategy (DedupeOperationStrategy | Unset): Controls what happens after duplicate clusters are identified.
IDENTIFY = cluster only (no selection/combining), SELECT = pick best representative per cluster (default),
COMBINE = synthesize a merged row per cluster from all duplicates. Default: DedupeOperationStrategy.SELECT.
strategy_prompt (None | str | Unset): Optional natural-language instructions guiding how the LLM selects or
combines rows (only used with SELECT and COMBINE strategies). Example: "Prefer the most complete record".
"""

input_: DedupeOperationInputType2 | list[DedupeOperationInputType1Item] | UUID
equivalence_relation: str
session_id: None | Unset | UUID = UNSET
strategy: DedupeOperationStrategy | Unset = DedupeOperationStrategy.SELECT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does unset work? would it work if we have a default already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Unset is how the generated client distinguishes "not provided" from an explicit value. When UNSET is passed, the field is omitted from the serialized JSON request, letting the server apply its own default. The default of DedupeOperationStrategy.SELECT here is the server's documented default — it's used when someone instantiates the model directly without specifying strategy. But from ops.py, when the user passes strategy=None, we pass UNSET to let the server decide. So | Unset in the type is needed because it's a valid runtime value (meaning "omit from request"), even though the attr default is SELECT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above was Claude :)

strategy_prompt: None | str | Unset = UNSET
additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

def to_dict(self) -> dict[str, Any]:
Expand All @@ -56,6 +64,16 @@ def to_dict(self) -> dict[str, Any]:
else:
session_id = self.session_id

strategy: str | Unset = UNSET
if not isinstance(self.strategy, Unset):
strategy = self.strategy.value

strategy_prompt: None | str | Unset
if isinstance(self.strategy_prompt, Unset):
strategy_prompt = UNSET
else:
strategy_prompt = self.strategy_prompt

field_dict: dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
Expand All @@ -66,6 +84,10 @@ def to_dict(self) -> dict[str, Any]:
)
if session_id is not UNSET:
field_dict["session_id"] = session_id
if strategy is not UNSET:
field_dict["strategy"] = strategy
if strategy_prompt is not UNSET:
field_dict["strategy_prompt"] = strategy_prompt

return field_dict

Expand Down Expand Up @@ -125,10 +147,28 @@ def _parse_session_id(data: object) -> None | Unset | UUID:

session_id = _parse_session_id(d.pop("session_id", UNSET))

_strategy = d.pop("strategy", UNSET)
strategy: DedupeOperationStrategy | Unset
if isinstance(_strategy, Unset):
strategy = UNSET
else:
strategy = DedupeOperationStrategy(_strategy)

def _parse_strategy_prompt(data: object) -> None | str | Unset:
if data is None:
return data
if isinstance(data, Unset):
return data
return cast(None | str | Unset, data)

strategy_prompt = _parse_strategy_prompt(d.pop("strategy_prompt", UNSET))

dedupe_operation = cls(
input_=input_,
equivalence_relation=equivalence_relation,
session_id=session_id,
strategy=strategy,
strategy_prompt=strategy_prompt,
)

dedupe_operation.additional_properties = d
Expand Down
10 changes: 10 additions & 0 deletions src/everyrow/generated/models/dedupe_operation_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from enum import Enum


class DedupeOperationStrategy(str, Enum):
COMBINE = "combine"
IDENTIFY = "identify"
SELECT = "select"

def __str__(self) -> str:
return str(self.value)
38 changes: 35 additions & 3 deletions src/everyrow/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
CreateArtifactRequestDataType1,
DedupeOperation,
DedupeOperationInputType1Item,
DedupeOperationStrategy,
LLMEnumPublic,
MergeOperation,
MergeOperationLeftInputType1Item,
Expand Down Expand Up @@ -633,16 +634,39 @@ async def dedupe(
equivalence_relation: str,
session: Session | None = None,
input: DataFrame | UUID | TableResult | None = None,
strategy: Literal["identify", "select", "combine"] | None = None,
strategy_prompt: str | None = None,
) -> TableResult:
"""Dedupe a table by removing duplicates using AI.

Args:
equivalence_relation: Description of what makes items equivalent
equivalence_relation: Natural-language description of what makes two rows
equivalent/duplicates. Be as specific as needed — the LLM uses this to
reason about equivalence, handling abbreviations, typos, name variations,
and entity relationships that string matching cannot capture.
session: Optional session. If not provided, one will be created automatically.
input: The input table (DataFrame, UUID, or TableResult)
input: The input table (DataFrame, UUID, or TableResult).
strategy: Controls what happens after duplicate clusters are identified.
- "identify": Cluster only. Adds `equivalence_class_id` and
`equivalence_class_name` columns but does NOT select or remove any rows.
Use this when you want to review clusters before deciding what to do.
- "select" (default): Picks the best representative row from each cluster.
Adds `equivalence_class_id`, `equivalence_class_name`, and `selected`
columns. Rows with `selected=True` are the canonical records. To get the
deduplicated table: `result.data[result.data["selected"] == True]`.
- "combine": Synthesizes a single combined row per cluster by merging the
best information from all duplicates. Original rows are kept with
`selected=False`, and new combined rows are appended with `selected=True`.
Useful when no single row has all the information (e.g., one row has the
email, another has the phone number).
strategy_prompt: Optional natural-language instructions that guide how the LLM
selects or combines rows. Only used with "select" and "combine" strategies.
Examples: "Prefer the record with the most complete contact information",
"For each field, keep the most recent and complete value",
"Prefer records from the CRM system over spreadsheet imports".

Returns:
TableResult containing the deduped table
TableResult containing the deduped table with cluster metadata columns.
"""
if input is None:
raise EveryrowError("input is required for dedupe")
Expand All @@ -652,6 +676,8 @@ async def dedupe(
session=internal_session,
input=input,
equivalence_relation=equivalence_relation,
strategy=strategy,
strategy_prompt=strategy_prompt,
)
result = await cohort_task.await_result()
if isinstance(result, TableResult):
Expand All @@ -661,6 +687,8 @@ async def dedupe(
session=session,
input=input,
equivalence_relation=equivalence_relation,
strategy=strategy,
strategy_prompt=strategy_prompt,
)
result = await cohort_task.await_result()
if isinstance(result, TableResult):
Expand All @@ -672,6 +700,8 @@ async def dedupe_async(
session: Session,
input: DataFrame | UUID | TableResult,
equivalence_relation: str,
strategy: Literal["identify", "select", "combine"] | None = None,
strategy_prompt: str | None = None,
) -> EveryrowTask[BaseModel]:
"""Submit a dedupe task asynchronously."""
input_data = _prepare_table_input(input, DedupeOperationInputType1Item)
Expand All @@ -680,6 +710,8 @@ async def dedupe_async(
input_=input_data, # type: ignore
equivalence_relation=equivalence_relation,
session_id=session.session_id,
strategy=DedupeOperationStrategy(strategy) if strategy is not None else UNSET,
strategy_prompt=strategy_prompt if strategy_prompt is not None else UNSET,
)

response = await dedupe_operations_dedupe_post.asyncio(client=session.client, body=body)
Expand Down
99 changes: 99 additions & 0 deletions tests/integration/test_dedupe.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,102 @@ async def test_dedupe_unique_items_all_selected():
assert result.data["equivalence_class_id"].nunique() == 3
# All should be selected since they're unique
assert result.data["selected"].all() # pyright: ignore[reportGeneralTypeIssues]


async def test_dedupe_identify_strategy_no_selection():
"""Test that identify strategy clusters but does not add a 'selected' column."""
input_df = pd.DataFrame(
[
{"title": "Paper A - Preprint", "version": "v1"},
{"title": "Paper A - Published", "version": "v2"},
{"title": "Paper B", "version": "v1"},
]
)

result = await dedupe(
equivalence_relation="""
Two entries are duplicates if they are versions of the same paper.
"Paper A - Preprint" and "Paper A - Published" are the same paper.
""",
input=input_df,
strategy="identify",
)

assert isinstance(result, TableResult)
assert "equivalence_class_id" in result.data.columns
# identify strategy should NOT add a selected column
assert "selected" not in result.data.columns


async def test_dedupe_combine_strategy_creates_combined_rows():
"""Test that combine strategy produces combined rows marked as selected."""
input_df = pd.DataFrame(
[
{"title": "Paper A - Preprint", "year": "2023"},
{"title": "Paper A - Published", "year": "2024"},
{"title": "Paper B", "year": "2023"},
]
)

result = await dedupe(
equivalence_relation="""
Two entries are duplicates if they are versions of the same paper.
"Paper A - Preprint" and "Paper A - Published" are the same paper.
""",
input=input_df,
strategy="combine",
)

assert isinstance(result, TableResult)
assert "equivalence_class_id" in result.data.columns
assert "selected" in result.data.columns

# There should be at least one selected row
selected_rows = result.data[result.data["selected"] == True] # noqa: E712
assert len(selected_rows) >= 1


async def test_dedupe_select_strategy_explicit():
"""Test that explicitly passing strategy='select' works the same as the default."""
input_df = pd.DataFrame(
[
{"item": "Apple"},
{"item": "Banana"},
]
)

result = await dedupe(
equivalence_relation="Items are duplicates only if they are the exact same fruit name.",
input=input_df,
strategy="select",
)

assert isinstance(result, TableResult)
assert "equivalence_class_id" in result.data.columns
assert "selected" in result.data.columns
# All unique items should be selected
assert result.data["selected"].all() # pyright: ignore[reportGeneralTypeIssues]


async def test_dedupe_with_strategy_prompt():
"""Test that strategy_prompt parameter is accepted."""
input_df = pd.DataFrame(
[
{"title": "Paper A - Preprint", "version": "v1"},
{"title": "Paper A - Published", "version": "v2"},
{"title": "Paper B", "version": "v1"},
]
)

result = await dedupe(
equivalence_relation="""
Two entries are duplicates if they are versions of the same paper.
"Paper A - Preprint" and "Paper A - Published" are the same paper.
""",
input=input_df,
strategy="select",
strategy_prompt="Always prefer the published version over the preprint.",
)

assert isinstance(result, TableResult)
assert "selected" in result.data.columns