Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions scripts/flag_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
import uuid
from pathlib import Path

from dex_python.deduplication import (
cluster_duplicates,
find_email_duplicates,
find_fuzzy_name_duplicates,
find_name_and_title_duplicates,
find_phone_duplicates,
)
from dex_python.deduplication import find_all_duplicates

DATA_DIR = Path(os.getenv("DEX_DATA_DIR", "output"))
DEFAULT_DB_PATH = DATA_DIR / "dex_contacts.db"
Expand All @@ -38,18 +32,10 @@ def main(db_path: str = str(DEFAULT_DB_PATH)) -> None:

print("Finding all potential duplicates...")

# Collect all signals
matches = []
matches.extend(find_email_duplicates(conn))
matches.extend(find_phone_duplicates(conn))
matches.extend(find_name_and_title_duplicates(conn))
# Consistent threshold with previous run
matches.extend(find_fuzzy_name_duplicates(conn, threshold=0.98))
# Use shared duplicate detection function
matches, clusters = find_all_duplicates(conn, fuzzy_threshold=0.98)

print(f"Found {len(matches)} duplicate signals.")

# Cluster into entities
clusters = cluster_duplicates(matches)
print(f"Clustered into {len(clusters)} unique duplicate groups.")

if not clusters:
Expand Down
21 changes: 3 additions & 18 deletions scripts/resolve_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@
import sqlite3
from pathlib import Path

from dex_python.deduplication import (
cluster_duplicates,
find_email_duplicates,
find_fuzzy_name_duplicates,
find_name_and_title_duplicates,
find_phone_duplicates,
merge_cluster,
)
from dex_python.deduplication import find_all_duplicates, merge_cluster

DATA_DIR = Path(os.getenv("DEX_DATA_DIR", "output"))
DEFAULT_DB_PATH = DATA_DIR / "dex_contacts.db"
Expand All @@ -26,18 +19,10 @@ def main(db_path: str = str(DEFAULT_DB_PATH)) -> None:

print("Finding all potential duplicates...")

# Collect all signals
matches = []
matches.extend(find_email_duplicates(conn))
matches.extend(find_phone_duplicates(conn))
matches.extend(find_name_and_title_duplicates(conn))
# Using a very high threshold for auto-merging fuzzy matches
matches.extend(find_fuzzy_name_duplicates(conn, threshold=0.98))
# Use shared duplicate detection function
matches, clusters = find_all_duplicates(conn, fuzzy_threshold=0.98)

print(f"Found {len(matches)} duplicate signals.")

# Cluster into entities
clusters = cluster_duplicates(matches)
print(f"Clustered into {len(clusters)} unique entities to be merged.")

if not clusters:
Expand Down
85 changes: 7 additions & 78 deletions src/dex_python/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,8 @@

import httpx

from .client_utils import handle_error, should_retry
from .config import Settings
from .exceptions import (
AuthenticationError,
ContactNotFoundError,
DexAPIError,
NoteNotFoundError,
RateLimitError,
ReminderNotFoundError,
ValidationError,
)
from .models import (
ContactCreate,
ContactUpdate,
Expand All @@ -47,9 +39,6 @@
extract_reminders_total,
)

# HTTP status codes that indicate transient failures worth retrying
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


