# match email ID

In [7]:
# Import required libraries
from pymongo import MongoClient
from collections import defaultdict
import re
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Connect to MongoDB using environment variables
MONGO_CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
MONGO_DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')

if not MONGO_CONNECTION_STRING or not MONGO_DATABASE_NAME:
    raise ValueError("MONGO_CONNECTION_STRING and MONGO_DATABASE_NAME must be set in environment variables")

print(f"Connecting to MongoDB...")
print(f"Database: {MONGO_DATABASE_NAME}")

client = MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DATABASE_NAME]

# Get collections
clusters_collection = db['cluster']
emailmessages_collection = db['emailmessages']

def normalize_text(text):
    """Normalize text for better matching"""
    return re.sub(r'[^\w\s]', '', text.lower().strip())

def match_emails_to_clusters():
    """
    Match emails to clusters based on dominant_topic matching keyphrases
    and update cluster documents with email_ids array
    """
    
    print("Fetching all clusters...")
    # Get all clusters
    clusters = list(clusters_collection.find())
    print(f"Found {len(clusters)} clusters to process\n")
    
    # Process each cluster
    for cluster in clusters:
        cluster_id = cluster['cluster_id']
        keyphrases = cluster.get('keyphrases', [])
        
        print(f"Processing Cluster ID: {cluster_id}")
        print(f"Cluster Name: {cluster.get('cluster_name', 'N/A')}")
        print(f"Keyphrases: {keyphrases}")
        
        # Normalize keyphrases for matching
        normalized_keyphrases = [normalize_text(phrase) for phrase in keyphrases]
        print(f"Normalized keyphrases: {normalized_keyphrases}")
        
        # Find matching emails
        matching_email_ids = []
        
        # Get all email messages - using cursor for better memory management
        print("  Searching through emails...")
        emails_cursor = emailmessages_collection.find({}, {
            '_id': 1, 
            'dominant_topic': 1
        })
        
        email_count = 0
        for email in emails_cursor:
            email_count += 1
            if email_count % 1000 == 0:
                print(f"    Processed {email_count} emails...")
                
            email_dominant_topic = email.get('dominant_topic', '')
            
            if email_dominant_topic:
                normalized_topic = normalize_text(email_dominant_topic)
                
                # Check if any keyphrase matches the dominant topic
                for keyphrase in normalized_keyphrases:
                    if keyphrase and normalized_topic:  # Ensure both are not empty
                        if keyphrase in normalized_topic or normalized_topic in keyphrase:
                            matching_email_ids.append(str(email['_id']))
                            print(f"    Match found: {email['_id']} - Topic: '{email_dominant_topic}' matches keyphrase: '{keyphrase}'")
                            break
        
        print(f"  Finished processing {email_count} emails")
        
        # Remove duplicates (in case an email matches multiple times)
        matching_email_ids = list(set(matching_email_ids))
        
        # Update cluster with email_ids
        try:
            if matching_email_ids:
                result = clusters_collection.update_one(
                    {'cluster_id': cluster_id},
                    {'$set': {'email_ids': matching_email_ids}}
                )
                if result.modified_count > 0:
                    print(f"  ✓ Successfully updated cluster {cluster_id} with {len(matching_email_ids)} email IDs")
                else:
                    print(f"  ⚠ No update performed for cluster {cluster_id} (may already have same data)")
            else:
                # Set empty array if no matches found
                result = clusters_collection.update_one(
                    {'cluster_id': cluster_id},
                    {'$set': {'email_ids': []}}
                )
                if result.modified_count > 0:
                    print(f"  ✓ Set empty email_ids array for cluster {cluster_id} (no matches found)")
                else:
                    print(f"  ⚠ No update needed for cluster {cluster_id}")
        except Exception as e:
            print(f"  ❌ Error updating cluster {cluster_id}: {str(e)}")
        
        print(f"  Total unique emails matched: {len(matching_email_ids)}")
        print("-" * 50)

def verify_results():
    """
    Verify the results by displaying updated clusters
    """
    print("\n" + "=" * 60)
    print("VERIFICATION RESULTS")
    print("=" * 60)
    
    try:
        clusters = list(clusters_collection.find({}, {
            'cluster_id': 1, 
            'cluster_name': 1, 
            'keyphrases': 1, 
            'email_ids': 1
        }).sort('cluster_id', 1))
        
        for cluster in clusters:
            email_count = len(cluster.get('email_ids', []))
            print(f"\nCluster {cluster['cluster_id']}: {cluster.get('cluster_name', 'N/A')}")
            print(f"  Keyphrases: {cluster.get('keyphrases', [])}")
            print(f"  Email IDs count: {email_count}")
            if email_count > 0:
                print(f"  First 3 Email IDs: {cluster['email_ids'][:3]}")
                if email_count > 3:
                    print(f"  ... and {email_count - 3} more")
    except Exception as e:
        print(f"❌ Error during verification: {str(e)}")

def get_summary_stats():
    """Get summary statistics"""
    print("\n" + "=" * 60)
    print("SUMMARY STATISTICS")
    print("=" * 60)
    
    try:
        total_clusters = clusters_collection.count_documents({})
        clusters_with_emails = clusters_collection.count_documents({'email_ids': {'$exists': True, '$ne': []}})
        
        pipeline = [
            {'$match': {'email_ids': {'$exists': True}}},
            {'$project': {'email_count': {'$size': '$email_ids'}}},
            {'$group': {'_id': None, 'total_emails_matched': {'$sum': '$email_count'}}}
        ]
        
        result = list(clusters_collection.aggregate(pipeline))
        total_emails_matched = result[0]['total_emails_matched'] if result else 0
        
        total_emails = emailmessages_collection.count_documents({})
        
        print(f"Total clusters: {total_clusters}")
        print(f"Clusters with matched emails: {clusters_with_emails}")
        print(f"Clusters without matches: {total_clusters - clusters_with_emails}")
        print(f"Total emails in database: {total_emails}")
        print(f"Total email-cluster matches: {total_emails_matched}")
        
        if total_emails > 0:
            match_percentage = (total_emails_matched / total_emails) * 100
            print(f"Match percentage: {match_percentage:.2f}%")
            
    except Exception as e:
        print(f"❌ Error getting statistics: {str(e)}")

# Main execution
if __name__ == "__main__":
    try:
        print("🚀 Starting email-cluster matching process...")
        print("=" * 60)
        
        # Test database connection
        clusters_collection.find_one()
        emailmessages_collection.find_one()
        print("✓ Database connection successful\n")
        
        # Execute the matching process
        match_emails_to_clusters()
        
        # Verify results
        verify_results()
        
        # Get summary statistics
        get_summary_stats()
        
        print("\n" + "=" * 60)
        print("✅ Process completed successfully!")
        print("=" * 60)
        
    except Exception as e:
        print(f"\n❌ Error during execution: {str(e)}")
        print("Please check your environment variables and database connection.")
    finally:
        # Close database connection
        if 'client' in locals():
            client.close()
            print("Database connection closed.")

Connecting to MongoDB...
Database: sparzaai
🚀 Starting email-cluster matching process...
✓ Database connection successful

Fetching all clusters...
Found 33 clusters to process

Processing Cluster ID: 0
Cluster Name: N/A
Keyphrases: ['SEPA Payment Failure', 'SEPA Processing Error', 'SEPA Instant Failure', 'SEPA Payment Status', 'TARGET2 Settlement Issue', 'Euro Clearing Problem']
Normalized keyphrases: ['sepa payment failure', 'sepa processing error', 'sepa instant failure', 'sepa payment status', 'target2 settlement issue', 'euro clearing problem']
  Searching through emails...
    Match found: 688b0fab0110d1205b4919e3 - Topic: 'Euro Clearing Problem' matches keyphrase: 'euro clearing problem'
    Match found: 688b10310110d1205b4919eb - Topic: 'SEPA Instant Failure' matches keyphrase: 'sepa instant failure'
    Match found: 688b0f250110d1205b4919d7 - Topic: 'SEPA Instant Failure' matches keyphrase: 'sepa instant failure'
    Match found: 688b10b80110d1205b4919f6 - Topic: 'SEPA Payment

# transform clusters

In [2]:
# Import required libraries
from pymongo import MongoClient
import os
from dotenv import load_dotenv
import time
from datetime import datetime, timezone

# Load environment variables from .env file
load_dotenv()

# Connect to MongoDB using environment variables
MONGO_CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
MONGO_DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')

if not MONGO_CONNECTION_STRING or not MONGO_DATABASE_NAME:
    raise ValueError("MONGO_CONNECTION_STRING and MONGO_DATABASE_NAME must be set in environment variables")

print(f"Connecting to MongoDB...")
print(f"Database: {MONGO_DATABASE_NAME}")

client = MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DATABASE_NAME]

# Get collections
clusters_collection = db['cluster']

def extract_dominant_label(cluster_name):
    """Extract dominant label - keep cluster name as is"""
    if not cluster_name:
        return "EU Banking Management"
    
    # Keep cluster name exactly as is
    return cluster_name

def transform_subtopics_to_subclusters(subtopics):
    """Transform old subtopics format to new subclusters format without keyphrase count"""
    subclusters = {}
    
    if subtopics and isinstance(subtopics, list):
        for i, subtopic in enumerate(subtopics):
            if isinstance(subtopic, dict):
                label = subtopic.get('subcluster_name', f'Subcluster {i}')
                keyphrases = subtopic.get('keyphrases', [])
                
                subclusters[str(i)] = {
                    'label': label,
                    'keyphrases': keyphrases
                    # No keyphrase_count for subclusters
                }
    
    return subclusters

