In [None]:
import docx
import json
import xml.etree.ElementTree as ET

def create_test_docx():
    """Write a five-paragraph sample Word file to ``document.docx``.

    The paragraphs contain made-up company, contact, address, budget and
    city details. The file is created in the current working directory and
    any existing file of the same name is overwritten.
    """
    sample_lines = (
        "Employee Report for Microsoft Corporation",
        "Contact: John Smith (email: john.smith@microsoft.com)",
        "Location: 123 Main Street, Seattle, Washington 98104",
        "Project budget: $500,000",
        "Team members visited London and Tokyo last quarter.",
    )

    document = docx.Document()
    for line in sample_lines:
        # One paragraph per sample line, in the order listed above.
        document.add_paragraph(line)
    document.save("document.docx")

def create_test_xml():
    """Write a sample ``<company>`` XML document to ``data.xml``.

    The document has a single ``<employee>`` child carrying ``name``,
    ``phone`` and ``address`` elements with made-up values. The file is
    written to the current working directory.
    """
    employee_fields = (
        ("name", "Sarah Johnson"),
        ("phone", "+1 (555) 123-4567"),
        ("address", "456 Oak Avenue, Chicago, Illinois 60601"),
    )

    company = ET.Element("company")
    person = ET.SubElement(company, "employee")
    for tag, value in employee_fields:
        ET.SubElement(person, tag).text = value

    ET.ElementTree(company).write("data.xml")

def create_test_json():
    """Write a sample organization record to ``data.json``.

    The JSON object has an ``organization`` name, a one-item ``employees``
    list and a one-item ``transactions`` list, all with made-up values. It is
    written with a four-space indent to the current working directory.
    """
    employee = {
        "name": "David Brown",
        "email": "david.brown@apple.com",
        "location": "Cupertino, California",
    }
    transaction = {
        "amount": "$750,000",
        "date": "2024-01-15",
    }
    # Key order is kept so the serialized file matches the original layout.
    payload = {
        "organization": "Apple Inc.",
        "employees": [employee],
        "transactions": [transaction],
    }

    with open("data.json", "w") as out_file:
        json.dump(payload, out_file, indent=4)

if __name__ == "__main__":
    # Build every sample fixture (DOCX, XML, JSON) in the same order as
    # before, then report success once all three have been written.
    for build_fixture in (create_test_docx, create_test_xml, create_test_json):
        build_fixture()
    print("Test documents created successfully.")

Test documents created successfully.


In [None]:
import os
import re
import spacy
import pytesseract
from pdf2image import convert_from_path
from PIL import Image, ImageDraw
from typing import List, Set, Tuple, Dict
import docx
from xml.etree import ElementTree as ET
import json

class RedactionError(Exception):
    """Raised when an image- or PDF-redaction step fails.

    The helpers that raise it wrap the underlying exception's message in a
    short description of the step that failed.
    """

def get_file_info():
    """Prompt for the file type and path to process.

    Returns:
        A ``(file_type, file_path)`` tuple. ``file_type`` is lower-cased and
        stripped; ``file_path`` is stripped.

    Raises:
        FileNotFoundError: if the entered path does not exist. This is
            checked first, before the file type is validated.
        ValueError: if the entered type is not docx, xml, json or pdf.
    """
    supported_types = ('docx', 'xml', 'json', 'pdf')

    print("\nSupported file types: docx, xml, json, pdf")
    chosen_type = input("Enter file type to process: ").lower().strip()
    chosen_path = input("Enter file path: ").strip()

    path_missing = not os.path.exists(chosen_path)
    if path_missing:
        raise FileNotFoundError(f"File not found: {chosen_path}")

    if chosen_type in supported_types:
        return chosen_type, chosen_path
    raise ValueError(f"Unsupported file type: {chosen_type}")

def get_pattern_types() -> List[str]:
    """Prompt for the regex pattern types to redact.

    The answer is split on commas; each entry is stripped and upper-cased.
    Blank entries are dropped, duplicates are collapsed (first occurrence
    wins), and names that are not in the available list are reported with a
    warning and left out, so a typo no longer silently disables a pattern.

    Returns:
        The recognized pattern type names in the order entered, or an empty
        list when the user just presses Enter.
    """
    available_patterns = ['EMAIL', 'PHONE', 'SSN', 'CREDIT_CARD', 'AADHAAR', 'PAN']
    print("\nAvailable pattern types:")
    print(", ".join(available_patterns))

    selected_patterns = input("\nEnter pattern types to redact (comma-separated) or press Enter to skip: ").strip()
    if not selected_patterns:
        return []

    chosen: List[str] = []
    unknown: List[str] = []
    for raw_name in selected_patterns.split(','):
        name = raw_name.strip().upper()
        if not name:
            # Stray commas ("email,,phone") produce empty entries; skip them.
            continue
        if name not in available_patterns:
            if name not in unknown:
                unknown.append(name)
        elif name not in chosen:
            chosen.append(name)

    if unknown:
        print(f"Warning: ignoring unrecognized pattern types: {', '.join(unknown)}")

    return chosen