class AsyncDexClient:
"""Asynchronous client for the Dex CRM API.
Expand Down Expand Up @@ -94,66 +83,6 @@ def __init__(
timeout=30.0,
)

def _should_retry(self, status_code: int) -> bool:
    """Report whether ``status_code`` is listed in ``RETRYABLE_STATUS_CODES``.

    Args:
        status_code: HTTP status code of the response just received.

    Returns:
        True when the status is one of the retryable codes, False otherwise.
    """
    is_retryable = status_code in RETRYABLE_STATUS_CODES
    return is_retryable

def _handle_error(self, response: httpx.Response, endpoint: str) -> None:
    """Convert an HTTP error response into the appropriate exception.

    The response body is decoded as JSON on a best-effort basis; a body that
    cannot be decoded is treated as an empty dict. A decoded body that is not
    a JSON object (for example a list or a bare string) is still forwarded as
    ``response_data`` but is never queried for an ``"error"`` message, so it
    cannot trigger an ``AttributeError`` that would hide the real API error.

    Args:
        response: The HTTP response with error status.
        endpoint: The API endpoint that was called.

    Raises:
        AuthenticationError: For 401 responses.
        RateLimitError: For 429 responses. ``retry_after`` is the integer
            value of the ``Retry-After`` header, or None when the header is
            missing or is not a plain integer.
        ValidationError: For 400 responses.
        ContactNotFoundError: For 404 on /contacts endpoints.
        ReminderNotFoundError: For 404 on /reminders endpoints.
        NoteNotFoundError: For 404 on /timeline_items endpoints.
        DexAPIError: For all other error responses.
    """
    status_code = response.status_code
    try:
        data = response.json()
    except Exception:
        # Deliberate best effort: error bodies are not guaranteed to be JSON.
        data = {}
    # Only a JSON object supports .get(); fall back to an empty mapping for
    # message lookups while still passing the raw payload to the exception.
    fields = data if isinstance(data, dict) else {}

    if status_code == 401:
        raise AuthenticationError(
            "Invalid API key", status_code=401, response_data=data
        )

    if status_code == 429:
        retry_after = response.headers.get("Retry-After")
        try:
            delay = int(retry_after) if retry_after else None
        except ValueError:
            # Retry-After may be an HTTP-date instead of a number of seconds;
            # report "unknown" rather than masking the rate-limit error.
            delay = None
        raise RateLimitError("Rate limit exceeded", retry_after=delay)

    if status_code == 400:
        raise ValidationError(
            fields.get("error", "Validation error"),
            status_code=400,
            response_data=data,
        )

    if status_code == 404:
        # Checked in order; the first path marker found in the endpoint
        # decides which resource-specific exception is raised.
        not_found_types = (
            ("/contacts/", ContactNotFoundError),
            ("/reminders/", ReminderNotFoundError),
            ("/timeline_items/", NoteNotFoundError),
        )
        for marker, exc_type in not_found_types:
            if marker in endpoint:
                # The resource id is the path segment right after the marker.
                resource_id = endpoint.split(marker)[-1].split("/")[0]
                raise exc_type(resource_id)
        raise DexAPIError("Not found", status_code=404, response_data=data)

    raise DexAPIError(
        fields.get("error", f"API error: {status_code}"),
        status_code=status_code,
        response_data=data,
    )

async def _request_with_retry(
self, method: str, endpoint: str, **kwargs: Any
) -> httpx.Response:
Expand All @@ -177,7 +106,7 @@ async def _request_with_retry(
return response

is_last_attempt = attempt == self.max_retries
if not self._should_retry(response.status_code) or is_last_attempt:
if not should_retry(response.status_code) or is_last_attempt:
return response

# Exponential backoff
Expand Down Expand Up @@ -206,7 +135,7 @@ async def _request(
"""
response = await self._request_with_retry(method, endpoint, **kwargs)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
result: dict[str, Any] = response.json()
return result

