In [3]:
import pandas as pd
import re
import hashlib
import uuid
import numpy as np
from datetime import datetime, timedelta
from faker import Faker

class DataAnonymizer:
    """
    A tool for anonymizing personally identifiable information (PII) in client records.
    """
    
    def __init__(self, seed=42):
        """
        Initialize the anonymizer with consistent seed for reproducibility.
        
        Args:
            seed (int): Random seed for reproducible anonymization
        """
        self.fake = Faker()
        Faker.seed(seed)
        np.random.seed(seed)
        self.mapping = {}  # For consistent replacement across the dataset
        
    def detect_pii(self, text):
        """
        Detect potential PII in text using regex patterns.
        
        Args:
            text (str): Input text to analyze
            
        Returns:
            dict: Dictionary of detected PII types and values
        """
        if not isinstance(text, str):
            return {}
            
        patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b',
            'ssn': r'\b\d{3}[-]?\d{2}[-]?\d{4}\b',
            'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
            'address': r'\b\d+\s+[A-Za-z0-9\s,]+\b(?:Avenue|Ave|Street|St|Road|Rd|Boulevard|Blvd|Drive|Dr|Lane|Ln|Court|Ct|Way)[,\s]+[A-Za-z]+[,\s]+[A-Z]{2}[,\s]+\d{5}\b',
            'name': r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+\b',  # Simple pattern for full names
            'date': r'\b(0[1-9]|1[0-2])/(0[1-9]|[12][0-9]|3[01])/\d{4}\b'  # MM/DD/YYYY format
        }
        
        detected = {}
        for pii_type, pattern in patterns.items():
            # Use finditer to get the exact matched strings
            matches = []
            for match in re.finditer(pattern, text):
                # Get the entire matched string
                matches.append(match.group(0))
            if matches:
                detected[pii_type] = matches
                
        return detected
    
    def hash_value(self, value, salt=""):
        """
        Create a consistent hash for a value.
        
        Args:
            value: Value to hash
            salt (str): Optional salt to add to the hash
            
        Returns:
            str: Hashed value
        """
        if not isinstance(value, str):
            value = str(value)
        
        return hashlib.sha256((value + salt).encode()).hexdigest()[:10]
    
    def get_replacement(self, original, pii_type):
        """
        Get consistent replacement for a PII value.
        
        Args:
            original: Original PII value
            pii_type (str): Type of PII
            
        Returns:
            str: Replacement value
        """
        # Create a unique key for this value and type
        key = f"{pii_type}:{original}"
        
        # Return existing mapping if available
        if key in self.mapping:
            return self.mapping[key]
        
        # Create a new replacement based on PII type
        if pii_type == 'email':
            replacement = f"person_{len(self.mapping)}@example.com"
        elif pii_type == 'phone':
            replacement = f"555-{self.hash_value(original)[:3]}-{self.hash_value(original)[3:7]}"
        elif pii_type == 'ssn':
            replacement = f"XXX-XX-{self.hash_value(original)[:4]}"
        elif pii_type == 'credit_card':
            replacement = f"XXXX-XXXX-XXXX-{self.hash_value(original)[:4]}"
        elif pii_type == 'address':
            replacement = f"{len(self.mapping) % 999 + 1} Example St, Anytown, US 12345"
        elif pii_type == 'name':
            replacement = self.fake.name()
        elif pii_type == 'date':
            # Adjust date within a reasonable range (±30 days)
            try:
                original_date = datetime.strptime(original, '%m/%d/%Y')
                date_shift = np.random.randint(-30, 30)
                new_date = original_date + timedelta(days=date_shift)
                replacement = new_date.strftime('%m/%d/%Y')
            except:
                replacement = "01/01/2000"
        else:
            # Generic replacement for other types
            replacement = f"ANON_{self.hash_value(original)}"
        
        # Store the mapping for consistency
        self.mapping[key] = replacement
        return replacement
    
    def anonymize_text(self, text):
        """
        Anonymize text by replacing detected PII.
        
        Args:
            text (str): Text to anonymize
            
        Returns:
            str: Anonymized text
        """
        if not isinstance(text, str):
            return text
            
        # Detect PII in the text
        detected_pii = self.detect_pii(text)
        
        # Replace each detected PII
        anonymized = text
        for pii_type, values in detected_pii.items():
            for value in values:
                if not value or not isinstance(value, str):
                    continue
                replacement = self.get_replacement(value, pii_type)
                anonymized = anonymized.replace(value, replacement)
                
        return anonymized
    
    def anonymize_dataframe(self, df, text_columns=None, sensitive_columns=None):
        """
        Anonymize a pandas DataFrame containing client records.
        
        Args:
            df (pd.DataFrame): DataFrame to anonymize
            text_columns (list): Column names containing free text that needs PII detection
            sensitive_columns (dict): Column names mapped to PII types for direct replacement
            
        Returns:
            pd.DataFrame: Anonymized DataFrame
        """
        # Make a copy to avoid modifying the original data
        df_anon = df.copy()
        
        # Process text columns with PII detection
        if text_columns:
            for col in text_columns:
                if col in df_anon.columns:
                    df_anon[col] = df_anon[col].apply(self.anonymize_text)
        
        # Process sensitive columns with direct replacement
        if sensitive_columns:
            for col, pii_type in sensitive_columns.items():
                if col in df_anon.columns:
                    df_anon[col] = df_anon[col].apply(
                        lambda x: self.get_replacement(x, pii_type) if pd.notna(x) else x
                    )
        
        return df_anon
    
    def export_mapping(self, filepath):
        """
        Export the anonymization mapping to a file for reference or de-anonymization.
        
        Args:
            filepath (str): Path to save the mapping file
        """
        mapping_df = pd.DataFrame([
            {"type_and_original": k, "anonymized": v} 
            for k, v in self.mapping.items()
        ])
        mapping_df.to_csv(filepath, index=False)
        
    def anonymize_csv(self, input_path, output_path, text_columns=None, sensitive_columns=None):
        """
        Anonymize a CSV file and save the result.
        
        Args:
            input_path (str): Path to the input CSV
            output_path (str): Path to save the anonymized CSV
            text_columns (list): Column names containing free text that needs PII detection
            sensitive_columns (dict): Column names mapped to PII types for direct replacement
        """
        df = pd.read_csv(input_path)
        df_anon = self.anonymize_dataframe(df, text_columns, sensitive_columns)
        df_anon.to_csv(output_path, index=False)
        
        # Also save the mapping
        mapping_path = output_path.replace('.csv', '_mapping.csv')
        self.export_mapping(mapping_path)
        
        return {
            "rows_processed": len(df),
            "pii_values_replaced": len(self.mapping),
            "output_path": output_path,
            "mapping_path": mapping_path
        }