def transform_clusters():
    """
    Transform all cluster documents to the new format
    """
    
    print("Fetching all clusters for transformation...")
    clusters = list(clusters_collection.find())
    print(f"Found {len(clusters)} clusters to transform\n")
    
    transformed_count = 0
    error_count = 0
    
    for cluster in clusters:
        try:
            cluster_id = cluster.get('cluster_id')
            cluster_name = cluster.get('cluster_name', '')  # Keep original cluster name
            keyphrases = cluster.get('keyphrases', [])
            subtopics = cluster.get('subtopics', [])
            email_ids = cluster.get('email_ids', [])
            data = cluster.get('data', 'email')
            created_at_original = cluster.get('created_at', '')
            
            print(f"Transforming Cluster ID: {cluster_id}")
            print(f"  Cluster Name: {cluster_name}")
            
            # Create the new document structure
            new_document = {
                'cluster_id': cluster_id,
                'keyphrases': keyphrases,
                'keyphrase_count': len(keyphrases),  # Count keyphrases
                'domains': ['EU bank'],  # Fixed domain for EU bank
                'email_ids': email_ids,
                'data': data,
                'created_at': created_at_original,  # Keep original created_at value
                'dominant_label': extract_dominant_label(cluster_name),
                'original_keyphrases_count': len(keyphrases),
                'processing_date': datetime.now(timezone.utc).isoformat(),
                'subclusters': transform_subtopics_to_subclusters(subtopics),
                'uniqueness_validated': True
            }
            
            print(f"  Dominant Label: {new_document['dominant_label']}")
            print(f"  Keyphrase Count: {new_document['keyphrase_count']}")
            print(f"  Domain: {new_document['domains']}")
            
            # Show subcluster info (no keyphrase counts)
            subclusters = new_document['subclusters']
            if subclusters:
                print(f"  Subclusters:")
                for key, subcluster in subclusters.items():
                    print(f"    {key}: '{subcluster['label']}' ({len(subcluster['keyphrases'])} keyphrases)")
            
            # Replace the entire document
            result = clusters_collection.replace_one(
                {'_id': cluster['_id']},
                new_document
            )
            
            if result.modified_count > 0:
                print(f"  ✓ Successfully transformed cluster {cluster_id}")
                transformed_count += 1
            else:
                print(f"  ⚠ No changes made to cluster {cluster_id}")
            
        except Exception as e:
            print(f"  ❌ Error transforming cluster {cluster_id}: {str(e)}")
            error_count += 1
        
        print("-" * 50)
    
    return transformed_count, error_count

def verify_transformation():
    """
    Verify the transformation results
    """
    print("\n" + "=" * 60)
    print("TRANSFORMATION VERIFICATION")
    print("=" * 60)
    
    try:
        # Get a sample of transformed documents
        sample_clusters = list(clusters_collection.find({}).limit(3).sort('cluster_id', 1))
        
        for cluster in sample_clusters:
            print(f"\nCluster {cluster.get('cluster_id')}:")
            print(f"  Dominant Label: '{cluster.get('dominant_label')}'")
            print(f"  Keyphrase Count: {cluster.get('keyphrase_count')}")
            print(f"  Domains: {cluster.get('domains')}")
            print(f"  Email IDs Count: {len(cluster.get('email_ids', []))}")
            print(f"  Processing Date: {cluster.get('processing_date')}")
            print(f"  Uniqueness Validated: {cluster.get('uniqueness_validated')}")
            
            # Show first few keyphrases
            keyphrases = cluster.get('keyphrases', [])
            if keyphrases:
                print(f"  Keyphrases ({len(keyphrases)}): {keyphrases}")
            
            # Show subclusters structure (no keyphrase counts in subclusters)
            subclusters = cluster.get('subclusters', {})
            if subclusters:
                print(f"  Subclusters ({len(subclusters)}):")
                for key, subcluster in subclusters.items():
                    label = subcluster.get('label', 'N/A')
                    keyphrases = subcluster.get('keyphrases', [])
                    print(f"    {key}: '{label}' ({len(keyphrases)} keyphrases)")
                    if keyphrases:
                        print(f"       Keyphrases: {keyphrases}")
    
    except Exception as e:
        print(f"❌ Error during verification: {str(e)}")

def get_transformation_summary():
    """Get summary of the transformation"""
    print("\n" + "=" * 60)
    print("TRANSFORMATION SUMMARY")
    print("=" * 60)
    
    try:
        total_clusters = clusters_collection.count_documents({})
        
        # Count clusters with new fields
        clusters_with_keyphrase_count = clusters_collection.count_documents({'keyphrase_count': {'$exists': True}})
        clusters_with_domains = clusters_collection.count_documents({'domains': {'$exists': True}})
        clusters_with_dominant_label = clusters_collection.count_documents({'dominant_label': {'$exists': True}})
        clusters_with_subclusters = clusters_collection.count_documents({'subclusters': {'$exists': True}})
        
        print(f"Total clusters: {total_clusters}")
        print(f"Clusters with keyphrase_count: {clusters_with_keyphrase_count}")
        print(f"Clusters with domains: {clusters_with_domains}")
        print(f"Clusters with dominant_label: {clusters_with_dominant_label}")
        print(f"Clusters with subclusters: {clusters_with_subclusters}")
        
        # Average keyphrase count for main clusters
        avg_pipeline = [
            {'$group': {'_id': None, 'avg_keyphrases': {'$avg': '$keyphrase_count'}}}
        ]
        avg_result = list(clusters_collection.aggregate(avg_pipeline))
        if avg_result:
            print(f"\nAverage keyphrases per cluster: {avg_result[0]['avg_keyphrases']:.2f}")
        
        # Get subcluster statistics (no keyphrase_count field in subclusters)
        subcluster_pipeline = [
            {'$match': {'subclusters': {'$exists': True, '$ne': {}}}},
            {'$project': {
                'subcluster_counts': {
                    '$map': {
                        'input': {'$objectToArray': '$subclusters'},
                        'as': 'sub',
                        'in': {'$size': '$sub.v.keyphrases'}  # Count keyphrases array length
                    }
                }
            }},
            {'$unwind': '$subcluster_counts'},
            {'$group': {
                '_id': None, 
                'avg_subcluster_keyphrases': {'$avg': '$subcluster_counts'},
                'total_subclusters': {'$sum': 1}
            }}
        ]
        
        subcluster_result = list(clusters_collection.aggregate(subcluster_pipeline))
        if subcluster_result:
            result = subcluster_result[0]
            print(f"Total subclusters: {result['total_subclusters']}")
            print(f"Average keyphrases per subcluster: {result['avg_subcluster_keyphrases']:.2f}")
            
    except Exception as e:
        print(f"❌ Error getting summary: {str(e)}")

# Main execution
if __name__ == "__main__":
    try:
        print("🔄 Starting cluster transformation process...")
        print("=" * 60)
        
        # Test database connection
        test_doc = clusters_collection.find_one()
        if test_doc:
            print("✓ Database connection successful")
            print(f"Sample original document keys: {list(test_doc.keys())}\n")
        else:
            print("⚠ No documents found in clusters collection")
            exit(1)
        
        # Execute the transformation
        transformed_count, error_count = transform_clusters()
        
        print(f"\n📊 Transformation completed:")
        print(f"  Successfully transformed: {transformed_count} clusters")
        print(f"  Errors encountered: {error_count} clusters")
        
        # Verify transformation
        verify_transformation()
        
        # Get summary
        get_transformation_summary()
        
        print("\n" + "=" * 60)
        print("✅ Transformation process completed successfully!")
        print("=" * 60)
        
    except Exception as e:
        print(f"\n❌ Error during execution: {str(e)}")
        print("Please check your environment variables and database connection.")
    finally:
        # Close database connection
        if 'client' in locals():
            client.close()
            print("Database connection closed.")

Connecting to MongoDB...
Database: sparzaai
🔄 Starting cluster transformation process...
✓ Database connection successful
Sample original document keys: ['_id', 'cluster_id', 'cluster_name', 'keyphrases', 'created_at', 'data', 'subtopics', 'email_ids']

Fetching all clusters for transformation...
Found 33 clusters to transform

Transforming Cluster ID: 0
  Cluster Name: Regional Payment Issues
  Dominant Label: Regional Payment Issues
  Keyphrase Count: 6
  Domain: ['EU bank']
  Subclusters:
    0: 'SEPA Failures & Errors' (3 keyphrases)
    1: 'Clearing & Settlement Issues' (3 keyphrases)
  ✓ Successfully transformed cluster 0
--------------------------------------------------
Transforming Cluster ID: 1
  Cluster Name: International & Wire Transfers
  Dominant Label: International & Wire Transfers
  Keyphrase Count: 4
  Domain: ['EU bank']
  Subclusters:
    0: 'Cross-Border Transfer Issues' (2 keyphrases)
    1: 'Wire Transfer Status' (2 keyphrases)
  ✓ Successfully transformed clust

# update domain as banking

In [4]:
# Import required libraries
from pymongo import MongoClient
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Connect to MongoDB using environment variables
MONGO_CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
MONGO_DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')

if not MONGO_CONNECTION_STRING or not MONGO_DATABASE_NAME:
    raise ValueError("MONGO_CONNECTION_STRING and MONGO_DATABASE_NAME must be set in environment variables")

print(f"Connecting to MongoDB...")
print(f"Database: {MONGO_DATABASE_NAME}")

client = MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DATABASE_NAME]

# Get collections
clusters_collection = db['cluster']

