# DeduplicationTracker

> Track one-time operations per job to prevent duplicates.

In [None]:
#| default_exp core.deduplication

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from typing import Set, Callable, Optional, Any

## DeduplicationTracker Class

The `DeduplicationTracker` ensures that operations are performed exactly once per job within a user's session. This is useful for operations that should only happen once, such as:

- Auto-saving results to disk
- Sending email notifications
- Logging completion events
- Creating database records

The tracker stores processed job IDs in the session as a list under the key `dedup_<tracker_id>`, so tracking persists across requests but is scoped to each user's session.

In [None]:
#| export
class DeduplicationTracker:
    """Track one-time operations per job to prevent duplicates.

    Processed job IDs are kept in the session as a list under the key
    `dedup_<tracker_id>`. The list keeps the order in which jobs were marked,
    so the stored session payload is deterministic instead of depending on
    set iteration order.
    """

    def __init__(
        self, 
        sess:Any, # FastHTML session object
        tracker_id:str # Unique identifier for this tracker (e.g., "saved_jobs", "sent_notifications")
    ):
        """Initialize deduplication tracker."""
        self.sess = sess
        self.tracker_id = tracker_id
        self._key = f"dedup_{tracker_id}"

    def _get_processed_list(self) -> list: # Processed job IDs in stored order, without duplicates
        """Get the stored job IDs as an ordered, duplicate-free list."""
        # `or []` also covers a key that is present but holds None (or is empty)
        stored = self.sess.get(self._key) or []
        # dict.fromkeys drops duplicates while keeping first-seen order
        return list(dict.fromkeys(stored))

    def _get_processed_set(self) -> Set[str]: # Set of processed job IDs
        """Get the set of processed job IDs from session."""
        # Session stores as list, we convert to set for O(1) lookup
        return set(self._get_processed_list())
    
    def _force_session_save(self) -> None:
        """Force session to be marked as modified and saved."""
        import time
        # Update a timestamp to trigger session modification detection
        # This ensures session middleware knows to save the session
        self.sess[f"__dedup_{self.tracker_id}_last_modified"] = time.time()

    def _save_processed_set(
        self, 
        processed:Set[str] # Set of processed job IDs to save
    ) -> None:
        """Save the set of processed job IDs to session."""
        # Keep already-stored IDs in their existing order so the saved list is stable
        kept = [job_id for job_id in self._get_processed_list() if job_id in processed]
        already_kept = set(kept)
        # New IDs go at the end; sorting by str keeps the result deterministic
        # even when several IDs are added at once
        added = sorted((job_id for job_id in processed if job_id not in already_kept), key=str)
        # Stored as a list for JSON serialization
        self.sess[self._key] = kept + added
        # Force session modification detection
        self._force_session_save()

    def is_processed(
        self, 
        job_id:str # Unique job identifier
    ) -> bool: # True if job has been marked as processed, False otherwise
        """Check if a job has been processed."""
        return job_id in self._get_processed_set()

    def mark_processed(
        self, 
        job_id:str # Unique job identifier
    ) -> None:
        """Mark a job as processed."""
        processed = self._get_processed_set()
        processed.add(job_id)
        self._save_processed_set(processed)

    def unmark_processed(
        self, 
        job_id:str # Unique job identifier
    ) -> None:
        """Remove processed mark from a job (allows reprocessing)."""
        processed = self._get_processed_set()
        # discard() is a no-op when the job was never marked
        processed.discard(job_id)
        self._save_processed_set(processed)

    def clear_all(self) -> None:
        """Clear all processed job IDs."""
        if self._key in self.sess:
            del self.sess[self._key]
        # Force session save after deletion
        self._force_session_save()

    def get_all_processed(self) -> Set[str]: # Set of job IDs that have been marked as processed
        """Get all processed job IDs."""
        return self._get_processed_set()

## Usage Examples

These examples demonstrate how to use `DeduplicationTracker` to prevent duplicate operations.

In [None]:
# Create a mock session (a plain dict stands in for the session object here)
mock_session = {}

# Create a tracker for saved jobs; its IDs are stored under the "dedup_saved_jobs" key
tracker = DeduplicationTracker(mock_session, "saved_jobs")
# Bare expression at the end of the cell so the notebook displays the tracker
tracker

<__main__.DeduplicationTracker>

In [None]:
# Check if job has been processed (initially false)
# Nothing has been marked yet, so the lookup falls back to an empty collection
is_processed = tracker.is_processed("job-123")
print(f"Job-123 processed: {is_processed}")

Job-123 processed: False


In [None]:
# Mark job as processed
tracker.mark_processed("job-123")
print(f"Job-123 processed: {tracker.is_processed('job-123')}")
# The session now holds the ID list plus the "__dedup_saved_jobs_last_modified" timestamp
print(f"Session state: {mock_session}")

Job-123 processed: True
Session state: {'dedup_saved_jobs': ['job-123'], '__dedup_saved_jobs_last_modified': 1761793147.8636549}


In [None]:
# Mark multiple jobs
tracker.mark_processed("job-456")
tracker.mark_processed("job-789")

# get_all_processed() returns a set, so the printed order is not guaranteed
all_processed = tracker.get_all_processed()
print(f"All processed jobs: {all_processed}")