# Example usage
if __name__ == "__main__":
    # Initialize the anonymizer
    anonymizer = DataAnonymizer(seed=42)
    
    # Example with a small DataFrame
    data = {
        'client_id': [1001, 1002, 1003],
        'name': ['John Smith', 'Jane Doe', 'Robert Johnson'],
        'email': ['john.smith@example.com', 'jane.doe@company.org', 'robert.j@gmail.com'],
        'phone': ['(555) 123-4567', '555-987-6543', '(123) 456-7890'],
        'address': ['123 Main St, New York, NY 10001', '456 Oak Ave, Los Angeles, CA 90001', '789 Pine Rd, Chicago, IL 60601'],
        'notes': [
            'Client called on 05/15/2023 about his account #12345. Contact again next week.',
            'Met with Jane at 456 Oak Ave to discuss her credit card 4111-2222-3333-4444.',
            'Robert requested a call back at (123) 456-7890 regarding his SSN 123-45-6789.'
        ]
    }
    
    df = pd.DataFrame(data)
    
    # Define columns that need processing
    text_columns = ['notes']
    sensitive_columns = {
        'name': 'name',
        'email': 'email',
        'phone': 'phone',
        'address': 'address'
    }
    
    # Anonymize the DataFrame
    df_anon = anonymizer.anonymize_dataframe(df, text_columns, sensitive_columns)
    
    # Display the results (in real scenario, save to file instead)
    print("Original Data:")
    print(df)
    print("\nAnonymized Data:")
    print(df_anon)