def update_domains_to_banking():
    """
    Update all cluster documents to change domains from ["EU bank"] to ["banking"]
    """
    
    print("Starting domain update process...")
    print("=" * 50)
    
    try:
        # Count total documents before update
        total_docs = clusters_collection.count_documents({})
        print(f"Total clusters in collection: {total_docs}")
        
        # Count documents that currently have ["EU bank"] domain
        eu_bank_count = clusters_collection.count_documents({"domains": ["EU bank"]})
        print(f"Clusters with 'EU bank' domain: {eu_bank_count}")
        
        # Count documents that already have ["banking"] domain
        banking_count = clusters_collection.count_documents({"domains": ["banking"]})
        print(f"Clusters already with 'banking' domain: {banking_count}")
        
        print("\n" + "=" * 50)
        print("UPDATING DOMAINS...")
        print("=" * 50)
        
        # Update all documents to have domains: ["banking"]
        update_result = clusters_collection.update_many(
            {},  # Empty filter to match all documents
            {"$set": {"domains": ["banking"]}}
        )
        
        print(f"✓ Successfully updated {update_result.modified_count} documents")
        print(f"  Matched documents: {update_result.matched_count}")
        
        # Verify the update
        verify_update()
        
    except Exception as e:
        print(f"❌ Error during domain update: {str(e)}")

def verify_update():
    """
    Verify that all domains have been updated to ["banking"]
    """
    print("\n" + "=" * 50)
    print("VERIFICATION")
    print("=" * 50)
    
    try:
        # Count documents with different domain values
        banking_count = clusters_collection.count_documents({"domains": ["banking"]})
        eu_bank_count = clusters_collection.count_documents({"domains": ["EU bank"]})
        other_domains = clusters_collection.count_documents({
            "domains": {"$nin": [["banking"], ["EU bank"]]}
        })
        
        total_docs = clusters_collection.count_documents({})
        
        print(f"Total clusters: {total_docs}")
        print(f"Clusters with 'banking' domain: {banking_count}")
        print(f"Clusters with 'EU bank' domain: {eu_bank_count}")
        print(f"Clusters with other domains: {other_domains}")
        
        if banking_count == total_docs:
            print("\n✅ SUCCESS: All clusters now have 'banking' domain!")
        else:
            print(f"\n⚠ WARNING: {total_docs - banking_count} clusters still have different domains")
        
        # Show sample of updated documents
        print(f"\nSample of updated documents:")
        samples = list(clusters_collection.find({}, {
            'cluster_id': 1, 
            'domains': 1, 
            'dominant_label': 1
        }).limit(5).sort('cluster_id', 1))
        
        for sample in samples:
            cluster_id = sample.get('cluster_id', 'N/A')
            domains = sample.get('domains', [])
            label = sample.get('dominant_label', 'N/A')
            print(f"  Cluster {cluster_id}: domains={domains}, label='{label}'")
            
    except Exception as e:
        print(f"❌ Error during verification: {str(e)}")