def get_entities_to_redact() -> Tuple[List[str], Set[str], List[str]]:
    """Prompt for entity types, pattern types and custom terms to redact.

    The spaCy ``en_core_web_sm`` pipeline is loaded to list the NER labels it
    offers. The user then enters a comma-separated selection (or ``all``, in
    any letter case), chooses pattern types via ``get_pattern_types()``, and
    may add literal terms.

    Returns:
        ``(ent_types, terms, pattern_types)`` where ``ent_types`` is a list
        of upper-cased entity labels (every available label for ``all``),
        ``terms`` is the set of stripped custom terms, and ``pattern_types``
        is whatever ``get_pattern_types()`` returned. Blank comma-separated
        entries are dropped from both ``ent_types`` and ``terms``.
    """
    nlp = spacy.load("en_core_web_sm")
    entity_types = nlp.pipe_labels['ner']

    print("\nAvailable entity types:")
    print(", ".join(entity_types))

    selected_types = input("\nEnter entity types to redact (comma-separated) or 'all': ").strip()
    pattern_types = get_pattern_types()
    custom_terms = input("Enter additional terms to redact (comma-separated) or press Enter to skip: ").strip()

    if selected_types.lower() == 'all':
        # Accept "All"/"ALL" too; previously those became the bogus label "ALL".
        ent_types = list(entity_types)
    else:
        ent_types = [
            label.strip().upper()
            for label in selected_types.split(',')
            if label.strip()
        ]

    # An empty answer yields an empty set; stray commas no longer add "".
    terms = {term.strip() for term in custom_terms.split(',') if term.strip()}

    return ent_types, terms, pattern_types

def find_pattern_matches(text: str, pattern_types: List[str]) -> Set[str]:
    """Find sensitive substrings of ``text`` using the selected regex patterns.

    Args:
        text: The text to scan.
        pattern_types: Names of the patterns to apply. Recognized names are
            EMAIL, PHONE, SSN, CREDIT_CARD, AADHAAR and PAN; any other name
            is ignored.

    Returns:
        The set of distinct matched substrings across all selected patterns.
    """
    patterns = {
        # The TLD class is letters only; the former `[A-Z|a-z]` also accepted
        # a literal "|" and could swallow text following the address.
        'EMAIL': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        # Separators are only consumed after an actual "1"/"+1" country code,
        # so a match no longer starts with the whitespace preceding the number.
        'PHONE': r'(?:\+?1[-.\s]*)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}',
        'SSN': r'\b\d{3}[-.]?\d{2}[-.]?\d{4}\b',
        'CREDIT_CARD': r'\b\d{4}[-. ]?\d{4}[-. ]?\d{4}[-. ]?\d{4}\b',
        'AADHAAR': r'\b[2-9]{1}[0-9]{3}[-. ]?[0-9]{4}[-. ]?[0-9]{4}\b',
        'PAN': r'\b[A-Z]{5}[0-9]{4}[A-Z]{1}\b'
    }

    sensitive_terms: Set[str] = set()
    for pattern_type in pattern_types:
        pattern = patterns.get(pattern_type)
        if pattern is None:
            continue
        # None of the patterns has a capturing group, so findall returns the
        # full matched strings.
        sensitive_terms.update(re.findall(pattern, text))

    return sensitive_terms

# Loaded spaCy pipelines keyed by model name, so the model is read once per
# session instead of once per paragraph / element / JSON value.
_NLP_CACHE = {}


def _get_nlp(model_name: str = "en_core_web_sm"):
    """Return the spaCy pipeline for ``model_name``, loading it on first use."""
    if model_name not in _NLP_CACHE:
        _NLP_CACHE[model_name] = spacy.load(model_name)
    return _NLP_CACHE[model_name]