Original Data:
   client_id            name                   email           phone  \
0       1001      John Smith  john.smith@example.com  (555) 123-4567   
1       1002        Jane Doe    jane.doe@company.org    555-987-6543   
2       1003  Robert Johnson      robert.j@gmail.com  (123) 456-7890   

                              address  \
0     123 Main St, New York, NY 10001   
1  456 Oak Ave, Los Angeles, CA 90001   
2      789 Pine Rd, Chicago, IL 60601   

                                               notes  
0  Client called on 05/15/2023 about his account ...  
1  Met with Jane at 456 Oak Ave to discuss her cr...  
2  Robert requested a call back at (123) 456-7890...  

Anonymized Data:
   client_id             name                  email         phone  \
0       1001      Noah Rhodes   person_8@example.com  555-a28-583f   
1       1002  Angie Henderson   person_9@example.com  555-263-444a   
2       1003    Daniel Wagner  person_10@example.com  555-f03-2ddf   

            

In [None]:
import re
import json
import hashlib
from faker import Faker

class TextAnonymizer:
    """
    A minimal class to anonymize and de-anonymize text containing PII.
    """
    
    def __init__(self, seed=42):
        """Initialize with a seed for reproducibility."""
        self.fake = Faker()
        Faker.seed(seed)
        self.mapping = {}  # Original -> Anonymized
        self.reverse_mapping = {}  # Anonymized -> Original
        
    def detect_pii(self, text):
        """
        Detect PII in text using regex patterns.
        
        Returns a dictionary of detected PII by type.
        """
        patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b',
            'ssn': r'\b\d{3}[-]?\d{2}[-]?\d{4}\b',
            'name': r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+\b',  # Simple full name pattern
            'address': r'\b\d+\s+[A-Za-z\s]+(?:Avenue|Ave|Street|St|Road|Rd|Drive|Dr|Lane|Ln|Court|Ct)\b'
        }
        
        detected = {}
        for pii_type, pattern in patterns.items():
            matches = []
            for match in re.finditer(pattern, text):
                matches.append(match.group(0))
            if matches:
                detected[pii_type] = matches
                
        return detected
    
    def get_replacement(self, original, pii_type):
        """Get consistent replacement for a PII value."""
        # Return existing mapping if available
        if original in self.mapping:
            return self.mapping[original]
        
        # Create a new replacement based on PII type
        if pii_type == 'email':
            replacement = f"person{len(self.mapping)}@example.com"
        elif pii_type == 'phone':
            replacement = f"555-{len(self.mapping):03d}-{len(self.mapping) % 10000:04d}"
        elif pii_type == 'ssn':
            replacement = f"XXX-XX-{len(self.mapping) % 10000:04d}"
        elif pii_type == 'name':
            replacement = self.fake.name()
        elif pii_type == 'address':
            replacement = f"{len(self.mapping) % 999 + 1} Example St"
        else:
            # Hash for generic types
            h = hashlib.md5(original.encode()).hexdigest()[:8]
            replacement = f"ANON_{h}"
        
        # Store mappings in both directions
        self.mapping[original] = replacement
        self.reverse_mapping[replacement] = original
        
        return replacement
    
    def anonymize_text(self, text):
        """
        Anonymize text by replacing all detected PII.
        
        Returns the anonymized text.
        """
        # Detect PII
        detected_pii = self.detect_pii(text)
        
        # Replace each detected PII
        anonymized = text
        for pii_type, values in detected_pii.items():
            for value in values:
                replacement = self.get_replacement(value, pii_type)
                anonymized = anonymized.replace(value, replacement)
        
        return anonymized
    
    def deanonymize_text(self, text):
        """
        Restore original PII in anonymized text.
        
        Returns the de-anonymized text.
        """
        deanonymized = text
        
        # Sort the replacements by length (longest first)
        # This prevents partial replacements of nested anonymized values
        sorted_replacements = sorted(
            self.reverse_mapping.items(), 
            key=lambda x: len(x[0]), 
            reverse=True
        )
        
        # Replace each anonymized value with its original
        for anon, original in sorted_replacements:
            deanonymized = deanonymized.replace(anon, original)
            
        return deanonymized
    
    def export_mappings(self):
        """Export the anonymization mappings as JSON."""
        return json.dumps({
            "anonymized": self.mapping,
            "original": self.reverse_mapping
        }, indent=2)