All processed jobs: {'job-123', 'job-456', 'job-789'}


In [None]:
# Unmark a job (allow reprocessing)
# Only "job-456" is removed; the other marked jobs stay in place
tracker.unmark_processed("job-456")
print(f"Job-456 processed: {tracker.is_processed('job-456')}")
print(f"Remaining: {tracker.get_all_processed()}")

Job-456 processed: False
Remaining: {'job-123', 'job-789'}


In [None]:
# Clear all processed jobs
# clear_all() deletes the "dedup_saved_jobs" key and then refreshes the timestamp key,
# so the timestamp entry is the only thing left in the session afterwards
tracker.clear_all()
print(f"After clearing: {tracker.get_all_processed()}")
print(f"Session state: {mock_session}")

After clearing: set()
Session state: {'__dedup_saved_jobs_last_modified': 1761793147.915495}


## Multiple Trackers

You can use multiple trackers in the same session for different types of operations:

In [None]:
# One shared session dict; each tracker writes under its own "dedup_<tracker_id>" key
session = {}

# Tracker for saved jobs
save_tracker = DeduplicationTracker(session, "saved_jobs")
save_tracker.mark_processed("job-001")
save_tracker.mark_processed("job-002")

# Tracker for sent notifications
notify_tracker = DeduplicationTracker(session, "sent_notifications")
notify_tracker.mark_processed("job-001")

# Tracker for logged events
log_tracker = DeduplicationTracker(session, "logged_events")
log_tracker.mark_processed("job-001")
log_tracker.mark_processed("job-002")
log_tracker.mark_processed("job-003")

# Dump every session entry: one ID list and one timestamp per tracker
print("Session with multiple trackers:")
for key, value in session.items():
    print(f"  {key}: {value}")

Session with multiple trackers:
  dedup_saved_jobs: ['job-001', 'job-002']
  __dedup_saved_jobs_last_modified: 1761793147.9306018
  dedup_sent_notifications: ['job-001']
  __dedup_sent_notifications_last_modified: 1761793147.9306183
  dedup_logged_events: ['job-003', 'job-001', 'job-002']
  __dedup_logged_events_last_modified: 1761793147.9306488


In [None]:
# Each tracker is independent
print(f"Job-001 saved: {save_tracker.is_processed('job-001')}")
print(f"Job-001 notified: {notify_tracker.is_processed('job-001')}")
print(f"Job-001 logged: {log_tracker.is_processed('job-001')}")
print(f"\nJob-002 saved: {save_tracker.is_processed('job-002')}")
print(f"Job-002 notified: {notify_tracker.is_processed('job-002')}")
print(f"Job-002 logged: {log_tracker.is_processed('job-002')}")

Job-001 saved: True
Job-001 notified: True
Job-001 logged: True

Job-002 saved: True
Job-002 notified: False
Job-002 logged: True


## Real-World Pattern

Here's a typical pattern for using the tracker to prevent duplicate saves:

In [None]:
# Simulate a session (a plain dict reused by every call in this cell)
user_session = {}

def save_result_to_disk(job_id, data):
    """Stand-in for a real disk write: only prints what would be saved."""
    # A real implementation would write to a file, database, etc. here
    message = f"  [DISK] Saving result for {job_id}: {data}"
    print(message)

def save_job_result_once(sess, job_id, data):
    """
    Write a job's result to disk unless this session already saved it.

    A "saved_jobs" tracker on `sess` records which jobs were written, so
    repeated calls for the same job are skipped instead of saving again.
    """
    dedup = DeduplicationTracker(sess, "saved_jobs")
    already_saved = dedup.is_processed(job_id)

    if not already_saved:
        try:
            save_result_to_disk(job_id, data)
            dedup.mark_processed(job_id)
            print(f"  [SUCCESS] Job {job_id} saved and marked")
        except Exception as e:
            # The job is left unmarked when the save fails, so a later call can retry it
            print(f"  [ERROR] Failed to save job {job_id}: {e}")
    else:
        print(f"  [SKIP] Job {job_id} already saved")

# Simulate multiple calls (e.g., page refreshes, SSE polling, etc.)
print("First call:")
save_job_result_once(user_session, "job-abc", {"text": "Hello world"})

# Same job ID and same session as the first call
print("\nSecond call (duplicate):")
save_job_result_once(user_session, "job-abc", {"text": "Hello world"})

print("\nThird call (duplicate):")
save_job_result_once(user_session, "job-abc", {"text": "Hello world"})

# A new job ID in the same session
print("\nDifferent job:")
save_job_result_once(user_session, "job-xyz", {"text": "Goodbye world"})

print("\nFinal session state:")
# Bare expression at the end of the cell so the notebook displays the session dict
user_session

First call:
  [DISK] Saving result for job-abc: {'text': 'Hello world'}
  [SUCCESS] Job job-abc saved and marked

Second call (duplicate):
  [SKIP] Job job-abc already saved

Third call (duplicate):
  [SKIP] Job job-abc already saved

Different job:
  [DISK] Saving result for job-xyz: {'text': 'Goodbye world'}
  [SUCCESS] Job job-xyz saved and marked

Final session state:


{'dedup_saved_jobs': ['job-abc', 'job-xyz'],
 '__dedup_saved_jobs_last_modified': 1761793147.9616601}

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()