def get_domain_statistics():
    """
    Get detailed statistics about domains in the collection
    """
    print("\n" + "=" * 50)
    print("DOMAIN STATISTICS")
    print("=" * 50)
    
    try:
        # Aggregate to get all unique domain combinations
        pipeline = [
            {'$group': {'_id': '$domains', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        
        domain_stats = list(clusters_collection.aggregate(pipeline))
        
        print("Domain distribution:")
        for stat in domain_stats:
            domains = stat['_id']
            count = stat['count']
            print(f"  {domains}: {count} clusters")
            
        total_clusters = clusters_collection.count_documents({})
        if total_clusters > 0:
            banking_percentage = (clusters_collection.count_documents({"domains": ["banking"]}) / total_clusters) * 100
            print(f"\nPercentage with 'banking' domain: {banking_percentage:.1f}%")
            
    except Exception as e:
        print(f"❌ Error getting statistics: {str(e)}")

# Main execution
if __name__ == "__main__":
    try:
        print("🔄 Starting domains update to 'banking'...")
        print("=" * 60)
        
        # Test database connection
        test_doc = clusters_collection.find_one()
        if test_doc:
            print("✓ Database connection successful")
            current_domains = test_doc.get('domains', 'N/A')
            print(f"Sample current domains: {current_domains}\n")
        else:
            print("⚠ No documents found in clusters collection")
            exit(1)
        
        # Execute the domain update
        update_domains_to_banking()
        
        # Get detailed statistics
        get_domain_statistics()
        
        print("\n" + "=" * 60)
        print("✅ Domain update process completed successfully!")
        print("All clusters now have domains: ['banking']")
        print("=" * 60)
        
    except Exception as e:
        print(f"\n❌ Error during execution: {str(e)}")
        print("Please check your environment variables and database connection.")
    finally:
        # Close database connection
        if 'client' in locals():
            client.close()
            print("Database connection closed.")

Connecting to MongoDB...
Database: sparzaai
🔄 Starting domains update to 'banking'...
✓ Database connection successful
Sample current domains: ['EU bank']

Starting domain update process...
Total clusters in collection: 33
Clusters with 'EU bank' domain: 33
Clusters already with 'banking' domain: 0

UPDATING DOMAINS...
✓ Successfully updated 33 documents
  Matched documents: 33

VERIFICATION
Total clusters: 33
Clusters with 'banking' domain: 33
Clusters with 'EU bank' domain: 0
Clusters with other domains: 0

✅ SUCCESS: All clusters now have 'banking' domain!

Sample of updated documents:
  Cluster 0: domains=['banking'], label='Regional Payment Issues'
  Cluster 1: domains=['banking'], label='International & Wire Transfers'
  Cluster 2: domains=['banking'], label='Domestic Payment Processing'
  Cluster 3: domains=['banking'], label='Data Privacy & GDPR'
  Cluster 4: domains=['banking'], label='Digital Access & Authentication'

DOMAIN STATISTICS
Domain distribution:
  ['banking']: 33 c

# matching clusters with emails

In [8]:
from pymongo import MongoClient, UpdateOne
from typing import Dict, List, Optional, Set
import os
from dotenv import load_dotenv
from collections import defaultdict
import threading
from concurrent.futures import ThreadPoolExecutor
import time

# Load environment variables
load_dotenv()

class OptimizedEmailClusterMatcher:
    def __init__(self, connection_string: str, database_name: str):
        """
        Initialize the matcher with MongoDB connection
        """
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.emails_collection = self.db['emailmessages']
        self.clusters_collection = self.db['cluster']
        
        # Cache for cluster data - this is the key optimization
        self._cluster_cache = None
        self._subcluster_cache = None
        self._load_cluster_cache()
    
    def _load_cluster_cache(self):
        """
        Load all cluster data into memory for fast lookups
        """
        print("Loading cluster data into cache...")
        start_time = time.time()
        
        # Dictionary mapping keyphrase -> cluster info
        self._cluster_cache = {}
        # Dictionary mapping keyphrase -> subcluster info
        self._subcluster_cache = {}
        
        clusters = list(self.clusters_collection.find())
        print(f"Found {len(clusters)} clusters to cache")
        
        for cluster in clusters:
            cluster_id = cluster.get('cluster_id')
            dominant_label = cluster.get('dominant_label')
            keyphrases = cluster.get('keyphrases', [])
            subclusters = cluster.get('subclusters', {})
            
            # Cache cluster keyphrases
            for keyphrase in keyphrases:
                self._cluster_cache[keyphrase] = {
                    'cluster_id': cluster_id,
                    'dominant_label': dominant_label,
                    'subclusters': subclusters
                }
            
            # Cache subcluster keyphrases
            for subcluster_id, subcluster_data in subclusters.items():
                if not isinstance(subcluster_data, dict):
                    continue
                    
                subcluster_keyphrases = subcluster_data.get('keyphrases', [])
                for keyphrase in subcluster_keyphrases:
                    self._subcluster_cache[keyphrase] = {
                        'cluster_id': cluster_id,
                        'dominant_label': dominant_label,
                        'subcluster_id': int(subcluster_id),
                        'subcluster_label': subcluster_data.get('label')
                    }
        
        cache_time = time.time() - start_time
        print(f"Cache loaded in {cache_time:.2f} seconds")
        print(f"Cached {len(self._cluster_cache)} cluster keyphrases")
        print(f"Cached {len(self._subcluster_cache)} subcluster keyphrases")
    
    def find_matching_cluster_fast(self, dominant_topic: str) -> Optional[Dict]:
        """
        Fast cluster lookup using cached data
        """
        return self._cluster_cache.get(dominant_topic)
    
    def find_matching_subcluster_fast(self, dominant_topic: str) -> Optional[Dict]:
        """
        Fast subcluster lookup using cached data
        """
        return self._subcluster_cache.get(dominant_topic)
    
    def find_unmatched_emails(self, limit: int = None) -> List[Dict]:
        """
        Find emails that don't match any cluster or subcluster
        """
        unmatched = []
        
        # Get all emails with dominant_topic
        query = {"dominant_topic": {"$exists": True, "$ne": None}}
        cursor = self.emails_collection.find(query, {"dominant_topic": 1})
        
        if limit:
            cursor = cursor.limit(limit)
        
        for email in cursor:
            dominant_topic = email.get('dominant_topic')
            if not dominant_topic:
                continue
                
            # Check if it matches any cluster or subcluster
            cluster_match = self.find_matching_cluster_fast(dominant_topic)
            subcluster_match = self.find_matching_subcluster_fast(dominant_topic)
            
            if not cluster_match and not subcluster_match:
                unmatched.append({
                    'email_id': str(email['_id']),
                    'dominant_topic': dominant_topic
                })
        
        return unmatched
    
    def get_unique_dominant_topics(self) -> Dict:
        """
        Get all unique dominant_topic values and their counts
        """
        pipeline = [
            {"$match": {"dominant_topic": {"$exists": True, "$ne": None}}},
            {"$group": {"_id": "$dominant_topic", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}}
        ]
        
        result = list(self.emails_collection.aggregate(pipeline))
        
        topics_info = {
            'total_unique_topics': len(result),
            'topics': result
        }
        
        return topics_info
    
    def analyze_matching_gaps(self) -> Dict:
        """
        Analyze what dominant_topics exist but don't match any clusters
        """
        print("Analyzing matching gaps...")
        
        # Get all unique dominant topics
        topics_info = self.get_unique_dominant_topics()
        print(f"Found {topics_info['total_unique_topics']} unique dominant topics")
        
        # Check which ones don't match
        unmatched_topics = {}
        matched_topics = {}
        
        for topic_data in topics_info['topics']:
            topic = topic_data['_id']
            count = topic_data['count']
            
            cluster_match = self.find_matching_cluster_fast(topic)
            subcluster_match = self.find_matching_subcluster_fast(topic)
            
            if cluster_match or subcluster_match:
                matched_topics[topic] = {
                    'count': count,
                    'cluster_match': bool(cluster_match),
                    'subcluster_match': bool(subcluster_match)
                }
            else:
                unmatched_topics[topic] = count
        
        return {
            'total_topics': topics_info['total_unique_topics'],
            'matched_topics': len(matched_topics),
            'unmatched_topics': len(unmatched_topics),
            'unmatched_details': unmatched_topics,
            'matched_details': matched_topics,
            'unmatched_email_count': sum(unmatched_topics.values()),
            'matched_email_count': sum([data['count'] for data in matched_topics.values()])
        }
    
    def create_fallback_cluster_entry(self, unmatched_topics: List[str]) -> Dict:
        """
        Create a fallback cluster entry for unmatched topics
        """
        fallback_cluster = {
            'cluster_id': 999,  # Use a high number to avoid conflicts
            'dominant_label': 'Unclassified Topics',
            'keyphrases': unmatched_topics,
            'subclusters': {
                '0': {
                    'label': 'Miscellaneous',
                    'keyphrases': unmatched_topics
                }
            }
        }
        return fallback_cluster
    
    def add_fallback_cluster_to_cache(self, unmatched_topics: List[str]) -> None:
        """
        Add unmatched topics to cache as a fallback cluster
        """
        print(f"Adding {len(unmatched_topics)} unmatched topics to fallback cluster...")
        
        for topic in unmatched_topics:
            # Add to cluster cache
            self._cluster_cache[topic] = {
                'cluster_id': 999,
                'dominant_label': 'Unclassified Topics',
                'subclusters': {'0': {'label': 'Miscellaneous', 'keyphrases': unmatched_topics}}
            }
            
            # Add to subcluster cache
            self._subcluster_cache[topic] = {
                'cluster_id': 999,
                'dominant_label': 'Unclassified Topics',
                'subcluster_id': 0,
                'subcluster_label': 'Miscellaneous'
            }
        
        print(f"✓ Added fallback cluster. Cache now has:")
        print(f"  - Cluster keyphrases: {len(self._cluster_cache)}")
        print(f"  - Subcluster keyphrases: {len(self._subcluster_cache)}")
    
    def process_emails_batch(self, emails: List[Dict]) -> List:
        """
        Process a batch of emails and return bulk operations in correct PyMongo format
        """
        bulk_operations = []
        
        for email in emails:
            dominant_topic = email.get('dominant_topic')
            if not dominant_topic:
                continue
            
            # Fast cluster lookup
            cluster_match = self.find_matching_cluster_fast(dominant_topic)
            subcluster_match = self.find_matching_subcluster_fast(dominant_topic)
            
            update_data = {}
            
            if cluster_match:
                update_data.update({
                    'kmeans_cluster_id': cluster_match['cluster_id'],
                    'dominant_label': cluster_match['dominant_label']
                })
            
            if subcluster_match:
                update_data.update({
                    'kmeans_cluster_id': subcluster_match['cluster_id'],
                    'dominant_label': subcluster_match['dominant_label'],
                    'subcluster_id': subcluster_match['subcluster_id'],
                    'subcluster_label': subcluster_match['subcluster_label']
                })
            
            if update_data:
                # Use PyMongo's UpdateOne class instead of dict
                bulk_operations.append(
                    UpdateOne(
                        {'_id': email['_id']}, 
                        {'$set': update_data}
                    )
                )
        
        return bulk_operations
    
    def process_emails_optimized(self, batch_size: int = 5000, max_workers: int = 4, dry_run: bool = False) -> Dict:
        """
        Optimized email processing with larger batches and optional threading
        """
        start_time = time.time()
        
        # Get total count more efficiently
        total_emails = self.emails_collection.estimated_document_count()
        processed = 0
        matched_clusters = 0
        matched_subclusters = 0
        total_updates = 0
        
        print(f"Processing ~{total_emails} emails in batches of {batch_size}")
        print(f"DRY RUN MODE: {'ON' if dry_run else 'OFF'}")
        
        # Create index on dominant_topic if it doesn't exist (for faster queries)
        try:
            self.emails_collection.create_index([("dominant_topic", 1)], background=True)
            print("✓ Index on dominant_topic created/verified")
        except Exception as e:
            print(f"Index creation note: {e}")
        
        # Process emails in larger batches
        cursor = self.emails_collection.find(
            {"dominant_topic": {"$exists": True, "$ne": None}},  # Only get emails with dominant_topic
            projection={'dominant_topic': 1}  # Only fetch the field we need
        ).batch_size(batch_size)
        
        batch = []
        batch_count = 0
        
        for email in cursor:
            batch.append(email)
            
            if len(batch) >= batch_size:
                batch_count += 1
                print(f"\n--- Processing batch {batch_count} ({len(batch)} emails) ---")
                
                # Process batch
                bulk_operations = self.process_emails_batch(batch)
                print(f"Generated {len(bulk_operations)} update operations")
                
                # Count matches for statistics
                batch_cluster_matches = 0
                batch_subcluster_matches = 0
                for email in batch:
                    dominant_topic = email.get('dominant_topic')
                    if dominant_topic:
                        if self.find_matching_cluster_fast(dominant_topic):
                            matched_clusters += 1
                            batch_cluster_matches += 1
                        if self.find_matching_subcluster_fast(dominant_topic):
                            matched_subclusters += 1
                            batch_subcluster_matches += 1
                
                print(f"Batch matches - Clusters: {batch_cluster_matches}, Subclusters: {batch_subcluster_matches}")
                
                # Execute bulk update (or skip if dry run)
                if bulk_operations and not dry_run:
                    try:
                        print("Executing bulk write...")
                        result = self.emails_collection.bulk_write(
                            bulk_operations, 
                            ordered=False  # Faster unordered operations
                        )
                        total_updates += result.modified_count
                        print(f"✓ Updated {result.modified_count} documents in batch {batch_count}")
                        
                        # Verify some updates
                        if result.modified_count > 0:
                            sample_updated = list(self.emails_collection.find(
                                {"kmeans_cluster_id": {"$exists": True}},
                                {"dominant_topic": 1, "kmeans_cluster_id": 1, "subcluster_id": 1}
                            ).limit(3))
                            print(f"Sample updated documents: {len(sample_updated)} found with cluster IDs")
                        
                    except Exception as e:
                        print(f"❌ Bulk write error in batch {batch_count}: {e}")
                        print(f"Error type: {type(e).__name__}")
                        # Show sample operation for debugging in readable format
                        if bulk_operations:
                            sample_op = bulk_operations[0]
                            print(f"Sample operation: Update {sample_op._filter} with {sample_op._doc}")
                elif bulk_operations and dry_run:
                    print(f"DRY RUN: Would update {len(bulk_operations)} documents")
                    # Show sample operations in readable format
                    for i, op in enumerate(bulk_operations[:3]):
                        print(f"Sample operation {i+1}: Update {op._filter} with {op._doc}")
                else:
                    print("No operations to execute (no matches found)")
                
                processed += len(batch)
                batch = []
                
                # Progress update
                elapsed = time.time() - start_time
                rate = processed / elapsed if elapsed > 0 else 0
                print(f"Progress: {processed} emails processed ({rate:.1f} emails/sec)")
        
        # Process remaining emails in the last batch
        if batch:
            batch_count += 1
            print(f"\n--- Processing final batch {batch_count} ({len(batch)} emails) ---")
            
            bulk_operations = self.process_emails_batch(batch)
            print(f"Generated {len(bulk_operations)} update operations")
            
            # Count matches for final batch
            batch_cluster_matches = 0
            batch_subcluster_matches = 0
            for email in batch:
                dominant_topic = email.get('dominant_topic')
                if dominant_topic:
                    if self.find_matching_cluster_fast(dominant_topic):
                        matched_clusters += 1
                        batch_cluster_matches += 1
                    if self.find_matching_subcluster_fast(dominant_topic):
                        matched_subclusters += 1
                        batch_subcluster_matches += 1
            
            print(f"Final batch matches - Clusters: {batch_cluster_matches}, Subclusters: {batch_subcluster_matches}")
            
            if bulk_operations and not dry_run:
                try:
                    print("Executing final bulk write...")
                    result = self.emails_collection.bulk_write(
                        bulk_operations, 
                        ordered=False
                    )
                    total_updates += result.modified_count
                    print(f"✓ Updated {result.modified_count} documents in final batch")
                except Exception as e:
                    print(f"❌ Bulk write error in final batch: {e}")
                    print(f"Error type: {type(e).__name__}")
            elif bulk_operations and dry_run:
                print(f"DRY RUN: Would update {len(bulk_operations)} documents")
            
            processed += len(batch)
        
        total_time = time.time() - start_time
        
        # Final verification
        if not dry_run and total_updates > 0:
            print(f"\n--- Verification ---")
            updated_count = self.emails_collection.count_documents({"kmeans_cluster_id": {"$exists": True}})
            print(f"Total documents with kmeans_cluster_id: {updated_count}")
            
            subcluster_count = self.emails_collection.count_documents({"subcluster_id": {"$exists": True}})
            print(f"Total documents with subcluster_id: {subcluster_count}")
        
        stats = {
            'total_emails': processed,
            'matched_clusters': matched_clusters,
            'matched_subclusters': matched_subclusters,
            'total_updates': total_updates,
            'processing_time': total_time,
            'emails_per_second': processed / total_time if total_time > 0 else 0,
            'cluster_match_rate': (matched_clusters / processed * 100) if processed > 0 else 0,
            'subcluster_match_rate': (matched_subclusters / processed * 100) if processed > 0 else 0,
            'dry_run': dry_run
        }
        
        return stats
    
    def process_with_fallback(self, batch_size: int = 5000, dry_run: bool = False) -> Dict:
        """
        Process emails with automatic fallback cluster for unmatched topics
        """
        print("=== PROCESSING WITH FALLBACK CLUSTER ===")
        
        # First, analyze gaps
        gaps = self.analyze_matching_gaps()
        
        if gaps['unmatched_email_count'] > 0:
            print(f"Found {gaps['unmatched_email_count']} unmatched emails")
            print(f"Unmatched topics: {list(gaps['unmatched_details'].keys())}")
            
            # Add fallback cluster to cache
            unmatched_topic_list = list(gaps['unmatched_details'].keys())
            self.add_fallback_cluster_to_cache(unmatched_topic_list)
            
            # Optionally save fallback cluster to database
            save_choice = input("Save fallback cluster to database permanently? (y/n): ")
            if save_choice.lower() == 'y':
                fallback_cluster = self.create_fallback_cluster_entry(unmatched_topic_list)
                try:
                    self.clusters_collection.insert_one(fallback_cluster)
                    print("✓ Fallback cluster saved to database")
                except Exception as e:
                    print(f"⚠️  Could not save fallback cluster: {e}")
        
        # Now process all emails (should be 100% match rate)
        return self.process_emails_optimized(batch_size=batch_size, dry_run=dry_run)
    
    def get_performance_stats(self) -> Dict:
        """
        Get database performance statistics
        """
        stats = {}
        
        # Collection sizes
        stats['total_emails'] = self.emails_collection.estimated_document_count()
        stats['emails_with_topic'] = self.emails_collection.count_documents({
            "dominant_topic": {"$exists": True, "$ne": None}
        })
        stats['total_clusters'] = self.clusters_collection.estimated_document_count()
        
        # Cache statistics
        stats['cached_cluster_keyphrases'] = len(self._cluster_cache) if self._cluster_cache else 0
        stats['cached_subcluster_keyphrases'] = len(self._subcluster_cache) if self._subcluster_cache else 0
        
        return stats
    
    def debug_matching_process(self, limit: int = 5) -> None:
        """
        Debug the matching process to see what's happening
        """
        print("\n=== DEBUGGING MATCHING PROCESS ===")
        
        # Check if we have any cluster data
        if not self._cluster_cache and not self._subcluster_cache:
            print("❌ NO CLUSTER CACHE DATA! This is why updates are failing.")
            return
        
        print(f"✓ Cluster cache has {len(self._cluster_cache)} entries")
        print(f"✓ Subcluster cache has {len(self._subcluster_cache)} entries")
        
        # Sample some cluster keyphrases
        print(f"\nSample cluster keyphrases:")
        for i, keyphrase in enumerate(list(self._cluster_cache.keys())[:10]):
            cluster_info = self._cluster_cache[keyphrase]
            print(f"  {i+1}. '{keyphrase}' -> Cluster {cluster_info['cluster_id']}")
        
        # Sample some subcluster keyphrases  
        print(f"\nSample subcluster keyphrases:")
        for i, keyphrase in enumerate(list(self._subcluster_cache.keys())[:10]):
            subcluster_info = self._subcluster_cache[keyphrase]
            print(f"  {i+1}. '{keyphrase}' -> Cluster {subcluster_info['cluster_id']}, Subcluster {subcluster_info['subcluster_id']}")
        
        # Check some actual emails
        print(f"\n=== TESTING {limit} EMAILS ===")
        emails = list(self.emails_collection.find(
            {"dominant_topic": {"$exists": True, "$ne": None}}
        ).limit(limit))
        
        if not emails:
            print("❌ NO EMAILS with dominant_topic found!")
            return
        
        for i, email in enumerate(emails, 1):
            dominant_topic = email.get('dominant_topic', 'NO_TOPIC')
            print(f"\n--- Email {i} ---")
            print(f"Email ID: {email['_id']}")
            print(f"Dominant Topic: '{dominant_topic}'")
            
            # Test cluster matching
            cluster_match = self.find_matching_cluster_fast(dominant_topic)
            if cluster_match:
                print(f"✓ CLUSTER MATCH: ID={cluster_match['cluster_id']}, Label='{cluster_match['dominant_label']}'")
            else:
                print(f"❌ No cluster match for '{dominant_topic}'")
            
            # Test subcluster matching  
            subcluster_match = self.find_matching_subcluster_fast(dominant_topic)
            if subcluster_match:
                print(f"✓ SUBCLUSTER MATCH: Cluster={subcluster_match['cluster_id']}, Subcluster={subcluster_match['subcluster_id']}, Label='{subcluster_match['subcluster_label']}'")
            else:
                print(f"❌ No subcluster match for '{dominant_topic}'")
            
            # Show what the update operation would look like
            update_data = {}
            if cluster_match:
                update_data.update({
                    'kmeans_cluster_id': cluster_match['cluster_id'],
                    'dominant_label': cluster_match['dominant_label']
                })
            if subcluster_match:
                update_data.update({
                    'kmeans_cluster_id': subcluster_match['cluster_id'],
                    'dominant_label': subcluster_match['dominant_label'],
                    'subcluster_id': subcluster_match['subcluster_id'],
                    'subcluster_label': subcluster_match['subcluster_label']
                })
            
            if update_data:
                print(f"UPDATE OPERATION: {update_data}")
            else:
                print("NO UPDATE OPERATION (no matches)")
        
        print(f"\n=== DATABASE STATE CHECK ===")
        # Check existing updates
        existing_with_cluster = self.emails_collection.count_documents({"kmeans_cluster_id": {"$exists": True}})
        existing_with_subcluster = self.emails_collection.count_documents({"subcluster_id": {"$exists": True}})
        emails_with_topic = self.emails_collection.count_documents({"dominant_topic": {"$exists": True, "$ne": None}})
        
        print(f"Emails with dominant_topic: {emails_with_topic}")
        print(f"Emails already with kmeans_cluster_id: {existing_with_cluster}")
        print(f"Emails already with subcluster_id: {existing_with_subcluster}")
        
        if emails_with_topic == 0:
            print("❌ PROBLEM: No emails have 'dominant_topic' field!")
        elif existing_with_cluster == emails_with_topic:
            print("✓ All emails already processed!")
        else:
            print(f"📝 {emails_with_topic - existing_with_cluster} emails need processing")
    
    def get_preview(self, limit: int = 10) -> List[Dict]:
        """
        Get a preview of email-cluster matches for testing
        """
        emails = list(self.emails_collection.find(
            {"dominant_topic": {"$exists": True, "$ne": None}}
        ).limit(limit))
        
        preview = []
        
        for email in emails:
            dominant_topic = email.get('dominant_topic')
            if not dominant_topic:
                continue
            
            cluster_match = self.find_matching_cluster_fast(dominant_topic)
            subcluster_match = self.find_matching_subcluster_fast(dominant_topic)
            
            preview.append({
                'email_id': str(email['_id']),
                'dominant_topic': dominant_topic,
                'cluster_match': cluster_match,
                'subcluster_match': subcluster_match
            })
        
        return preview
    
    def close_connection(self):
        """Close MongoDB connection"""
        self.client.close()

# Usage example
def main():
    # Get configuration from environment variables
    CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
    DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')
    
    if not CONNECTION_STRING:
        raise ValueError("MONGO_CONNECTION_STRING not found in environment variables")
    if not DATABASE_NAME:
        raise ValueError("MONGO_DATABASE_NAME not found in environment variables")
    
    print(f"Connecting to database: {DATABASE_NAME}")
    
    # Initialize optimized matcher
    matcher = OptimizedEmailClusterMatcher(CONNECTION_STRING, DATABASE_NAME)
    
    try:
        # Show performance stats
        print("\n--- Database Statistics ---")
        perf_stats = matcher.get_performance_stats()
        for key, value in perf_stats.items():
            print(f"{key}: {value:,}")
        
        # Analyze matching gaps
        print("\n--- Gap Analysis ---")
        gap_choice = input("Analyze which emails aren't matching? (y/n): ")
        if gap_choice.lower() == 'y':
            gaps = matcher.analyze_matching_gaps()
            print(f"\n=== MATCHING GAP ANALYSIS ===")
            print(f"Total unique topics: {gaps['total_topics']}")
            print(f"Matched topics: {gaps['matched_topics']}")
            print(f"Unmatched topics: {gaps['unmatched_topics']}")
            print(f"Matched emails: {gaps['matched_email_count']}")
            print(f"Unmatched emails: {gaps['unmatched_email_count']}")
            
            if gaps['unmatched_details']:
                print(f"\n--- UNMATCHED DOMINANT TOPICS ---")
                for topic, count in list(gaps['unmatched_details'].items())[:10]:
                    print(f"'{topic}' - {count} emails")
                
                if len(gaps['unmatched_details']) > 10:
                    print(f"... and {len(gaps['unmatched_details']) - 10} more")
                
                print(f"\n💡 To get 100% matches, you need to:")
                print(f"1. Add these topics to your cluster keyphrases, OR")
                print(f"2. Create a 'catch-all' cluster for unmatched topics")
        
        # Debug the matching process first
        print("\n--- Debugging Mode ---")
        debug_choice = input("Run debug mode to see why DB isn't updating? (y/n): ")
        if debug_choice.lower() == 'y':
            matcher.debug_matching_process()
        
        # Get a preview first
        print("\n--- Preview of Matches ---")
        preview = matcher.get_preview(limit=5)
        
        for i, item in enumerate(preview, 1):
            print(f"\n--- Email {i} ---")
            print(f"Dominant Topic: {item['dominant_topic']}")
            
            if item['subcluster_match']:
                print(f"✓ Subcluster Match: Cluster ID={item['subcluster_match']['cluster_id']}, "
                      f"Subcluster ID={item['subcluster_match']['subcluster_id']}, "
                      f"Label={item['subcluster_match']['subcluster_label']}")
            elif item['cluster_match']:
                print(f"✓ Cluster Match: ID={item['cluster_match']['cluster_id']}, "
                      f"Label={item['cluster_match']['dominant_label']}")
            else:
                print("✗ No match found")
        
        # Process all emails
        print("\n--- Processing Options ---")
        print("1. Dry run (see what would be updated without changing DB)")
        print("2. Full processing (actually update the database)")
        print("3. Process with fallback cluster (100% match guarantee)")
        choice = input("Choose option (1, 2, or 3): ")
        
        if choice in ['1', '2', '3']:
            if choice == '3':
                # Use fallback processing
                dry_run = False
                fallback_choice = input("Dry run with fallback first? (y/n): ")
                if fallback_choice.lower() == 'y':
                    dry_run = True
                
                print(f"\nStarting {'DRY RUN' if dry_run else 'LIVE'} processing with fallback...")
                batch_size = int(input("Enter batch size (recommended: 5000-10000): ") or "5000")
                stats = matcher.process_with_fallback(batch_size=batch_size, dry_run=dry_run)
            else:
                # Regular processing
                dry_run = (choice == '1')
                
                print(f"\nStarting {'DRY RUN' if dry_run else 'LIVE PROCESSING'}...")
                
                # Use larger batch size for better performance
                batch_size = int(input("Enter batch size (recommended: 5000-10000): ") or "5000")
                
                stats = matcher.process_emails_optimized(batch_size=batch_size, dry_run=dry_run)
            
            print("\n--- Final Results ---")
            print(f"Total emails processed: {stats['total_emails']:,}")
            print(f"Total updates made: {stats['total_updates']:,}")
            print(f"Cluster matches: {stats['matched_clusters']:,} ({stats['cluster_match_rate']:.1f}%)")
            print(f"Subcluster matches: {stats['matched_subclusters']:,} ({stats['subcluster_match_rate']:.1f}%)")
            print(f"Processing time: {stats['processing_time']:.2f} seconds")
            print(f"Processing rate: {stats['emails_per_second']:.1f} emails/second")
        
    finally:
        matcher.close_connection()

if __name__ == "__main__":
    main()

Connecting to database: sparzaai
Loading cluster data into cache...
Found 33 clusters to cache
Cache loaded in 2.88 seconds
Cached 155 cluster keyphrases
Cached 154 subcluster keyphrases

--- Database Statistics ---
total_emails: 2,004
emails_with_topic: 2,004
total_clusters: 33
cached_cluster_keyphrases: 155
cached_subcluster_keyphrases: 154

--- Gap Analysis ---
Analyzing matching gaps...
Found 155 unique dominant topics

=== MATCHING GAP ANALYSIS ===
Total unique topics: 155
Matched topics: 155
Unmatched topics: 0
Matched emails: 2004
Unmatched emails: 0

--- Debugging Mode ---

--- Preview of Matches ---

--- Email 1 ---
Dominant Topic: ACH Processing Error
✓ Subcluster Match: Cluster ID=2, Subcluster ID=0, Label=Electronic Payment Failures

--- Email 2 ---
Dominant Topic: ACH Processing Error
✓ Subcluster Match: Cluster ID=2, Subcluster ID=0, Label=Electronic Payment Failures

--- Email 3 ---
Dominant Topic: ACH Processing Error
✓ Subcluster Match: Cluster ID=2, Subcluster ID=0, L

In [1]:
import os
from pymongo import MongoClient
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Get connection details from environment variables
mongo_connection_string = os.getenv('MONGO_CONNECTION_STRING')
mongo_database_name = os.getenv('MONGO_DATABASE_NAME')

# Connect to MongoDB
client = MongoClient(mongo_connection_string)
db = client[mongo_database_name]
collection = db['emailmessages']

# Rename both fields in a single operation
result = collection.update_many(
    {},  # Empty filter to match all documents
    {
        "$rename": {
            "is_urgent": "urgency",
            "dominant_label": "dominant_cluster_label"
        }
    }
)

# Print results
print(f"Matched documents: {result.matched_count}")
print(f"Modified documents: {result.modified_count}")
print(f"Operation acknowledged: {result.acknowledged}")

# Verify the changes by checking a sample document
sample_doc = collection.find_one()
if sample_doc:
    print("\nSample document after rename:")
    print(f"Document ID: {sample_doc.get('_id')}")
    print(f"Has 'urgency' field: {'urgency' in sample_doc}")
    print(f"Has 'dominant_cluster_label' field: {'dominant_cluster_label' in sample_doc}")
    print(f"Has old 'is_urgent' field: {'is_urgent' in sample_doc}")
    print(f"Has old 'dominant_label' field: {'dominant_label' in sample_doc}")
else:
    print("No documents found in the collection")

# Optional: Count documents with the new field names
urgency_count = collection.count_documents({"urgency": {"$exists": True}})
cluster_label_count = collection.count_documents({"dominant_cluster_label": {"$exists": True}})

print(f"\nDocuments with 'urgency' field: {urgency_count}")
print(f"Documents with 'dominant_cluster_label' field: {cluster_label_count}")

# Close the connection
client.close()

Matched documents: 2004
Modified documents: 2004
Operation acknowledged: True

Sample document after rename:
Document ID: 688b0e9d0110d1205b4919d3
Has 'urgency' field: True
Has 'dominant_cluster_label' field: True
Has old 'is_urgent' field: False
Has old 'dominant_label' field: False

Documents with 'urgency' field: 2004
Documents with 'dominant_cluster_label' field: 2004


In [4]:
from pymongo import MongoClient, UpdateOne
from typing import Dict, List, Optional
import os
from dotenv import load_dotenv
import time

# Load environment variables
load_dotenv()

class ClusterKeyphraseUpdater:
    def __init__(self, connection_string: str, database_name: str):
        """Initialize the updater with MongoDB connection"""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.emails_collection = self.db['emailmessages']
        self.clusters_collection = self.db['cluster']
        
        # Cache for keyphrase -> cluster mapping (ONLY cluster level)
        self._keyphrase_to_cluster = {}
        self._load_keyphrase_cache()
    
    def _load_keyphrase_cache(self):
        """Load only cluster-level keyphrases into memory for fast lookups"""
        print("Loading cluster keyphrase cache...")
        start_time = time.time()
        
        clusters = list(self.clusters_collection.find())
        print(f"Found {len(clusters)} clusters to process")
        
        for cluster in clusters:
            cluster_id = cluster.get('cluster_id')
            dominant_label = cluster.get('dominant_label')
            cluster_keyphrases = cluster.get('keyphrases', [])
            
            # Cache ONLY cluster-level keyphrases
            for keyphrase in cluster_keyphrases:
                self._keyphrase_to_cluster[keyphrase] = {
                    'cluster_id': cluster_id,
                    'dominant_label': dominant_label,
                    'matched_keyphrase': keyphrase
                }
        
        cache_time = time.time() - start_time
        print(f"Cache loaded in {cache_time:.2f} seconds")
        print(f"Cached {len(self._keyphrase_to_cluster)} cluster keyphrases")
    
    def find_matching_keyphrase(self, dominant_topic: str) -> Optional[Dict]:
        """Find the matching keyphrase for a dominant topic (cluster level only)"""
        return self._keyphrase_to_cluster.get(dominant_topic)
    
    def process_emails_batch(self, emails: List[Dict]) -> List:
        """Process a batch of emails and return bulk operations"""
        bulk_operations = []
        
        for email in emails:
            dominant_topic = email.get('dominant_topic')
            if not dominant_topic:
                continue
            
            # Find matching keyphrase
            match_info = self.find_matching_keyphrase(dominant_topic)
            
            if match_info:
                update_data = {
                    'kmeans_cluster_keyphrase': match_info['matched_keyphrase']
                }
                
                bulk_operations.append(
                    UpdateOne(
                        {'_id': email['_id']}, 
                        {'$set': update_data}
                    )
                )
        
        return bulk_operations
    
    def add_keyphrase_field(self, batch_size: int = 5000, dry_run: bool = False) -> Dict:
        """Add kmeans_cluster_keyphrase field to all matching emails"""
        start_time = time.time()
        
        # Get total count
        total_emails = self.emails_collection.count_documents({
            "dominant_topic": {"$exists": True, "$ne": None}
        })
        
        processed = 0
        matched = 0
        total_updates = 0
        
        print(f"Processing {total_emails} emails with dominant_topic")
        print(f"Batch size: {batch_size}")
        print(f"DRY RUN MODE: {'ON' if dry_run else 'OFF'}")
        
        # Create index for faster queries
        try:
            self.emails_collection.create_index([("dominant_topic", 1)], background=True)
            print("✓ Index on dominant_topic verified")
        except Exception as e:
            print(f"Index note: {e}")
        
        # Process in batches
        cursor = self.emails_collection.find(
            {"dominant_topic": {"$exists": True, "$ne": None}},
            projection={'dominant_topic': 1}
        ).batch_size(batch_size)
        
        batch = []
        batch_count = 0
        
        for email in cursor:
            batch.append(email)
            
            if len(batch) >= batch_size:
                batch_count += 1
                print(f"\n--- Processing batch {batch_count} ({len(batch)} emails) ---")
                
                # Process batch
                bulk_operations = self.process_emails_batch(batch)
                batch_matched = len(bulk_operations)
                matched += batch_matched
                
                print(f"Generated {batch_matched} keyphrase updates for this batch")
                
                # Execute bulk update (or skip if dry run)
                if bulk_operations and not dry_run:
                    try:
                        result = self.emails_collection.bulk_write(
                            bulk_operations, 
                            ordered=False
                        )
                        total_updates += result.modified_count
                        print(f"✓ Updated {result.modified_count} documents with keyphrase field")
                        
                    except Exception as e:
                        print(f"❌ Bulk write error in batch {batch_count}: {e}")
                
                elif bulk_operations and dry_run:
                    print(f"DRY RUN: Would add keyphrase field to {batch_matched} documents")
                    # Show sample operations
                    for i, op in enumerate(bulk_operations[:3]):
                        keyphrase = op._doc['$set']['kmeans_cluster_keyphrase']
                        print(f"  Sample {i+1}: Would set keyphrase='{keyphrase}'")
                
                processed += len(batch)
                batch = []
                
                # Progress update
                elapsed = time.time() - start_time
                rate = processed / elapsed if elapsed > 0 else 0
                print(f"Progress: {processed}/{total_emails} ({rate:.1f} emails/sec)")
        
        # Process remaining emails in final batch
        if batch:
            batch_count += 1
            print(f"\n--- Processing final batch {batch_count} ({len(batch)} emails) ---")
            
            bulk_operations = self.process_emails_batch(batch)
            batch_matched = len(bulk_operations)
            matched += batch_matched
            
            print(f"Generated {batch_matched} keyphrase updates for final batch")
            
            if bulk_operations and not dry_run:
                try:
                    result = self.emails_collection.bulk_write(
                        bulk_operations, 
                        ordered=False
                    )
                    total_updates += result.modified_count
                    print(f"✓ Updated {result.modified_count} documents in final batch")
                except Exception as e:
                    print(f"❌ Bulk write error in final batch: {e}")
            elif bulk_operations and dry_run:
                print(f"DRY RUN: Would add keyphrase field to {batch_matched} documents")
            
            processed += len(batch)
        
        total_time = time.time() - start_time
        
        # Final verification
        if not dry_run and total_updates > 0:
            print(f"\n--- Verification ---")
            keyphrase_count = self.emails_collection.count_documents({
                "kmeans_cluster_keyphrase": {"$exists": True}
            })
            print(f"Total documents with kmeans_cluster_keyphrase: {keyphrase_count}")
            
            # Show some sample results
            samples = list(self.emails_collection.find(
                {"kmeans_cluster_keyphrase": {"$exists": True}},
                {"dominant_topic": 1, "kmeans_cluster_keyphrase": 1}
            ).limit(5))
            
            print(f"\nSample results:")
            for i, sample in enumerate(samples, 1):
                print(f"  {i}. Topic: '{sample.get('dominant_topic')}' -> "
                      f"Keyphrase: '{sample.get('kmeans_cluster_keyphrase')}'")
        
        stats = {
            'total_emails_processed': processed,
            'emails_matched': matched,
            'total_updates': total_updates,
            'processing_time': total_time,
            'emails_per_second': processed / total_time if total_time > 0 else 0,
            'match_rate': (matched / processed * 100) if processed > 0 else 0,
            'dry_run': dry_run
        }
        
        return stats
    
    def debug_keyphrase_matching(self, limit: int = 10) -> None:
        """Debug the keyphrase matching process"""
        print("\n=== DEBUGGING CLUSTER KEYPHRASE MATCHING ===")
        
        # Check cache
        if not self._keyphrase_to_cluster:
            print("❌ NO CLUSTER KEYPHRASE CACHE DATA!")
            return
        
        print(f"✓ Cluster keyphrase cache: {len(self._keyphrase_to_cluster)} entries")
        
        # Show sample keyphrases
        print(f"\nSample cluster keyphrases:")
        for i, (keyphrase, info) in enumerate(list(self._keyphrase_to_cluster.items())[:10]):
            print(f"  {i+1}. '{keyphrase}' -> Cluster {info['cluster_id']} ({info['dominant_label']})")
        
        # Test with actual emails
        print(f"\n=== TESTING {limit} EMAILS ===")
        emails = list(self.emails_collection.find(
            {"dominant_topic": {"$exists": True, "$ne": None}}
        ).limit(limit))
        
        if not emails:
            print("❌ NO EMAILS with dominant_topic found!")
            return
        
        for i, email in enumerate(emails, 1):
            dominant_topic = email.get('dominant_topic', 'NO_TOPIC')
            print(f"\n--- Email {i} ---")
            print(f"Email ID: {email['_id']}")
            print(f"Dominant Topic: '{dominant_topic}'")
            
            # Test keyphrase matching
            match_info = self.find_matching_keyphrase(dominant_topic)
            if match_info:
                print(f"✓ CLUSTER KEYPHRASE MATCH: '{match_info['matched_keyphrase']}'")
                print(f"  Cluster ID: {match_info['cluster_id']}")
                print(f"  Cluster Label: {match_info['dominant_label']}")
            else:
                print(f"❌ No cluster keyphrase match for '{dominant_topic}'")
    
    def get_keyphrase_stats(self) -> Dict:
        """Get statistics about keyphrase matching"""
        # Count emails with dominant_topic
        emails_with_topic = self.emails_collection.count_documents({
            "dominant_topic": {"$exists": True, "$ne": None}
        })
        
        # Count emails already with keyphrase field
        emails_with_keyphrase = self.emails_collection.count_documents({
            "kmeans_cluster_keyphrase": {"$exists": True}
        })
        
        # Get unique dominant topics and check match rates
        unique_topics = self.emails_collection.distinct("dominant_topic")
        unique_topics = [topic for topic in unique_topics if topic is not None]
        
        matchable_topics = 0
        for topic in unique_topics:
            if self.find_matching_keyphrase(topic):
                matchable_topics += 1
        
        return {
            'total_emails_with_topic': emails_with_topic,
            'emails_with_keyphrase_field': emails_with_keyphrase,
            'unique_dominant_topics': len(unique_topics),
            'matchable_topics': matchable_topics,
            'topic_match_rate': (matchable_topics / len(unique_topics) * 100) if unique_topics else 0,
            'cached_cluster_keyphrases': len(self._keyphrase_to_cluster)
        }
    
    def close_connection(self):
        """Close MongoDB connection"""
        self.client.close()

def main():
    # Get configuration from environment variables
    CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
    DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')
    
    if not CONNECTION_STRING:
        raise ValueError("MONGO_CONNECTION_STRING not found in environment variables")
    if not DATABASE_NAME:
        raise ValueError("MONGO_DATABASE_NAME not found in environment variables")
    
    print(f"Connecting to database: {DATABASE_NAME}")
    
    # Initialize keyphrase updater
    updater = ClusterKeyphraseUpdater(CONNECTION_STRING, DATABASE_NAME)
    
    try:
        # Show current statistics
        print("\n--- Current Statistics ---")
        stats = updater.get_keyphrase_stats()
        for key, value in stats.items():
            if isinstance(value, float):
                print(f"{key}: {value:.1f}")
            else:
                print(f"{key}: {value:,}")
        
        # Debug keyphrase matching
        print("\n--- Debug Mode ---")
        debug_choice = input("Run debug mode to see keyphrase matching? (y/n): ")
        if debug_choice.lower() == 'y':
            updater.debug_keyphrase_matching()
        
        # Choose processing mode
        print("\n--- Processing Options ---")
        print("1. Dry run (see what would be updated)")
        print("2. Live processing (actually add keyphrase field)")
        choice = input("Choose option (1 or 2): ")
        
        if choice in ['1', '2']:
            dry_run = (choice == '1')
            
            print(f"\nStarting {'DRY RUN' if dry_run else 'LIVE PROCESSING'}...")
            
            # Get batch size
            batch_size = int(input("Enter batch size (recommended: 5000): ") or "5000")
            
            # Process emails
            results = updater.add_keyphrase_field(batch_size=batch_size, dry_run=dry_run)
            
            print("\n--- Final Results ---")
            print(f"Total emails processed: {results['total_emails_processed']:,}")
            print(f"Emails with matching keyphrases: {results['emails_matched']:,}")
            print(f"Total updates made: {results['total_updates']:,}")
            print(f"Match rate: {results['match_rate']:.1f}%")
            print(f"Processing time: {results['processing_time']:.2f} seconds")
            print(f"Processing rate: {results['emails_per_second']:.1f} emails/second")
            
            if not dry_run and results['total_updates'] > 0:
                print(f"\n✅ Successfully added kmeans_cluster_keyphrase field to {results['total_updates']:,} emails!")
        
    finally:
        updater.close_connection()

if __name__ == "__main__":
    main()

Connecting to database: sparzaai
Loading cluster keyphrase cache...
Found 33 clusters to process
Cache loaded in 2.76 seconds
Cached 155 cluster keyphrases

--- Current Statistics ---
total_emails_with_topic: 2,004
emails_with_keyphrase_field: 0
unique_dominant_topics: 155
matchable_topics: 155
topic_match_rate: 100.0
cached_cluster_keyphrases: 155

--- Debug Mode ---

--- Processing Options ---
1. Dry run (see what would be updated)
2. Live processing (actually add keyphrase field)

Starting LIVE PROCESSING...
Processing 2004 emails with dominant_topic
Batch size: 1000
DRY RUN MODE: OFF
✓ Index on dominant_topic verified

--- Processing batch 1 (1000 emails) ---
Generated 1000 keyphrase updates for this batch
✓ Updated 1000 documents with keyphrase field
Progress: 1000/2004 (289.8 emails/sec)

--- Processing batch 2 (1000 emails) ---
Generated 1000 keyphrase updates for this batch
✓ Updated 1000 documents with keyphrase field
Progress: 2000/2004 (372.4 emails/sec)

--- Processing fina

In [6]:
import os
from pymongo import MongoClient
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get connection details from environment variables
MONGO_CONNECTION_STRING = os.getenv('MONGO_CONNECTION_STRING')
MONGO_DATABASE_NAME = os.getenv('MONGO_DATABASE_NAME')

if not MONGO_CONNECTION_STRING or not MONGO_DATABASE_NAME:
    raise ValueError("Please set MONGO_CONNECTION_STRING and MONGO_DATABASE_NAME in your environment variables")

# Connect to MongoDB
client = MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DATABASE_NAME]
collection = db['emailmessages']

try:
    # Add domain field to all documents
    result = collection.update_many(
        {},  # Empty filter to match all documents
        {"$set": {"domain": "banking"}}
    )
    
    print(f"Matched documents: {result.matched_count}")
    print(f"Modified documents: {result.modified_count}")
    
    if result.matched_count == 2004:
        print("Successfully updated all 2004 documents!")
    else:
        print(f"Expected 2004 documents, but found {result.matched_count}")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    # Close the connection
    client.close()

# Alternative: Add domain field only to documents that don't already have it
def add_domain_conditionally():
    try:
        result = collection.update_many(
            {"domain": {"$exists": False}},  # Only documents without 'domain' field
            {"$set": {"domain": "banking"}}
        )
        
        print(f"Documents without domain field: {result.matched_count}")
        print(f"Modified documents: {result.modified_count}")
        
    except Exception as e:
        print(f"An error occurred: {e}")

# Uncomment the line below if you want to run the conditional update instead
# add_domain_conditionally()

Matched documents: 2004
Modified documents: 2004
Successfully updated all 2004 documents!


In [1]:
import os
from pymongo import MongoClient
from dotenv import load_dotenv
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Get connection details from environment variables
mongo_connection_string = os.getenv('MONGO_CONNECTION_STRING')
mongo_database_name = os.getenv('MONGO_DATABASE_NAME')

def copy_processed_at_field():
    """
    Copy processed_at field from 'sample email' collection to 'emailmessages' collection
    """
    try:
        # Connect to MongoDB
        client = MongoClient(mongo_connection_string)
        db = client[mongo_database_name]
        
        # Get collections
        sample_email_collection = db['sample email']
        emailmessages_collection = db['emailmessages']
        
        logger.info("Connected to MongoDB successfully")
        
        # Get all documents from sample email collection with processed_at field
        sample_emails = list(sample_email_collection.find(
            {"processed_at": {"$exists": True}},
            {"_id": 1, "processed_at": 1}
        ))
        
        logger.info(f"Found {len(sample_emails)} documents with processed_at field in 'sample email' collection")
        
        if not sample_emails:
            logger.warning("No documents found with processed_at field in 'sample email' collection")
            return
        
        # Create a mapping of _id to processed_at value
        processed_at_mapping = {doc['_id']: doc['processed_at'] for doc in sample_emails}
        
        # Update tickets collection
        updated_count = 0
        failed_count = 0
        
        for doc_id, processed_at_value in processed_at_mapping.items():
            try:
                # Update the document in emailmessages collection
                result = emailmessages_collection.update_one(
                    {"_id": doc_id},
                    {"$set": {"processed_at": processed_at_value}},
                    upsert=False  # Don't create new documents if they don't exist
                )
                
                if result.matched_count > 0:
                    updated_count += 1
                    if updated_count % 100 == 0:  # Log progress every 100 updates
                        logger.info(f"Updated {updated_count} documents so far...")
                else:
                    logger.warning(f"Document with _id {doc_id} not found in emailmessages collection")
                    failed_count += 1
                    
            except Exception as e:
                logger.error(f"Failed to update document {doc_id}: {str(e)}")
                failed_count += 1
        
        logger.info(f"Operation completed:")
        logger.info(f"- Successfully updated: {updated_count} documents")
        logger.info(f"- Failed/Not found: {failed_count} documents")
        
        # Verify the operation
        verify_count = emailmessages_collection.count_documents({"processed_at": {"$exists": True}})
        logger.info(f"Verification: {verify_count} documents in 'emailmessages' collection now have 'processed_at' field")
        
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
    finally:
        # Close the connection
        try:
            client.close()
            logger.info("MongoDB connection closed")
        except:
            pass

def copy_all_processed_at_regardless_of_id():
    """
    Alternative approach: Copy processed_at values based on document order/position
    Use this if documents don't have matching _ids between collections
    """
    try:
        # Connect to MongoDB
        client = MongoClient(mongo_connection_string)
        db = client[mongo_database_name]
        
        # Get collections
        sample_email_collection = db['sample email']
        emailmessages_collection = db['emailmessages']
        
        logger.info("Connected to MongoDB successfully")
        
        # Get all documents from both collections
        sample_emails = list(sample_email_collection.find().sort("_id", 1))
        emailmessages = list(emailmessages_collection.find().sort("_id", 1))
        
        logger.info(f"Sample email collection has {len(sample_emails)} documents")
        logger.info(f"Emailmessages collection has {len(emailmessages)} documents")
        
        # Ensure both collections have the same number of documents
        min_count = min(len(sample_emails), len(emailmessages))
        
        if len(sample_emails) != len(emailmessages):
            logger.warning(f"Collections have different sizes. Will process {min_count} documents")
        
        updated_count = 0
        
        # Update emailmessages with processed_at values from sample emails
        for i in range(min_count):
            sample_doc = sample_emails[i]
            email_doc = emailmessages[i]
            
            # Check if sample document has processed_at field
            if 'processed_at' in sample_doc:
                try:
                    # Update the corresponding emailmessages document
                    result = emailmessages_collection.update_one(
                        {"_id": email_doc['_id']},
                        {"$set": {"processed_at": sample_doc['processed_at']}}
                    )
                    
                    if result.modified_count > 0:
                        updated_count += 1
                        if updated_count % 100 == 0:
                            logger.info(f"Updated {updated_count} documents so far...")
                            
                except Exception as e:
                    logger.error(f"Failed to update document at index {i}: {str(e)}")
        
        logger.info(f"Successfully updated {updated_count} documents with processed_at field")
        
        # Verify the operation
        verify_count = emailmessages_collection.count_documents({"processed_at": {"$exists": True}})
        logger.info(f"Verification: {verify_count} documents in 'emailmessages' collection now have 'processed_at' field")
        
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
    finally:
        # Close the connection
        try:
            client.close()
            logger.info("MongoDB connection closed")
        except:
            pass

if __name__ == "__main__":
    # Check if environment variables are set
    if not mongo_connection_string:
        logger.error("MONGO_CONNECTION_STRING environment variable is not set")
        exit(1)
    
    if not mongo_database_name:
        logger.error("MONGO_DATABASE_NAME environment variable is not set")
        exit(1)
    
    print("Choose the method to copy processed_at field:")
    print("1. Copy based on matching document _id (recommended)")
    print("2. Copy based on document order/position")
    
    choice = input("Enter your choice (1 or 2): ").strip()
    
    if choice == "1":
        logger.info("Starting copy operation based on matching _id...")
        copy_processed_at_field()
    elif choice == "2":
        logger.info("Starting copy operation based on document order...")
        copy_all_processed_at_regardless_of_id()
    else:
        logger.error("Invalid choice. Please run the script again and choose 1 or 2.")

Choose the method to copy processed_at field:
1. Copy based on matching document _id (recommended)
2. Copy based on document order/position


2025-09-02 15:58:11,828 - INFO - Starting copy operation based on document order...
2025-09-02 15:58:11,852 - INFO - Connected to MongoDB successfully
2025-09-02 15:59:59,144 - INFO - Sample email collection has 2004 documents
2025-09-02 15:59:59,145 - INFO - Emailmessages collection has 2004 documents
2025-09-02 16:00:26,488 - INFO - Updated 100 documents so far...
2025-09-02 16:00:53,800 - INFO - Updated 200 documents so far...
2025-09-02 16:01:23,335 - INFO - Updated 300 documents so far...
2025-09-02 16:01:51,553 - INFO - Updated 400 documents so far...
2025-09-02 16:02:19,406 - INFO - Updated 500 documents so far...
2025-09-02 16:02:47,466 - INFO - Updated 600 documents so far...
2025-09-02 16:03:15,931 - INFO - Updated 700 documents so far...
2025-09-02 16:03:43,931 - INFO - Updated 800 documents so far...
2025-09-02 16:04:12,252 - INFO - Updated 900 documents so far...
2025-09-02 16:04:39,695 - INFO - Updated 1000 documents so far...
2025-09-02 16:05:06,490 - INFO - Updated 1100