def identify_entities(text: str, ent_types: List[str], pattern_types: List[str]) -> Set[str]:
    """Identify strings to redact using spaCy NER plus regex pattern matching.

    Args:
        text: The text to analyze.
        ent_types: spaCy entity labels whose entities should be collected.
        pattern_types: Pattern names forwarded to ``find_pattern_matches``.

    Returns:
        The set of entity texts whose label is in ``ent_types``, combined
        with every regex match for the selected patterns.
    """
    wanted_labels = set(ent_types)

    # spaCy NER (the pipeline is cached; loading it is by far the slowest step)
    doc = _get_nlp()(text)
    terms = {ent.text for ent in doc.ents if ent.label_ in wanted_labels}

    # Pattern matching for selected patterns
    terms.update(find_pattern_matches(text, pattern_types))

    return terms

def redact_text(text: str, terms: Set[str]) -> str:
    """Mask every occurrence of each term in ``text`` with asterisks.

    Terms are applied longest first, so a longer term is masked before any
    shorter term it contains. Each occurrence is replaced by as many ``*``
    characters as the term is long, so the text length is unchanged. Empty
    and whitespace-only terms are skipped.
    """
    longest_first = sorted(terms, key=len, reverse=True)
    redacted = text
    for candidate in longest_first:
        if not candidate or candidate.isspace():
            continue
        mask = '*' * len(candidate)
        redacted = redacted.replace(candidate, mask)
    return redacted

def _redact_paragraph_runs(paragraph, terms: Set[str]) -> None:
    """Mask ``terms`` across all runs of one paragraph, keeping run boundaries.

    The run texts are joined and redacted as one string, so a term that Word
    split across several runs is still caught. ``redact_text`` replaces each
    term with an equal number of asterisks, so the redacted string has the
    same length as the joined text and can be sliced back onto the runs by
    offset. Runs whose text is unchanged are not rewritten.
    """
    runs = paragraph.runs
    if not runs:
        return

    run_texts = [run.text for run in runs]
    redacted = redact_text("".join(run_texts), terms)

    offset = 0
    for run, original in zip(runs, run_texts):
        piece = redacted[offset:offset + len(original)]
        offset += len(original)
        if piece != original:
            run.text = piece


def redact_docx(doc_path: str, ent_types: List[str], custom_terms: Set[str], pattern_types: List[str]):
    """Redact sensitive information in a DOCX file.

    Body paragraphs are processed first, then the paragraphs of every table
    cell reached through ``doc.tables``. Only those two collections are
    visited. Terms accumulate as the document is walked, so a term found in
    one paragraph is also masked in every later paragraph.

    Args:
        doc_path: Path of the DOCX file to read.
        ent_types: spaCy entity labels to redact.
        custom_terms: Extra literal terms to redact; the set is copied, not
            modified.
        pattern_types: Regex pattern names to redact.

    Returns:
        The set of all terms redacted (custom terms plus detected ones). The
        redacted document is saved to the path from ``get_redacted_path``.
    """
    doc = docx.Document(doc_path)
    terms = custom_terms.copy()

    def _process(paragraphs):
        for paragraph in paragraphs:
            # Detection uses the whole paragraph text, not individual runs.
            terms.update(identify_entities(paragraph.text, ent_types, pattern_types))
            _redact_paragraph_runs(paragraph, terms)

    _process(doc.paragraphs)

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                _process(cell.paragraphs)

    new_path = get_redacted_path(doc_path)
    doc.save(new_path)
    return terms

def redact_xml(xml_path: str, ent_types: List[str], custom_terms: Set[str], pattern_types: List[str]):
    """Redact sensitive information in an XML file.

    Every element's ``text`` and ``tail`` (text that follows the element's
    closing tag inside its parent) is analyzed and masked. Attribute values
    and tag names are left untouched. Terms accumulate in document order, so
    a term found earlier is also masked in later nodes.

    Args:
        xml_path: Path of the XML file to read.
        ent_types: spaCy entity labels to redact.
        custom_terms: Extra literal terms to redact; the set is copied, not
            modified.
        pattern_types: Regex pattern names to redact.

    Returns:
        The set of all terms redacted (custom terms plus detected ones). The
        redacted tree is written to the path from ``get_redacted_path``.
    """
    tree = ET.parse(xml_path)
    root = tree.getroot()
    terms = custom_terms.copy()

    for element in root.iter():
        for slot in ("text", "tail"):
            value = getattr(element, slot)
            # Skip missing and whitespace-only nodes (e.g. pretty-print
            # indentation): nothing in them can match a non-blank term.
            if not value or not value.strip():
                continue
            terms.update(identify_entities(value, ent_types, pattern_types))
            setattr(element, slot, redact_text(value, terms))

    new_path = get_redacted_path(xml_path)
    tree.write(new_path)
    return terms