# Simulated LLM function
def simulated_llm(text):
    """
    Simulates sending text to an LLM and getting a response.
    
    In a real implementation, this would be an API call to an LLM service.
    """
    # For simulation, we'll just echo the text with some analysis
    response = f"""
I analyzed the text you provided and found:
- The client mentioned in the text is dealing with a financial issue
- There appear to be some contact details and personal information
- A meeting is scheduled in the future
- Recommendation: Follow up with {text.split()[-20]} as mentioned 
  in the document to resolve the outstanding matters.
"""
    return response


# Example of the full anonymization workflow
def anonymization_workflow(input_text):
    # Step 1: Create the anonymizer
    anonymizer = TextAnonymizer()
    
    # Step 2: Anonymize the text
    anonymous_text = anonymizer.anonymize_text(input_text)
    print("\n=== ANONYMIZED TEXT ===")
    print(anonymous_text)
    
    # Step 3: Send to "LLM" (simulated)
    print("\n=== SENDING TO LLM ===")
    llm_response = simulated_llm(anonymous_text)
    print("\n=== LLM RESPONSE ===")
    print(llm_response)
    
    # Step 4: De-anonymize the response
    deanonymized_response = anonymizer.deanonymize_text(llm_response)
    print("\n=== DE-ANONYMIZED RESPONSE ===")
    print(deanonymized_response)
    
    # Step 5: Export the mappings (for reference)
    print("\n=== ANONYMIZATION MAPPINGS ===")
    print(anonymizer.export_mappings())
    
    return {
        "original_text": input_text,
        "anonymized_text": anonymous_text,
        "llm_response": llm_response,
        "deanonymized_response": deanonymized_response,
        "mappings": anonymizer.mapping
    }


# Run an example
if __name__ == "__main__":
    sample_text = """
    Client: Jason Statham (johnsmith@gmail.com)
    Phone: +61 123-456-7890
    Address: 123 Main Street BUNDANOON NSW 2578
    ACN: AC12345678
    
    Notes: Jason Statham called regarding his investment portfolio on January 15th. 
    He's concerned about the recent market fluctuations and wants to discuss 
    reallocating his assets. We've scheduled a meeting for next Tuesday at his home 
    address 123 Main Street. He requested that we also prepare an analysis of his 
    retirement accounts. He can also be reached at his alternate email: js1980@hotmail.com.
    """
    
    results = anonymization_workflow(sample_text)


=== ANONYMIZED TEXT ===

    Client: Allison Hill (person0@example.com)
    Phone: (555-002-0002
    Address: 123 Noah Rhodes, Anytown, CA 94582
    SSN: XXX-XX-0003
    
    Notes: Allison Hill called regarding his investment portfolio on January 15th. 
    He's concerned about the recent market fluctuations and wants to discuss 
    reallocating his assets. We've scheduled a meeting for next Tuesday at his home 
    address (123 Noah Rhodes). He requested that we also prepare an analysis of his 
    retirement accounts. He can also be reached at his alternate email: person1@example.com.
    

=== SENDING TO LLM ===

=== LLM RESPONSE ===

I analyzed the text you provided and found:
- The client mentioned in the text is dealing with a financial issue
- There appear to be some contact details and personal information
- A meeting is scheduled in the future
- Recommendation: Follow up with that as mentioned 
  in the document to resolve the outstanding matters.


=== DE-ANONYMIZED RESPON