diff --git a/COMPLEXITY_IMPROVEMENTS.md b/COMPLEXITY_IMPROVEMENTS.md new file mode 100644 index 0000000..cabcb5a --- /dev/null +++ b/COMPLEXITY_IMPROVEMENTS.md @@ -0,0 +1,98 @@ +# Code Complexity Improvements Summary + +## Overview +This PR reduces code complexity by extracting helper functions and removing code duplication between the synchronous and asynchronous API clients. + +## Changes Made + +### 1. Created Shared Error Handler Module (`src/dex_python/error_handler.py`) +**Problem:** The `_handle_error` method was duplicated identically in both `client.py` and `async_client.py` (51 lines each, 102 lines total). + +**Solution:** +- Created a new `error_handler.py` module with `handle_error_response()` function +- Both clients now delegate to this shared function +- Reduced duplication by ~100 lines +- Improved maintainability - error handling logic is now in one place + +### 2. Simplified `merge_cluster` Function (deduplication.py) +**Problem:** Cyclomatic complexity of 16 (Grade C), making it hard to understand and test. + +**Solution:** Extracted three helper functions: +- `_select_primary_row()`: Selects which contact should be primary (CC: 7) +- `_merge_contact_fields()`: Merges data from multiple contacts (CC: 6) +- `_consolidate_related_records()`: Consolidates emails and phones (CC: 3) + +**Result:** `merge_cluster` reduced from CC 16 → 3 (Grade A) + +### 3. Simplified `find_fuzzy_name_duplicates` Function (deduplication.py) +**Problem:** Cyclomatic complexity of 12 (Grade C), complex nested loops. + +**Solution:** Extracted two helper functions: +- `_create_soundex_blocks()`: Creates soundex-based blocks for efficient matching (CC: 7) +- `_find_matches_in_block()`: Finds fuzzy matches within a single block (CC: 4) + +**Result:** `find_fuzzy_name_duplicates` reduced from CC 12 → 3 (Grade A) + +### 4. Simplified `main` Function (scripts/review_duplicates.py) +**Problem:** Cyclomatic complexity of 20 (Grade C), handling too many concerns. + +**Solution:** Extracted three helper functions: +- `_fetch_unresolved_groups()`: Fetches duplicate groups from database (CC: 2) +- `_display_contact_group()`: Displays contacts as a table (CC: 10) +- `_handle_user_choice()`: Processes user's selection (CC: 2) + +**Result:** `main` reduced from CC 20 → 10 (Grade B, 50% improvement) + +### 5. Simplified `save_contacts_batch` Function (scripts/sync_with_integrity.py) +**Problem:** Cyclomatic complexity of 12 (Grade C), doing too much in one function. + +**Solution:** Extracted three helper functions: +- `_check_contact_changed()`: Checks if contact needs updating (CC: 3) +- `_enrich_contact_data()`: Parses names and extracts job data (CC: 1) +- `_save_contact_related_data()`: Saves emails and phones (CC: 5) + +**Result:** `save_contacts_batch` reduced from CC 12 → 8 (Grade B, 33% improvement) + +## Test Coverage + +Added comprehensive tests for all new helper functions: + +### Error Handler Tests (`tests/unit/test_error_handler.py`) +- 13 new tests covering all error scenarios +- Tests for 401, 429, 400, 404, 500 status codes +- Tests for entity-specific 404 errors (contacts, reminders, notes) +- Tests for edge cases (invalid JSON, missing error messages) + +### Deduplication Helper Tests (`tests/unit/deduplication/test_helpers.py`) +- 13 new tests for helper functions +- Tests for `_select_primary_row()` with explicit and auto-selection +- Tests for `_merge_contact_fields()` field merging logic +- Tests for `_create_soundex_blocks()` blocking algorithm +- Tests for `_find_matches_in_block()` fuzzy matching +- Tests for `_consolidate_related_records()` data consolidation + +## Testing Results + +- All 186 tests pass (160 original + 26 new) +- No regressions in existing functionality +- Type checking passes with `mypy --strict` +- Linting passes with `ruff` + +## Benefits + +1. **Improved Maintainability**: Functions are smaller and focused on single responsibilities +2. **Better Testability**: Helper functions can be tested independently +3. **Reduced Duplication**: Shared error handler eliminates ~100 lines of duplicate code +4. **Enhanced Readability**: Lower complexity makes code easier to understand +5. **Easier Debugging**: Smaller functions are easier to reason about and debug + +## Cyclomatic Complexity Summary + +| Function | Before | After | Improvement | +|----------|--------|-------|-------------| +| `merge_cluster` | 16 (C) | 3 (A) | 81% ↓ | +| `find_fuzzy_name_duplicates` | 12 (C) | 3 (A) | 75% ↓ | +| `review_duplicates.main` | 20 (C) | 10 (B) | 50% ↓ | +| `save_contacts_batch` | 12 (C) | 8 (B) | 33% ↓ | + +All functions that were Grade C (high complexity) are now Grade A or B. diff --git a/scripts/review_duplicates.py b/scripts/review_duplicates.py index f7f28d1..89e2dbc 100644 --- a/scripts/review_duplicates.py +++ b/scripts/review_duplicates.py @@ -22,6 +22,142 @@ def setup_db(cursor: sqlite3.Cursor) -> None: pass +def _fetch_unresolved_groups(cursor: sqlite3.Cursor) -> list[str]: + """Fetch all duplicate groups that haven't been resolved yet. + + Args: + cursor: Database cursor. + + Returns: + List of group IDs. + """ + cursor.execute(""" + SELECT DISTINCT duplicate_group_id + FROM contacts + WHERE duplicate_group_id IS NOT NULL + AND (duplicate_resolution IS NULL OR duplicate_resolution = '') + """) + return [row[0] for row in cursor.fetchall()] + + +def _display_contact_group( + console: Console, cursor: sqlite3.Cursor, group_id: str, group_num: int, total: int +) -> list[str]: + """Display contacts in a duplicate group as a table. + + Args: + console: Rich console for output. + cursor: Database cursor. + group_id: Group ID to display. + group_num: Current group number. + total: Total number of groups. + + Returns: + List of contact IDs in this group. + """ + console.rule(f"Group {group_num}/{total} (ID: {group_id})") + + # Fetch contacts in this group + cursor.execute( + """ + SELECT id, first_name, last_name, job_title + FROM contacts + WHERE duplicate_group_id = ? + """, + (group_id,), + ) + contacts = cursor.fetchall() + + if len(contacts) < 2: + return [] + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("#", style="dim", width=4) + table.add_column("ID", style="cyan", width=12, overflow="ellipsis") + table.add_column("Name", style="bold") + table.add_column("Job Title") + table.add_column("Emails") + table.add_column("Phones") + + contact_ids = [] + + for idx, row in enumerate(contacts): + c_id, first, last, job = row + contact_ids.append(c_id) + + cursor.execute("SELECT email FROM emails WHERE contact_id = ?", (c_id,)) + emails = "\n".join([r[0] for r in cursor.fetchall() if r[0]]) + + cursor.execute("SELECT phone_number FROM phones WHERE contact_id = ?", (c_id,)) + phones = "\n".join([r[0] for r in cursor.fetchall() if r[0]]) + + table.add_row( + str(idx + 1), + c_id, + f"{first or ''} {last or ''}".strip(), + job or "", + emails, + phones, + ) + + console.print(table) + return contact_ids + + +def _handle_user_choice( + cursor: sqlite3.Cursor, + conn: sqlite3.Connection, + console: Console, + choice: str, + contact_ids: list[str], + group_id: str, +) -> tuple[int, int]: + """Handle user's choice for a duplicate group. + + Args: + cursor: Database cursor. + conn: Database connection. + console: Rich console for output. + choice: User's choice ('s', 'q', or a number). + contact_ids: List of contact IDs in the group. + group_id: Group ID. + + Returns: + Tuple of (labeled_count, rejected_count) increments. + """ + if choice == "s": + # Mark as false positive + cursor.execute( + """ + UPDATE contacts + SET duplicate_resolution = 'false_positive' + WHERE duplicate_group_id = ? + """, + (group_id,), + ) + conn.commit() + console.print("[yellow]✔ Marked as false positive.[/yellow]\n") + return 0, 1 + + # Label as confirmed with selected primary + selected_idx = int(choice) - 1 + primary_id = contact_ids[selected_idx] + + cursor.execute( + """ + UPDATE contacts + SET duplicate_resolution = 'confirmed', + primary_contact_id = ? + WHERE duplicate_group_id = ? + """, + (primary_id, group_id), + ) + conn.commit() + res_msg = f"[green]✔ Confirmed. Primary: ...{primary_id[-8:]}[/green]\n" + console.print(res_msg) + return 1, 0 + + def main() -> None: data_dir = Path(os.getenv("DEX_DATA_DIR", "output")) db_path = data_dir / "dex_contacts.db" @@ -42,14 +178,7 @@ def main() -> None: setup_db(cursor) - # Get all distinct group IDs that haven't been resolved yet - cursor.execute(""" - SELECT DISTINCT duplicate_group_id - FROM contacts - WHERE duplicate_group_id IS NOT NULL - AND (duplicate_resolution IS NULL OR duplicate_resolution = '') - """) - groups = [row[0] for row in cursor.fetchall()] + groups = _fetch_unresolved_groups(cursor) console = Console() console.clear() @@ -68,57 +197,15 @@ def main() -> None: try: for i, group_id in enumerate(groups): - console.rule(f"Group {i + 1}/{len(groups)} (ID: {group_id})") - - # Fetch contacts in this group - cursor.execute( - """ - SELECT id, first_name, last_name, job_title - FROM contacts - WHERE duplicate_group_id = ? - """, - (group_id,), + contact_ids = _display_contact_group( + console, cursor, group_id, i + 1, len(groups) ) - contacts = cursor.fetchall() - if len(contacts) < 2: + if len(contact_ids) < 2: continue - table = Table(show_header=True, header_style="bold magenta") - table.add_column("#", style="dim", width=4) - table.add_column("ID", style="cyan", width=12, overflow="ellipsis") - table.add_column("Name", style="bold") - table.add_column("Job Title") - table.add_column("Emails") - table.add_column("Phones") - - contact_ids = [] - - for idx, row in enumerate(contacts): - c_id, first, last, job = row - contact_ids.append(c_id) - - cursor.execute("SELECT email FROM emails WHERE contact_id = ?", (c_id,)) - emails = "\n".join([r[0] for r in cursor.fetchall() if r[0]]) - - cursor.execute( - "SELECT phone_number FROM phones WHERE contact_id = ?", (c_id,) - ) - phones = "\n".join([r[0] for r in cursor.fetchall() if r[0]]) - - table.add_row( - str(idx + 1), - c_id, - f"{first or ''} {last or ''}".strip(), - job or "", - emails, - phones, - ) - - console.print(table) - - # Options - choices = [str(x + 1) for x in range(len(contacts))] + ["s", "q"] + # Get user choice + choices = [str(x + 1) for x in range(len(contact_ids))] + ["s", "q"] choice = Prompt.ask( "\n[bold]Actions:[/bold]\n" " [cyan][1-N][/cyan] Label this ID as PRIMARY (Confirm Duplicates)\n" @@ -133,38 +220,12 @@ def main() -> None: console.print("\n[bold red]Exiting...[/bold red]") break - elif choice == "s": - # Mark as false positive - cursor.execute( - """ - UPDATE contacts - SET duplicate_resolution = 'false_positive' - WHERE duplicate_group_id = ? - """, - (group_id,), - ) - conn.commit() - console.print("[yellow]✔ Marked as false positive.[/yellow]\n") - rejected_count += 1 - - else: - # Label as confirmed with selected primary - selected_idx = int(choice) - 1 - primary_id = contact_ids[selected_idx] - - cursor.execute( - """ - UPDATE contacts - SET duplicate_resolution = 'confirmed', - primary_contact_id = ? - WHERE duplicate_group_id = ? - """, - (primary_id, group_id), - ) - conn.commit() - res_msg = f"[green]✔ Confirmed. Primary: ...{primary_id[-8:]}[/green]\n" - console.print(res_msg) - labeled_count += 1 + # Handle the choice + labeled_inc, rejected_inc = _handle_user_choice( + cursor, conn, console, choice, contact_ids, group_id + ) + labeled_count += labeled_inc + rejected_count += rejected_inc except KeyboardInterrupt: console.print("\n[bold red]Interrupted![/bold red]") diff --git a/scripts/sync_with_integrity.py b/scripts/sync_with_integrity.py index f553c72..75ae33d 100644 --- a/scripts/sync_with_integrity.py +++ b/scripts/sync_with_integrity.py @@ -146,6 +146,87 @@ def compute_hash(data: dict[str, Any]) -> str: return hashlib.sha256(json_str.encode("utf-8")).hexdigest() +def _check_contact_changed( + cursor: sqlite3.Cursor, c_id: str, new_hash: str +) -> tuple[bool, dict[str, Any] | None]: + """Check if contact has changed and retrieve existing dedup metadata. + + Args: + cursor: Database cursor. + c_id: Contact ID. + new_hash: New hash of contact data. + + Returns: + Tuple of (has_changed, existing_dedup_metadata). + """ + cursor.execute( + """SELECT record_hash, duplicate_group_id, duplicate_resolution, + primary_contact_id FROM contacts WHERE id = ?""", + (c_id,), + ) + row = cursor.fetchone() + + if not row: + return True, None + + if row[0] == new_hash: + return False, None + + existing_dedup = { + "duplicate_group_id": row[1], + "duplicate_resolution": row[2], + "primary_contact_id": row[3], + } + return True, existing_dedup + + +def _enrich_contact_data( + item: dict[str, Any] +) -> tuple[dict[str, Any], dict[str, Any]]: + """Parse name and extract company/role from contact data. + + Args: + item: Contact data dictionary. + + Returns: + Tuple of (name_data, job_data). + """ + name_data = parse_contact_name(item) + job_data = parse_job_title(item.get("job_title")) + return name_data, job_data + + +def _save_contact_related_data( + cursor: sqlite3.Cursor, c_id: str, item: dict[str, Any] +) -> None: + """Save emails and phones for a contact. + + Args: + cursor: Database cursor. + c_id: Contact ID. + item: Contact data dictionary. + """ + # Refresh derived tables + cursor.execute("DELETE FROM emails WHERE contact_id = ?", (c_id,)) + cursor.execute("DELETE FROM phones WHERE contact_id = ?", (c_id,)) + + for email_item in item.get("emails", []): + if e := email_item.get("email"): + cursor.execute( + "INSERT INTO emails (contact_id, email) VALUES (?, ?)", (c_id, e) + ) + + for phone_item in item.get("phones", []): + if p := phone_item.get("phone_number"): + cursor.execute( + """ + INSERT INTO phones (contact_id, phone_number, label) + VALUES (?, ?, ?) + """, + (c_id, p, phone_item.get("label")), + ) + + def save_contacts_batch( conn: sqlite3.Connection, items: list[dict[str, Any]], @@ -162,28 +243,18 @@ def save_contacts_batch( continue new_hash = compute_hash(item) - cursor.execute( - """SELECT record_hash, duplicate_group_id, duplicate_resolution, - primary_contact_id FROM contacts WHERE id = ?""", - (c_id,), - ) - row = cursor.fetchone() + has_changed, existing_dedup = _check_contact_changed(cursor, c_id, new_hash) - # Preserve dedup metadata from existing record - existing_dedup = None - if row: - if row[0] == new_hash: - unchanged += 1 - continue + if not has_changed: + unchanged += 1 + continue + + if existing_dedup: updated += 1 - existing_dedup = { - "duplicate_group_id": row[1], - "duplicate_resolution": row[2], - "primary_contact_id": row[3], - } else: added += 1 + # Extract basic fields first = item.get("first_name") last = item.get("last_name") job = item.get("job_title") @@ -191,16 +262,8 @@ def save_contacts_batch( website = item.get("website") now = datetime.now(UTC).isoformat() - # Parse name - name_data = parse_contact_name(item) - name_given = name_data.get("name_given") - name_surname = name_data.get("name_surname") - name_parsed = name_data.get("name_parsed") - - # Extract company/role from job title - job_data = parse_job_title(job) - company = job_data.get("company") - role = job_data.get("role") + # Enrich contact data + name_data, job_data = _enrich_contact_data(item) # Use preserved dedup metadata or None for new contacts dup_group = existing_dedup["duplicate_group_id"] if existing_dedup else None @@ -231,33 +294,16 @@ def save_contacts_batch( dup_group, dup_resolution, primary_id, - name_given, - name_surname, - name_parsed, - company, - role, + name_data.get("name_given"), + name_data.get("name_surname"), + name_data.get("name_parsed"), + job_data.get("company"), + job_data.get("role"), ), ) - # Refresh derived tables - cursor.execute("DELETE FROM emails WHERE contact_id = ?", (c_id,)) - cursor.execute("DELETE FROM phones WHERE contact_id = ?", (c_id,)) - - for email_item in item.get("emails", []): - if e := email_item.get("email"): - cursor.execute( - "INSERT INTO emails (contact_id, email) VALUES (?, ?)", (c_id, e) - ) - - for phone_item in item.get("phones", []): - if p := phone_item.get("phone_number"): - cursor.execute( - """ - INSERT INTO phones (contact_id, phone_number, label) - VALUES (?, ?, ?) - """, - (c_id, p, phone_item.get("label")), - ) + # Save related data + _save_contact_related_data(cursor, c_id, item) conn.commit() return added, updated, unchanged diff --git a/src/dex_python/async_client.py b/src/dex_python/async_client.py index 8a98b25..220c366 100644 --- a/src/dex_python/async_client.py +++ b/src/dex_python/async_client.py @@ -21,15 +21,7 @@ import httpx from .config import Settings -from .exceptions import ( - AuthenticationError, - ContactNotFoundError, - DexAPIError, - NoteNotFoundError, - RateLimitError, - ReminderNotFoundError, - ValidationError, -) +from .error_handler import handle_error_response from .models import ( ContactCreate, ContactUpdate, @@ -101,58 +93,16 @@ def _should_retry(self, status_code: int) -> bool: def _handle_error(self, response: httpx.Response, endpoint: str) -> None: """Convert HTTP error response to appropriate exception. + This method delegates to the shared error handler. + Args: response: The HTTP response with error status. endpoint: The API endpoint that was called. Raises: - AuthenticationError: For 401 responses. - RateLimitError: For 429 responses. - ValidationError: For 400 responses. - ContactNotFoundError: For 404 on /contacts endpoints. - ReminderNotFoundError: For 404 on /reminders endpoints. - NoteNotFoundError: For 404 on /timeline_items endpoints. - DexAPIError: For all other error responses. + Various DexAPIError subclasses based on status code. """ - status_code = response.status_code - try: - data = response.json() - except Exception: - data = {} - - if status_code == 401: - raise AuthenticationError( - "Invalid API key", status_code=401, response_data=data - ) - elif status_code == 429: - retry_after = response.headers.get("Retry-After") - raise RateLimitError( - "Rate limit exceeded", - retry_after=int(retry_after) if retry_after else None, - ) - elif status_code == 400: - raise ValidationError( - data.get("error", "Validation error"), - status_code=400, - response_data=data, - ) - elif status_code == 404: - if "/contacts/" in endpoint: - contact_id = endpoint.split("/contacts/")[-1].split("/")[0] - raise ContactNotFoundError(contact_id) - elif "/reminders/" in endpoint: - reminder_id = endpoint.split("/reminders/")[-1].split("/")[0] - raise ReminderNotFoundError(reminder_id) - elif "/timeline_items/" in endpoint: - note_id = endpoint.split("/timeline_items/")[-1].split("/")[0] - raise NoteNotFoundError(note_id) - raise DexAPIError("Not found", status_code=404, response_data=data) - else: - raise DexAPIError( - data.get("error", f"API error: {status_code}"), - status_code=status_code, - response_data=data, - ) + handle_error_response(response, endpoint) async def _request_with_retry( self, method: str, endpoint: str, **kwargs: Any diff --git a/src/dex_python/client.py b/src/dex_python/client.py index b1c9ca9..59d3bb6 100644 --- a/src/dex_python/client.py +++ b/src/dex_python/client.py @@ -21,15 +21,7 @@ import httpx from .config import Settings -from .exceptions import ( - AuthenticationError, - ContactNotFoundError, - DexAPIError, - NoteNotFoundError, - RateLimitError, - ReminderNotFoundError, - ValidationError, -) +from .error_handler import handle_error_response from .models import ( ContactCreate, ContactUpdate, @@ -97,58 +89,16 @@ def __init__( def _handle_error(self, response: httpx.Response, endpoint: str) -> None: """Convert HTTP error response to appropriate exception. + This method delegates to the shared error handler. + Args: response: The HTTP response with error status. endpoint: The API endpoint that was called. Raises: - AuthenticationError: For 401 responses. - RateLimitError: For 429 responses. - ValidationError: For 400 responses. - ContactNotFoundError: For 404 on /contacts endpoints. - ReminderNotFoundError: For 404 on /reminders endpoints. - NoteNotFoundError: For 404 on /timeline_items endpoints. - DexAPIError: For all other error responses. + Various DexAPIError subclasses based on status code. """ - status_code = response.status_code - try: - data = response.json() - except Exception: - data = {} - - if status_code == 401: - raise AuthenticationError( - "Invalid API key", status_code=401, response_data=data - ) - elif status_code == 429: - retry_after = response.headers.get("Retry-After") - raise RateLimitError( - "Rate limit exceeded", - retry_after=int(retry_after) if retry_after else None, - ) - elif status_code == 400: - raise ValidationError( - data.get("error", "Validation error"), - status_code=400, - response_data=data, - ) - elif status_code == 404: - if "/contacts/" in endpoint: - contact_id = endpoint.split("/contacts/")[-1].split("/")[0] - raise ContactNotFoundError(contact_id) - elif "/reminders/" in endpoint: - reminder_id = endpoint.split("/reminders/")[-1].split("/")[0] - raise ReminderNotFoundError(reminder_id) - elif "/timeline_items/" in endpoint: - note_id = endpoint.split("/timeline_items/")[-1].split("/")[0] - raise NoteNotFoundError(note_id) - raise DexAPIError("Not found", status_code=404, response_data=data) - else: - raise DexAPIError( - data.get("error", f"API error: {status_code}"), - status_code=status_code, - response_data=data, - ) + handle_error_response(response, endpoint) def _should_retry(self, status_code: int) -> bool: """Check if a request should be retried based on HTTP status code.""" diff --git a/src/dex_python/deduplication.py b/src/dex_python/deduplication.py index 083ed2e..aed4769 100644 --- a/src/dex_python/deduplication.py +++ b/src/dex_python/deduplication.py @@ -182,6 +182,70 @@ def find_name_and_title_duplicates(conn: sqlite3.Connection) -> list[dict[str, A return results +def _create_soundex_blocks( + rows: list[tuple[str, str, str]] +) -> dict[str, list[dict[str, str]]]: + """Create soundex-based blocks for efficient fuzzy matching. + + Groups contacts by the soundex/metaphone code of their last name to + reduce O(n²) comparisons to near-linear time. + + Args: + rows: List of (id, first_name, last_name) tuples. + + Returns: + Dictionary mapping soundex keys to lists of contact records. + """ + blocks: dict[str, list[dict[str, str]]] = {} + for rid, first, last in rows: + first, last = first.strip(), last.strip() + + # Skip empty names after stripping + if not first or not last: + continue + + try: + key = jellyfish.metaphone(last) or last.lower()[:2] + except Exception: + key = last.lower()[:2] + + if key not in blocks: + blocks[key] = [] + blocks[key].append({"id": rid, "full_name": f"{first} {last}"}) + + return blocks + + +def _find_matches_in_block( + items: list[dict[str, str]], threshold: float +) -> list[dict[str, Any]]: + """Find fuzzy name matches within a single block. + + Args: + items: List of contacts in the same soundex block. + threshold: Minimum Jaro-Winkler similarity score (0.0-1.0). + + Returns: + List of match dictionaries. + """ + matches = [] + for i in range(len(items)): + for j in range(i + 1, len(items)): + p1, p2 = items[i], items[j] + score = jellyfish.jaro_winkler_similarity(p1["full_name"], p2["full_name"]) + if score >= threshold: + matches.append( + { + "match_type": "fuzzy_name", + "match_value": ( + f"{p1['full_name']} <-> {p2['full_name']} ({score:.2f})" + ), + "contact_ids": [p1["id"], p2["id"]], + } + ) + return matches + + def find_fuzzy_name_duplicates( conn: sqlite3.Connection, threshold: float = 0.9 ) -> list[dict[str, Any]]: @@ -208,43 +272,16 @@ def find_fuzzy_name_duplicates( cursor.execute(query) rows = cursor.fetchall() - blocks: dict[str, list[dict[str, str]]] = {} - for rid, first, last in rows: - first, last = first.strip(), last.strip() - - # Skip empty names after stripping - if not first or not last: - continue - - try: - key = jellyfish.metaphone(last) or last.lower()[:2] - except Exception: - key = last.lower()[:2] - - if key not in blocks: - blocks[key] = [] - blocks[key].append({"id": rid, "full_name": f"{first} {last}"}) + # Create soundex-based blocks for efficient comparison + blocks = _create_soundex_blocks(rows) + # Find matches within each block results = [] for items in blocks.values(): if len(items) < 2: continue - for i in range(len(items)): - for j in range(i + 1, len(items)): - p1, p2 = items[i], items[j] - score = jellyfish.jaro_winkler_similarity( - p1["full_name"], p2["full_name"] - ) - if score >= threshold: - results.append( - { - "match_type": "fuzzy_name", - "match_value": ( - f"{p1['full_name']} <-> {p2['full_name']} ({score:.2f})" - ), - "contact_ids": [p1["id"], p2["id"]], - } - ) + results.extend(_find_matches_in_block(items, threshold)) + return results @@ -269,38 +306,22 @@ def cluster_duplicates(matches: list[dict[str, Any]]) -> list[list[str]]: return [list(c) for c in nx.connected_components(graph)] -def merge_cluster( - conn: sqlite3.Connection, contact_ids: list[str], primary_id: str | None = None -) -> str: - """Merge multiple contacts into a single primary record. - - Consolidates data from all contacts into the primary record, keeping - the most complete data. Moves all emails and phones to the primary, - deduplicates them, and deletes the merged contacts. +def _select_primary_row( + rows: list[tuple[Any, ...]], primary_id: str | None +) -> tuple[tuple[Any, ...], list[tuple[Any, ...]], str]: + """Select the primary contact row from a list of contact rows. Args: - conn: SQLite database connection. - contact_ids: List of contact IDs to merge. - primary_id: Optional ID to use as the primary record. - If None, auto-selects the most complete record. + rows: List of contact rows from database. + primary_id: Optional ID to use as primary. If None, auto-selects + the most complete record. Returns: - The ID of the primary (surviving) contact. + Tuple of (primary_row, sorted_rows, primary_id). Raises: - ValueError: If no contact IDs provided or contacts not found. + ValueError: If primary_id is provided but not found in rows. """ - if not contact_ids: - raise ValueError("No contact IDs provided") - - cursor = conn.cursor() - placeholders = ",".join(["?"] * len(contact_ids)) - cursor.execute(f"SELECT * FROM contacts WHERE id IN ({placeholders})", contact_ids) - rows = cursor.fetchall() - - if not rows: - raise ValueError("Contacts not found in database") - if primary_id: # Find the row corresponding to primary_id primary_row_list = [r for r in rows if r[0] == primary_id] @@ -311,7 +332,7 @@ def merge_cluster( other_rows = [r for r in rows if r[0] != primary_id] sorted_rows = [primary_row] + other_rows else: - # Auto-select best primary + # Auto-select best primary based on completeness def score_row(row: tuple[Any, ...]) -> int: return sum(1 for field in row if field is not None and field != "") @@ -319,36 +340,51 @@ def score_row(row: tuple[Any, ...]) -> int: primary_row = sorted_rows[0] primary_id = primary_row[0] + return primary_row, sorted_rows, primary_id + + +def _merge_contact_fields( + primary_row: tuple[Any, ...], other_rows: list[tuple[Any, ...]] +) -> list[Any]: + """Merge contact fields, filling in missing data from other rows. + + Args: + primary_row: The primary contact row. + other_rows: Other contact rows to merge data from. + + Returns: + Merged contact data as a list. + """ current_primary = list(primary_row) - for other_row in sorted_rows[1:]: + for other_row in other_rows: for i in range(len(current_primary)): if (current_primary[i] is None or current_primary[i] == "") and other_row[ i ]: current_primary[i] = other_row[i] + return current_primary - cursor.execute( - """ - UPDATE contacts - SET first_name=?, last_name=?, job_title=?, linkedin=?, website=?, full_data=? - WHERE id=? - """, - ( - current_primary[1], - current_primary[2], - current_primary[3], - current_primary[4], - current_primary[5], - current_primary[6], - primary_id, - ), - ) +def _consolidate_related_records( + cursor: sqlite3.Cursor, primary_id: str, contact_ids: list[str], placeholders: str +) -> None: + """Consolidate emails and phones for merged contacts. + + Moves all emails and phones to the primary contact and deduplicates them. + + Args: + cursor: Database cursor. + primary_id: The primary contact ID. + contact_ids: All contact IDs being merged. + placeholders: SQL placeholders for contact_ids. + """ for table in ["emails", "phones"]: + # Move all records to primary contact cursor.execute( f"UPDATE {table} SET contact_id = ? WHERE contact_id IN ({placeholders})", [primary_id] + contact_ids, ) + # Deduplicate if table == "emails": cursor.execute(""" DELETE FROM emails WHERE id NOT IN ( @@ -362,6 +398,67 @@ def score_row(row: tuple[Any, ...]) -> int: ) """) + +def merge_cluster( + conn: sqlite3.Connection, contact_ids: list[str], primary_id: str | None = None +) -> str: + """Merge multiple contacts into a single primary record. + + Consolidates data from all contacts into the primary record, keeping + the most complete data. Moves all emails and phones to the primary, + deduplicates them, and deletes the merged contacts. + + Args: + conn: SQLite database connection. + contact_ids: List of contact IDs to merge. + primary_id: Optional ID to use as the primary record. + If None, auto-selects the most complete record. + + Returns: + The ID of the primary (surviving) contact. + + Raises: + ValueError: If no contact IDs provided or contacts not found. + """ + if not contact_ids: + raise ValueError("No contact IDs provided") + + cursor = conn.cursor() + placeholders = ",".join(["?"] * len(contact_ids)) + cursor.execute(f"SELECT * FROM contacts WHERE id IN ({placeholders})", contact_ids) + rows = cursor.fetchall() + + if not rows: + raise ValueError("Contacts not found in database") + + # Select primary and order rows + primary_row, sorted_rows, primary_id = _select_primary_row(rows, primary_id) + + # Merge fields from all contacts + merged_data = _merge_contact_fields(primary_row, sorted_rows[1:]) + + # Update primary contact with merged data + cursor.execute( + """ + UPDATE contacts + SET first_name=?, last_name=?, job_title=?, linkedin=?, website=?, full_data=? + WHERE id=? + """, + ( + merged_data[1], + merged_data[2], + merged_data[3], + merged_data[4], + merged_data[5], + merged_data[6], + primary_id, + ), + ) + + # Consolidate emails and phones + _consolidate_related_records(cursor, primary_id, contact_ids, placeholders) + + # Delete non-primary contacts cursor.execute( f"DELETE FROM contacts WHERE id IN ({placeholders}) AND id != ?", contact_ids + [primary_id], diff --git a/src/dex_python/error_handler.py b/src/dex_python/error_handler.py new file mode 100644 index 0000000..2c64fbb --- /dev/null +++ b/src/dex_python/error_handler.py @@ -0,0 +1,92 @@ +"""Shared error handling logic for Dex API clients. + +This module provides error handling utilities shared between +the synchronous and asynchronous clients. +""" + +from typing import Any + +import httpx + +from .exceptions import ( + AuthenticationError, + ContactNotFoundError, + DexAPIError, + NoteNotFoundError, + RateLimitError, + ReminderNotFoundError, + ValidationError, +) + + +def handle_error_response(response: httpx.Response, endpoint: str) -> None: + """Convert HTTP error response to appropriate exception. + + Args: + response: The HTTP response with error status. + endpoint: The API endpoint that was called. + + Raises: + AuthenticationError: For 401 responses. + RateLimitError: For 429 responses. + ValidationError: For 400 responses. + ContactNotFoundError: For 404 on /contacts endpoints. + ReminderNotFoundError: For 404 on /reminders endpoints. + NoteNotFoundError: For 404 on /timeline_items endpoints. + DexAPIError: For all other error responses. + """ + status_code = response.status_code + try: + data = response.json() + except Exception: + data = {} + + if status_code == 401: + raise AuthenticationError( + "Invalid API key", status_code=401, response_data=data + ) + elif status_code == 429: + retry_after = response.headers.get("Retry-After") + raise RateLimitError( + "Rate limit exceeded", + retry_after=int(retry_after) if retry_after else None, + ) + elif status_code == 400: + raise ValidationError( + data.get("error", "Validation error"), + status_code=400, + response_data=data, + ) + elif status_code == 404: + _handle_404_error(endpoint, data) + else: + raise DexAPIError( + data.get("error", f"API error: {status_code}"), + status_code=status_code, + response_data=data, + ) + + +def _handle_404_error(endpoint: str, data: dict[str, Any]) -> None: + """Handle 404 errors with entity-specific exceptions. + + Args: + endpoint: The API endpoint that was called. + data: The error response data. + + Raises: + ContactNotFoundError: For /contacts endpoints. + ReminderNotFoundError: For /reminders endpoints. + NoteNotFoundError: For /timeline_items endpoints. + DexAPIError: For other 404 errors. + """ + if "/contacts/" in endpoint: + contact_id = endpoint.split("/contacts/")[-1].split("/")[0] + raise ContactNotFoundError(contact_id) + elif "/reminders/" in endpoint: + reminder_id = endpoint.split("/reminders/")[-1].split("/")[0] + raise ReminderNotFoundError(reminder_id) + elif "/timeline_items/" in endpoint: + note_id = endpoint.split("/timeline_items/")[-1].split("/")[0] + raise NoteNotFoundError(note_id) + raise DexAPIError("Not found", status_code=404, response_data=data) diff --git a/tests/unit/deduplication/test_helpers.py b/tests/unit/deduplication/test_helpers.py new file mode 100644 index 0000000..a5da34d --- /dev/null +++ b/tests/unit/deduplication/test_helpers.py @@ -0,0 +1,234 @@ +"""Tests for deduplication helper functions.""" + +import sqlite3 + +import pytest + +from dex_python.deduplication import ( + _consolidate_related_records, + _create_soundex_blocks, + _find_matches_in_block, + _merge_contact_fields, + _select_primary_row, +) + + +class TestSelectPrimaryRow: + """Tests for _select_primary_row helper function.""" + + def test_select_primary_row_with_explicit_id(self) -> None: + """Test selecting primary row with explicit ID.""" + rows = [ + ("id1", "John", "Doe", "Engineer", None, None, "{}"), + ("id2", "Jane", "Doe", "Manager", "linkedin", "website", "{}"), + ("id3", "Jim", "Doe", "Director", None, None, "{}"), + ] + primary_row, sorted_rows, primary_id = _select_primary_row(rows, "id2") + assert primary_id == "id2" + assert primary_row[0] == "id2" + assert len(sorted_rows) == 3 + assert sorted_rows[0][0] == "id2" + + def test_select_primary_row_auto_selects_most_complete(self) -> None: + """Test auto-selection of most complete row.""" + rows = [ + ("id1", "John", None, None, None, None, "{}"), + ("id2", "Jane", "Doe", "Manager", "linkedin", "website", "{}"), + ("id3", "Jim", "Doe", None, None, None, "{}"), + ] + primary_row, sorted_rows, primary_id = _select_primary_row(rows, None) + assert primary_id == "id2" # Most complete row + assert primary_row[0] == "id2" + + def test_select_primary_row_raises_on_invalid_id(self) -> None: + """Test that ValueError is raised for invalid primary_id.""" + rows = [ + ("id1", "John", "Doe", None, None, None, "{}"), + ("id2", "Jane", "Doe", None, None, None, "{}"), + ] + with pytest.raises(ValueError, match="Primary ID id3 not found"): + _select_primary_row(rows, "id3") + + +class TestMergeContactFields: + """Tests for _merge_contact_fields helper function.""" + + def test_merge_fills_missing_fields(self) -> None: + """Test that merge fills in missing fields from other rows.""" + primary_row = ("id1", "John", None, None, None, None, "{}") + other_rows = [ + ("id2", "Jane", "Doe", "Manager", "linkedin", None, "{}"), + ("id3", "Jim", "Doe", None, None, "website", "{}"), + ] + merged = _merge_contact_fields(primary_row, other_rows) + assert merged[0] == "id1" # ID from primary + assert merged[1] == "John" # First name from primary + assert merged[2] == "Doe" # Last name from first other + assert merged[3] == "Manager" # Job title from first other + assert merged[4] == "linkedin" # LinkedIn from first other + assert merged[5] == "website" # Website from second other + + def test_merge_preserves_primary_non_empty_fields(self) -> None: + """Test that merge preserves non-empty fields from primary.""" + primary_row = ("id1", "John", "Smith", "Engineer", "primary-li", None, "{}") + other_rows = [ + ("id2", "Jane", "Doe", "Manager", "other-li", "website", "{}"), + ] + merged = _merge_contact_fields(primary_row, other_rows) + assert merged[1] == "John" # Kept from primary + assert merged[2] == "Smith" # Kept from primary + assert merged[3] == "Engineer" # Kept from primary + assert merged[4] == "primary-li" # Kept from primary + assert merged[5] == "website" # Filled from other + + +class TestCreateSoundexBlocks: + """Tests for _create_soundex_blocks helper function.""" + + def test_creates_blocks_by_metaphone(self) -> None: + """Test that contacts are grouped by metaphone of last name.""" + rows = [ + ("id1", "John", "Smith"), + ("id2", "Jane", "Smyth"), # Similar sound + ("id3", "Jim", "Jones"), + ] + blocks = _create_soundex_blocks(rows) + assert len(blocks) > 0 + # Smith and Smyth should likely be in the same block (similar metaphone) + # But we can't guarantee exact metaphone results, so just check structure + for key, contacts in blocks.items(): + assert isinstance(key, str) + assert isinstance(contacts, list) + for contact in contacts: + assert "id" in contact + assert "full_name" in contact + + def test_skips_empty_names(self) -> None: + """Test that empty names are skipped.""" + rows = [ + ("id1", "John", "Smith"), + ("id2", "", "Doe"), # Empty first name + ("id3", "Jane", ""), # Empty last name + ("id4", " ", "Jones"), # Whitespace only + ] + blocks = _create_soundex_blocks(rows) + # Should only have id1 and id4 after filtering (but id4 will be filtered too) + total_contacts = sum(len(contacts) for contacts in blocks.values()) + assert total_contacts == 1 # Only id1 + + def test_handles_metaphone_errors_gracefully(self) -> None: + """Test that metaphone errors fall back to first 2 chars.""" + rows = [ + ("id1", "John", "Xyz123!@#"), # Non-standard characters + ] + blocks = _create_soundex_blocks(rows) + # Should still create blocks, just using fallback + total_contacts = sum(len(contacts) for contacts in blocks.values()) + assert total_contacts == 1 + + +class TestFindMatchesInBlock: + """Tests for _find_matches_in_block helper function.""" + + def test_finds_matches_above_threshold(self) -> None: + """Test that similar names are matched.""" + items = [ + {"id": "id1", "full_name": "John Smith"}, + {"id": "id2", "full_name": "Jon Smith"}, # Typo + ] + matches = _find_matches_in_block(items, threshold=0.85) + assert len(matches) == 1 + assert matches[0]["match_type"] == "fuzzy_name" + assert "id1" in matches[0]["contact_ids"] + assert "id2" in matches[0]["contact_ids"] + + def test_ignores_matches_below_threshold(self) -> None: + """Test that dissimilar names are not matched.""" + items = [ + {"id": "id1", "full_name": "John Smith"}, + {"id": "id2", "full_name": "Jane Doe"}, + ] + matches = _find_matches_in_block(items, threshold=0.9) + assert len(matches) == 0 + + def test_finds_multiple_matches_in_block(self) -> None: + """Test finding multiple matches in same block.""" + items = [ + {"id": "id1", "full_name": "John Smith"}, + {"id": "id2", "full_name": "Jon Smith"}, + {"id": "id3", "full_name": "Johnny Smith"}, + ] + matches = _find_matches_in_block(items, threshold=0.80) + # Should find multiple pairs + assert len(matches) >= 2 + + +class TestConsolidateRelatedRecords: + """Tests for _consolidate_related_records helper function.""" + + def test_consolidates_emails_and_phones(self) -> None: + """Test that emails and phones are moved to primary contact.""" + conn = sqlite3.connect(":memory:") + cursor = conn.cursor() + + # Create tables + cursor.execute( + """CREATE TABLE emails ( + id INTEGER PRIMARY KEY, contact_id TEXT, email TEXT + )""" + ) + cursor.execute( + """CREATE TABLE phones ( + id INTEGER PRIMARY KEY, contact_id TEXT, phone_number TEXT + )""" + ) + + # Insert test data + cursor.execute("INSERT INTO emails VALUES (1, 'id1', 'john@example.com')") + cursor.execute("INSERT INTO emails VALUES (2, 'id2', 'jane@example.com')") + cursor.execute("INSERT INTO phones VALUES (1, 'id1', '111-1111')") + cursor.execute("INSERT INTO phones VALUES (2, 'id2', '222-2222')") + conn.commit() + + # Consolidate + _consolidate_related_records(cursor, "id1", ["id1", "id2"], "?,?") + conn.commit() + + # Check results + cursor.execute("SELECT contact_id FROM emails ORDER BY id") + email_contacts = [row[0] for row in cursor.fetchall()] + assert all(c == "id1" for c in email_contacts) + + cursor.execute("SELECT contact_id FROM phones ORDER BY id") + phone_contacts = [row[0] for row in cursor.fetchall()] + assert all(c == "id1" for c in phone_contacts) + + def test_deduplicates_emails(self) -> None: + """Test that duplicate emails are removed.""" + conn = sqlite3.connect(":memory:") + cursor = conn.cursor() + + cursor.execute( + """CREATE TABLE emails ( + id INTEGER PRIMARY KEY, contact_id TEXT, email TEXT + )""" + ) + cursor.execute( + """CREATE TABLE phones ( + id INTEGER PRIMARY KEY, contact_id TEXT, phone_number TEXT + )""" + ) + + # Insert duplicate emails (different case) + cursor.execute("INSERT INTO emails VALUES (1, 'id1', 'john@example.com')") + cursor.execute("INSERT INTO emails VALUES (2, 'id1', 'JOHN@example.com')") + conn.commit() + + # Consolidate + _consolidate_related_records(cursor, "id1", ["id1"], "?") + conn.commit() + + # Check results - should only have one email + cursor.execute("SELECT COUNT(*) FROM emails") + count = cursor.fetchone()[0] + assert count == 1 diff --git a/tests/unit/test_error_handler.py b/tests/unit/test_error_handler.py new file mode 100644 index 0000000..c1c2734 --- /dev/null +++ b/tests/unit/test_error_handler.py @@ -0,0 +1,155 @@ +"""Tests for the shared error handling module.""" + +import httpx +import pytest + +from dex_python.error_handler import handle_error_response +from dex_python.exceptions import ( + AuthenticationError, + ContactNotFoundError, + DexAPIError, + NoteNotFoundError, + RateLimitError, + ReminderNotFoundError, + ValidationError, +) + + +class TestHandleErrorResponse: + """Tests for the handle_error_response function.""" + + def test_401_raises_authentication_error(self) -> None: + """Test that 401 status raises AuthenticationError.""" + response = httpx.Response( + status_code=401, + json={"error": "Unauthorized"}, + ) + with pytest.raises(AuthenticationError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.status_code == 401 + assert "Invalid API key" in str(exc_info.value) + + def test_429_raises_rate_limit_error(self) -> None: + """Test that 429 status raises RateLimitError.""" + response = httpx.Response( + status_code=429, + headers={"Retry-After": "60"}, + json={"error": "Too many requests"}, + ) + with pytest.raises(RateLimitError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.retry_after == 60 + + def test_429_without_retry_after_header(self) -> None: + """Test that 429 without Retry-After header works.""" + response = httpx.Response( + status_code=429, + json={"error": "Too many requests"}, + ) + with pytest.raises(RateLimitError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.retry_after is None + + def test_400_raises_validation_error(self) -> None: + """Test that 400 status raises ValidationError.""" + response = httpx.Response( + status_code=400, + json={"error": "Invalid input"}, + ) + with pytest.raises(ValidationError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.status_code == 400 + assert "Invalid input" in str(exc_info.value) + + def test_400_without_error_message(self) -> None: + """Test that 400 without error message uses default.""" + response = httpx.Response( + status_code=400, + json={}, + ) + with pytest.raises(ValidationError) as exc_info: + handle_error_response(response, "/contacts") + assert "Validation error" in str(exc_info.value) + + def test_404_contacts_raises_contact_not_found(self) -> None: + """Test that 404 on /contacts endpoint raises ContactNotFoundError.""" + response = httpx.Response( + status_code=404, + json={"error": "Not found"}, + ) + with pytest.raises(ContactNotFoundError) as exc_info: + handle_error_response(response, "/contacts/abc123") + assert exc_info.value.contact_id == "abc123" + + def test_404_reminders_raises_reminder_not_found(self) -> None: + """Test that 404 on /reminders endpoint raises ReminderNotFoundError.""" + response = httpx.Response( + status_code=404, + json={"error": "Not found"}, + ) + with pytest.raises(ReminderNotFoundError) as exc_info: + handle_error_response(response, "/reminders/xyz789") + assert exc_info.value.reminder_id == "xyz789" + + def test_404_timeline_items_raises_note_not_found(self) -> None: + """Test that 404 on /timeline_items endpoint raises NoteNotFoundError.""" + response = httpx.Response( + status_code=404, + json={"error": "Not found"}, + ) + with pytest.raises(NoteNotFoundError) as exc_info: + handle_error_response(response, "/timeline_items/note456") + assert exc_info.value.note_id == "note456" + + def test_404_other_endpoint_raises_generic_error(self) -> None: + """Test that 404 on other endpoints raises generic DexAPIError.""" + response = httpx.Response( + status_code=404, + json={"error": "Not found"}, + ) + with pytest.raises(DexAPIError) as exc_info: + handle_error_response(response, "/some/other/endpoint") + assert exc_info.value.status_code == 404 + assert "Not found" in str(exc_info.value) + + def test_500_raises_generic_api_error(self) -> None: + """Test that 500 status raises generic DexAPIError.""" + response = httpx.Response( + status_code=500, + json={"error": "Internal server error"}, + ) + with pytest.raises(DexAPIError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.status_code == 500 + assert "Internal server error" in str(exc_info.value) + + def test_500_without_error_message_uses_default(self) -> None: + """Test that 500 without error message uses default.""" + response = httpx.Response( + status_code=500, + json={}, + ) + with pytest.raises(DexAPIError) as exc_info: + handle_error_response(response, "/contacts") + assert "API error: 500" in str(exc_info.value) + + def test_handles_invalid_json_response(self) -> None: + """Test that invalid JSON is handled gracefully.""" + response = httpx.Response( + status_code=500, + content=b"Not JSON", + ) + with pytest.raises(DexAPIError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.status_code == 500 + + def test_response_data_included_in_exception(self) -> None: + """Test that response data is included in exceptions.""" + response_data = {"error": "Custom error", "details": "More info"} + response = httpx.Response( + status_code=500, + json=response_data, + ) + with pytest.raises(DexAPIError) as exc_info: + handle_error_response(response, "/contacts") + assert exc_info.value.response_data == response_data