def redact_json(json_path: str, ent_types: List[str], custom_terms: Set[str], pattern_types: List[str]):
    """Redact sensitive information in a JSON file.

    Every string value, at any nesting depth inside lists and dicts, is
    analyzed and masked. Dictionary keys and non-string values are left
    unchanged. Terms accumulate during the walk, so a term found in one value
    is also masked in values visited later.

    Args:
        json_path: Path of the JSON file to read.
        ent_types: spaCy entity labels to redact.
        custom_terms: Extra literal terms to redact; the set is copied, not
            modified.
        pattern_types: Regex pattern names to redact.

    Returns:
        The set of all terms redacted (custom terms plus detected ones). The
        redacted data is written with a four-space indent to the path from
        ``get_redacted_path``.
    """
    # Read as UTF-8 explicitly: the platform default encoding is
    # locale-dependent and can mis-decode or reject UTF-8 JSON.
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    terms = custom_terms.copy()

    def redact_value(value):
        if isinstance(value, str):
            terms.update(identify_entities(value, ent_types, pattern_types))
            return redact_text(value, terms)
        elif isinstance(value, list):
            return [redact_value(v) for v in value]
        elif isinstance(value, dict):
            return {k: redact_value(v) for k, v in value.items()}
        return value

    redacted_data = redact_value(data)
    new_path = get_redacted_path(json_path)

    with open(new_path, 'w', encoding='utf-8') as f:
        json.dump(redacted_data, f, indent=4)

    return terms

def redact_image(image: Image.Image, ocr_data: dict, terms_to_redact: Set[str]) -> Image.Image:
    """Black out OCR words that match any term, using word bounding boxes.

    A word is covered when, ignoring case, some term contains the word or the
    word contains some term. The containment test in both directions is what
    lets a multi-word term cover each of its words, but it also means a short
    word that happens to be a substring of a term is covered.

    Args:
        image: The page image; it is copied, not modified.
        ocr_data: Dict of parallel lists with the keys ``text``, ``conf``,
            ``left``, ``top``, ``width`` and ``height``.
        terms_to_redact: Terms to hide; empty and whitespace-only terms are
            ignored.

    Returns:
        A copy of ``image`` with a filled black rectangle, padded by two
        pixels, over every matching word.

    Raises:
        RedactionError: if anything fails while redacting.
    """
    try:
        redacted_image = image.copy()
        draw = ImageDraw.Draw(redacted_image)

        # Lower-case the usable terms once instead of once per word.
        needles = [
            term.lower()
            for term in terms_to_redact
            if term and not term.isspace()
        ]
        padding = 2

        for i, raw_word in enumerate(ocr_data['text']):
            # Confidence may arrive as an int, a float, or a string such as
            # "96.5"; int() alone raises on the float-string form.
            if int(float(ocr_data['conf'][i])) <= 0:
                continue

            word = raw_word.strip().lower()
            if not word:
                # An empty string is "in" every term, so a blank OCR word
                # used to be blacked out whenever any term was present.
                continue

            if any(needle in word or word in needle for needle in needles):
                x = ocr_data['left'][i]
                y = ocr_data['top'][i]
                w = ocr_data['width'][i]
                h = ocr_data['height'][i]

                draw.rectangle([
                    (x - padding, y - padding),
                    (x + w + padding, y + h + padding)
                ], fill="black")

        return redacted_image
    except Exception as e:
        raise RedactionError(f"Image redaction failed: {str(e)}") from e

def convert_images_to_pdf(images: List[Image.Image], output_path: str) -> None:
    """Write a list of images to ``output_path`` as a multi-page file.

    Each image is converted to RGB; the first is saved with ``save_all=True``
    and the rest are appended as further pages, at a resolution of 100.0.

    Args:
        images: Page images in output order; must not be empty.
        output_path: Destination file path.

    Raises:
        RedactionError: if ``images`` is empty or saving fails. The
            underlying exception, if any, is chained as ``__cause__``.
    """
    if not images:
        # Previously this surfaced as "list index out of range".
        raise RedactionError("Failed to create PDF: no pages to write")

    try:
        rgb_images = [img.convert('RGB') for img in images]
        rgb_images[0].save(
            output_path,
            save_all=True,
            append_images=rgb_images[1:],
            resolution=100.0
        )
    except Exception as e:
        raise RedactionError(f"Failed to create PDF: {str(e)}") from e