Expand All @@ -233,7 +162,7 @@ async def get_contacts(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
result: list[dict[str, Any]] = data.get("contacts", [])
return result
Expand All @@ -257,7 +186,7 @@ async def get_contacts_paginated(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedContacts(
contacts=data.get("contacts", []),
Expand Down Expand Up @@ -386,7 +315,7 @@ async def get_reminders_paginated(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedReminders(
reminders=data.get("reminders", []),
Expand Down Expand Up @@ -484,7 +413,7 @@ async def get_notes_paginated(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedNotes(
notes=data.get("timeline_items", []),
Expand Down
83 changes: 7 additions & 76 deletions src/dex_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,8 @@

import httpx

from .client_utils import handle_error, should_retry
from .config import Settings
from .exceptions import (
AuthenticationError,
ContactNotFoundError,
DexAPIError,
NoteNotFoundError,
RateLimitError,
ReminderNotFoundError,
ValidationError,
)
from .models import (
ContactCreate,
ContactUpdate,
Expand All @@ -47,9 +39,6 @@
extract_reminders_total,
)

# HTTP status codes that indicate transient failures worth retrying
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


class DexClient:
"""Synchronous client for the Dex CRM API.
Expand Down Expand Up @@ -94,65 +83,7 @@ def __init__(
timeout=30.0,
)

def _handle_error(self, response: httpx.Response, endpoint: str) -> None:
    """Convert an HTTP error response into the appropriate exception.

    The response body is decoded as JSON on a best-effort basis; a body that
    cannot be decoded is treated as an empty dict. A decoded body that is not
    a JSON object (for example a list or a bare string) is still forwarded as
    ``response_data`` but is never queried for an ``"error"`` message, so it
    cannot trigger an ``AttributeError`` that would hide the real API error.

    Args:
        response: The HTTP response with error status.
        endpoint: The API endpoint that was called.

    Raises:
        AuthenticationError: For 401 responses.
        RateLimitError: For 429 responses. ``retry_after`` is the integer
            value of the ``Retry-After`` header, or None when the header is
            missing or is not a plain integer.
        ValidationError: For 400 responses.
        ContactNotFoundError: For 404 on /contacts endpoints.
        ReminderNotFoundError: For 404 on /reminders endpoints.
        NoteNotFoundError: For 404 on /timeline_items endpoints.
        DexAPIError: For all other error responses.
    """
    status_code = response.status_code
    try:
        data = response.json()
    except Exception:
        # Deliberate best effort: error bodies are not guaranteed to be JSON.
        data = {}
    # Only a JSON object supports .get(); fall back to an empty mapping for
    # message lookups while still passing the raw payload to the exception.
    fields = data if isinstance(data, dict) else {}

    if status_code == 401:
        raise AuthenticationError(
            "Invalid API key", status_code=401, response_data=data
        )

    if status_code == 429:
        retry_after = response.headers.get("Retry-After")
        try:
            delay = int(retry_after) if retry_after else None
        except ValueError:
            # Retry-After may be an HTTP-date instead of a number of seconds;
            # report "unknown" rather than masking the rate-limit error.
            delay = None
        raise RateLimitError("Rate limit exceeded", retry_after=delay)

    if status_code == 400:
        raise ValidationError(
            fields.get("error", "Validation error"),
            status_code=400,
            response_data=data,
        )

    if status_code == 404:
        # Checked in order; the first path marker found in the endpoint
        # decides which resource-specific exception is raised.
        not_found_types = (
            ("/contacts/", ContactNotFoundError),
            ("/reminders/", ReminderNotFoundError),
            ("/timeline_items/", NoteNotFoundError),
        )
        for marker, exc_type in not_found_types:
            if marker in endpoint:
                # The resource id is the path segment right after the marker.
                resource_id = endpoint.split(marker)[-1].split("/")[0]
                raise exc_type(resource_id)
        raise DexAPIError("Not found", status_code=404, response_data=data)

    raise DexAPIError(
        fields.get("error", f"API error: {status_code}"),
        status_code=status_code,
        response_data=data,
    )

def _should_retry(self, status_code: int) -> bool:
    """Report whether ``status_code`` is listed in ``RETRYABLE_STATUS_CODES``.

    Args:
        status_code: HTTP status code of the response just received.

    Returns:
        True when the status is one of the retryable codes, False otherwise.
    """
    is_retryable = status_code in RETRYABLE_STATUS_CODES
    return is_retryable

def _request_with_retry(
self, method: str, endpoint: str, **kwargs: Any
Expand All @@ -177,7 +108,7 @@ def _request_with_retry(
return response

is_last_attempt = attempt == self.max_retries
if not self._should_retry(response.status_code) or is_last_attempt:
if not should_retry(response.status_code) or is_last_attempt:
return response

# Exponential backoff
Expand All @@ -204,7 +135,7 @@ def _request(self, method: str, endpoint: str, **kwargs: Any) -> dict[str, Any]:
"""
response = self._request_with_retry(method, endpoint, **kwargs)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
result: dict[str, Any] = response.json()
return result

Expand All @@ -229,7 +160,7 @@ def get_contacts(self, limit: int = 100, offset: int = 0) -> list[dict[str, Any]
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
result: list[dict[str, Any]] = data.get("contacts", [])
return result
Expand All @@ -255,7 +186,7 @@ def get_contacts_paginated(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedContacts(
contacts=data.get("contacts", []),
Expand Down Expand Up @@ -382,7 +313,7 @@ def get_reminders_paginated(
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedReminders(
reminders=data.get("reminders", []),
Expand Down Expand Up @@ -476,7 +407,7 @@ def get_notes_paginated(self, limit: int = 100, offset: int = 0) -> PaginatedNot
params={"limit": limit, "offset": offset},
)
if response.status_code >= 400:
self._handle_error(response, endpoint)
handle_error(response, endpoint)
data: dict[str, Any] = response.json()
return PaginatedNotes(
notes=data.get("timeline_items", []),
Expand Down
Loading