def process_pdf(pdf_path: str, ent_types: List[str], custom_terms: Set[str], pattern_types: List[str]):
    """Redact a PDF by converting it to images, running OCR and masking words.

    Each page image is OCR'd twice: once for plain text, which feeds entity
    and pattern detection, and once for word-level data, which feeds
    ``redact_image``. Terms accumulate from page to page, so a term found on
    an earlier page is also masked on later pages. The redacted pages are
    then written to the path from ``get_redacted_path``.

    Returns:
        The set of all terms redacted (custom terms plus detected ones).

    Raises:
        RedactionError: wrapping any exception raised along the way.
    """
    try:
        print("Converting PDF to images...")
        pages = convert_from_path(pdf_path)
        page_count = len(pages)

        terms = custom_terms.copy()
        redacted_pages = []

        for page_number, page in enumerate(pages, start=1):
            print(f"Processing page {page_number} of {page_count}...")

            page_text = pytesseract.image_to_string(page)
            word_boxes = pytesseract.image_to_data(page, output_type=pytesseract.Output.DICT)

            found_on_page = identify_entities(page_text, ent_types, pattern_types)
            terms.update(found_on_page)
            if found_on_page:
                print(f"Found {len(found_on_page)} items to redact on page {page_number}")

            # Mask with everything collected so far, not just this page's hits.
            redacted_pages.append(redact_image(page, word_boxes, terms))

        output_path = get_redacted_path(pdf_path)
        print("Creating redacted PDF...")
        convert_images_to_pdf(redacted_pages, output_path)
        print(f"Redacted PDF saved as: {output_path}")

        return terms

    except Exception as e:
        raise RedactionError(f"PDF processing failed: {str(e)}")

def get_redacted_path(file_path: str) -> str:
    """Return ``file_path`` with ``_redacted`` inserted before its extension.

    Only the last extension is treated as the suffix, so ``a.tar.gz`` becomes
    ``a.tar_redacted.gz``. A path without an extension just gets
    ``_redacted`` appended.
    """
    stem, suffix = os.path.splitext(file_path)
    return stem + "_redacted" + suffix

def main():
    """Run one interactive redaction session.

    Prompts for the file and the redaction settings, dispatches to the
    redactor registered for the chosen file type, and prints the terms that
    were redacted.

    Returns:
        ``0`` on success. On any exception the error message is printed and
        ``1`` is returned.
    """
    try:
        # Collect the target file and what to redact in it.
        file_type, file_path = get_file_info()
        ent_types, custom_terms, pattern_types = get_entities_to_redact()

        # One redactor per supported file type.
        handlers = dict(
            docx=redact_docx,
            xml=redact_xml,
            json=redact_json,
            pdf=process_pdf,
        )
        redact = handlers[file_type]
        terms = redact(file_path, ent_types, custom_terms, pattern_types)

        print(f"\nProcessed {file_path}")
        print("Redacted terms:", ', '.join(sorted(terms)))
    except Exception as e:
        print(f"\nError: {str(e)}")
        return 1
    else:
        return 0

if __name__ == "__main__":
    # `exit` is the interactive-shell helper installed by `site` and is not
    # meant for use in programs; under IPython/Jupyter, calling it asks the
    # session itself to exit. Run main() directly and raise SystemExit only
    # on failure: a successful notebook run then ends quietly, while a script
    # run still exits non-zero on error.
    exit_status = main()
    if exit_status:
        raise SystemExit(exit_status)


Supported file types: docx, xml, json, pdf
Enter file type to process: xml
Enter file path: data.xml

Available entity types:
CARDINAL, DATE, EVENT, FAC, GPE, LANGUAGE, LAW, LOC, MONEY, NORP, ORDINAL, ORG, PERCENT, PERSON, PRODUCT, QUANTITY, TIME, WORK_OF_ART

Enter entity types to redact (comma-separated) or 'all': person,gpe

Available pattern types:
EMAIL, PHONE, SSN, CREDIT_CARD, AADHAAR, PAN

Enter pattern types to redact (comma-separated) or press Enter to skip: email
Enter additional terms to redact (comma-separated) or press Enter to skip: 

Processed data.xml
Redacted terms: Chicago, Illinois, Oak Avenue, Sarah Johnson
