# Library Imports


In [18]:
import pandas as pd
import numpy as np
import spacy
import re
import threading

In [19]:
# Load the spaCy `en_core_web_sm` pipeline once at module level;
# PII_Logging.keyword_detection calls it on every sentence to get named entities.
nlp = spacy.load("en_core_web_sm")

In [None]:
class PII_Logging:
    """Detect, count and optionally redact PII in a block of log text.

    The text is split into sentences on ". ". Each sentence is scanned twice:
    once with the module-level spaCy pipeline ``nlp`` (named entities) and once
    with a set of regular expressions. Both scans run in their own thread and
    write into the same result containers, so every write goes through
    ``_record``, which holds a lock.

    Attributes:
        PII_PATTERN (dict): category name -> number of values flagged.
        PII_OUTPUT (dict): sentence index -> list of flagged substrings.
        PII_KEYWORD (dict): entity label -> list; a key is created for each
            relevant label seen by the NER scan (the lists stay empty).
        doc_redacted (str | None): redacted text when ``replace`` is True,
            otherwise None.
    """

    # NER entity label -> counter in PII_PATTERN that it feeds.
    _ENTITY_CATEGORY = {
        "DATE": "Dates",
        "PERSON": "Names",
        "GPE": "Addresses",
        "ORG": "Sensitive_Words",
    }

    # Regexes are compiled once for the class instead of on every sentence.
    _PATTERNS = {
        "EMAIL": re.compile(
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
        ),
        "PHONE": re.compile(
            r"\b(?:\+?1[\s.-]?)?(\(?\d{3}\)?[\s.-]?)\d{3}[\s.-]?\d{4}\b"
        ),
        "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
        "Credit_Card": re.compile(r"\b\d{4}-\d{4}-\d{4}-\d{4}\b|\b\d{16}\b"),
        "Expiration_Date": re.compile(r"\b\d{2}/\d{2}\b"),
        "CVV": re.compile(
            r"(?i)(cvv|cvc|cid|security\s+code)[\s:]*['\"]?\d{3,4}['\"]?"
        ),
        "Driver's_License": re.compile(
            r"(?i)\b(?:[A-Z]{1,3}\d{4,8}|[A-Z]\d{6,12}|\d{3}[A-Z]{2}\d{4})\b"
        ),
        "Addresses": re.compile(
            r"(\d{1,5}\s\w+\s\w+)|(P\.O\.\sBox\s\d+)|(\d{5})"
        ),
        "IPv4_Address": re.compile(
            r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}\b"
        ),
        "IPV6_Address": re.compile(
            r"(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|:(:[0-9a-fA-F]{1,4}){1,7}|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|::((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|(::ffff|::)ffff:([0-9]{1,3}\.){3}[0-9]{1,3}"
        ),
    }

    def __init__(
        self, data, output=False, replace=False, log_type="Block", debug=False
    ):
        """Class to detect and redact PII information in a given text.

        Construction immediately runs the full detection / reporting pipeline
        via ``output_function``.

        Args:
            data (str): logs to be processed in the form of a string.
            output (bool, optional): Whether to output the Flagged values in Debug mode. Defaults to False.
            replace (bool, optional): Whether to replace the PII information with "REDACTED". Defaults to False.
            log_type (str, optional): How to handle the PII information. Defaults to "Block".
            debug (bool, optional): Option to print debug statements. Defaults to False.
        """
        self.data = data
        self.output = output
        self.replace = replace
        self.log_type = log_type
        self.debug = debug
        self.PII_KEYWORD = {}
        self.PII_PATTERN = {
            "EMAIL": 0,
            "PHONE": 0,
            "SSN": 0,
            "Credit_Card": 0,
            "Expiration_Date": 0,
            "CVV": 0,
            "Driver's_License": 0,
            "Names": 0,
            "Dates": 0,
            "Addresses": 0,
            "Sensitive_Words": 0,
            "IPV6_Address": 0,
            "IPv4_Address": 0,
        }
        # One bucket per sentence, using the same ". " split as the detectors.
        self.PII_OUTPUT = {row: [] for row in range(len(data.split(". ")))}
        # Always defined so callers can read it even when replace is False.
        self.doc_redacted = None
        # Guards PII_PATTERN / PII_OUTPUT, which both worker threads mutate.
        self._lock = threading.Lock()

        self.output_function(data)

    def _record(self, row, category, value):
        """Count one flagged value and store it under its sentence index.

        Args:
            row (int): Index of the sentence the value was found in.
            category (str): Key of ``PII_PATTERN`` to increment.
            value (str): The flagged substring.
        """
        # ``+= 1`` is a read-modify-write; without the lock the two detector
        # threads can lose updates on categories they share ("Addresses").
        with self._lock:
            self.PII_PATTERN[category] += 1
            self.PII_OUTPUT.setdefault(row, []).append(value)

    def extract_match(self, pattern, text):
        """Helper to extract matched string or return None

        Args:
            pattern (str): The pattern to search for.
            text (str): The text to search in.

        Returns:
            str: The first matched string or None if no match is found.
        """
        match = re.search(pattern, text)
        return match.group() if match else None

    def redaction(self, flags, text):
        """Redacting the PII information

        Every flagged value is treated as literal text (regex metacharacters
        are escaped) and all values are replaced in a single pass, longest
        first, so a short value can never break up a longer one that
        contains it.

        Args:
            flags (dict): The dictionary of flagged values (index -> list of str).
            text (str): The text to redact.

        Returns:
            str: The redacted text.
        """
        words = {word for found in flags.values() for word in found if word}
        if not words:
            return text
        # Longest first: at any position the alternation then prefers the
        # longest flagged value (e.g. "D12345678" over "12345").
        ordered = sorted(words, key=len, reverse=True)
        combined = re.compile("|".join(re.escape(word) for word in ordered))
        return combined.sub("REDACTED", text)

    def keyword_detection(self, data):
        """Using NER to detect Key Information

        Runs the module-level ``nlp`` pipeline on each sentence and records
        entities labelled PERSON, ORG, GPE or DATE. Results are written to
        ``PII_PATTERN`` / ``PII_OUTPUT``; nothing is returned.

        Args:
            data (str): The text to process.
        """
        for row, sentence in enumerate(data.split(". ")):
            for entity in nlp(sentence).ents:
                category = self._ENTITY_CATEGORY.get(entity.label_)
                if category is None:
                    continue
                # Keeps a key per seen label, as the original bookkeeping did.
                self.PII_KEYWORD.setdefault(entity.label_, [])
                self._record(row, category, entity.text)

    def pattern_detection(self, data):
        """Using regex to detect Key Information

        Records every match of every pattern in each sentence (not just the
        first), so repeated PII in one sentence is all flagged. Results are
        written to ``PII_PATTERN`` / ``PII_OUTPUT``; nothing is returned.

        Args:
            data (str | list): The text to process, or an iterable of
                already-split sentences.
        """
        rows = data.split(". ") if isinstance(data, str) else data
        for row, text in enumerate(rows):
            for category, pattern in self._PATTERNS.items():
                for match in pattern.finditer(text):
                    self._record(row, category, match.group())

    def detect_pii(self, data):
        """Uses Multithreading to process the text Calling prior functions.

        Starts the NER scan and the regex scan in separate threads and waits
        for both to finish.

        Args:
            data (str): The text to process.
        """
        workers = [
            threading.Thread(target=self.keyword_detection, args=(data,)),
            threading.Thread(target=self.pattern_detection, args=(data,)),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def output_function(self, data):
        """Output function that handles the output of the PII information.

        Runs detection, optionally builds ``doc_redacted``, prints the debug
        report when requested, and prints one status line chosen by
        ``log_type`` if any PII was found.

        Args:
            data (str): The text to process.
        """
        self.detect_pii(data)

        if self.debug:
            for category, count in self.PII_PATTERN.items():
                if count > 0:
                    print(
                        f"""PII Detected in category: {category}\nNumber Detected: {count}"""
                    )

        if self.replace:
            self.doc_redacted = self.redaction(self.PII_OUTPUT, data)

        # The flagged values are only shown when both output and debug are on.
        if self.output and self.debug:
            if self.replace:
                print(f"\n{self.doc_redacted}")
            print("\nFlagged Sentences:")
            print(self.PII_OUTPUT)

        if any(self.PII_PATTERN.values()):
            messages = {
                "Block": "PII Detected Entry is Not Loggable",
                "Mask": "PII Detected Entry is Redacted",
                "Log": "PII Detected: Message Logged Anyways",
            }
            message = messages.get(self.log_type)
            # Unknown log_type values print nothing, as before.
            if message is not None:
                print(message)

In [169]:
# Short sample inputs: a sentence pair with people/places/organisation/date,
# a customer record (email, DOB, street address, phone), and a payment record
# (card number, expiry, driver's license).
ner_text = "John Smith, a resident of New York, contacted Acme Corp on Jan 1st. His colleague, Maria Garcia from Boston, also sent an inquiry."
customer = "Customer Jane Doe (email: jane.d@email.net, DOB: 1990-05-15) called to report an issue. She lives at 123 Main St, Anytown, USA 12345. Verified identity with SSN ending in 5678. Contact number is 555-123-4567."
transaction = "Transaction record for account ending in 4401. Credit Card: 4111-1111-1111-1111, Exp: 12/26. Driver's License #: D12345678."

# Sample multi-line email (headers, body and signature) containing email
# addresses, names, a date and city names.
fake_email1 = """From: john.smith@examplecorp.com
To: contact@acmecorp.com
Date: January 1, 2025, 10:15 AM EST
Subject: Inquiry Regarding Potential Enterprise Partnership

Dear Acme Corp Team,

My name is John Smith, and I am a Senior Project Manager based in New York. I am writing to express our strong interest in your enterprise-level data solutions. Our team has been following Acme Corp's innovations for some time, and we believe your platform could be a significant asset to our upcoming Q2 initiatives.

I would appreciate it if you could provide some preliminary information on your enterprise licensing model and any available case studies relevant to the logistics sector.

My colleague, Maria Garcia, who is based in our Boston office, will also be sending a separate inquiry with some more technical questions. We are working together on this evaluation project.

Thank you for your time and assistance.

Sincerely,

John Smith
Senior Project Manager
ExampleCorp | New York, NY"""
# Second sample email, a follow-up to fake_email1 from a different sender.
fake_email2 = """From: maria.garcia@examplecorp.com
To: contact@acmecorp.com
Date: January 2, 2025, 11:30 AM EST
Subject: Technical Questions - Following up on John Smith's Inquiry

Dear Acme Corp Team,

I am writing to follow up on the email sent yesterday by my colleague, John Smith. As he mentioned, my name is Maria Garcia, and I am the lead solutions architect for his team.

Building on John's request, I have a few specific technical questions regarding your platform's API integration capabilities:

What are the standard rate limits for API calls on your enterprise plan?

Do you offer support for custom data connectors, specifically for SAP S/4HANA?

Could you provide documentation on your security and data encryption protocols?

We are very excited about the possibility of a partnership. The answers to these questions will be crucial for our internal technical assessment.

Thank you,

Maria Garcia
Lead Solutions Architect
ExampleCorp | Boston, MA"""
# JSON-style log entry kept as a plain string (it is never parsed here);
# it embeds an email address, an IPv4 address, a phone number and an SSN.
dictionary_format = """{
  "timestamp": "2024-05-21T10:00:00Z",
  "level": "INFO",
  "message": "User login successful for user_email: test@example.com",
  "details": {
    "ip_address": "192.168.1.1",
    "user_agent": "Mozilla/5.0",
    "contact": "For support, call 555-123-4567.",
    "ssn_ref": "User SSN is 987-65-4321 for verification."
  }
}"""

# Sample email in which the personal details are already replaced by
# "[REDACTED ...]" placeholders.
Clean_Example1 = """From: [REDACTED EMAIL]
To: [REDACTED EMAIL]
Subject: Update on Ticket #[REDACTED ID]

Dear Support Team,

I am writing to follow up on a support ticket I filed on [REDACTED DATE].

My name is [REDACTED NAME], and my account number is [REDACTED ID]. The issue was regarding a delivery that was supposed to be sent to my address at [REDACTED ADDRESS].

A support agent mentioned that the technical team was looking into an issue and would call me back at [REDACTED PHONE], but I have not yet received a call. The case was logged from my home computer, which has the IP address [REDACTED IP].

Could you please provide an update on the status of my ticket?

Thank you,

[REDACTED NAME]"""

In [194]:
# Run the detector on the JSON-style sample. Constructing the object runs the
# whole pipeline. With debug=False the per-category counts, redacted text and
# flagged values are not printed (output_function prints those only in debug
# mode), so the only output is the log_type="Log" status line.
_ = PII_Logging(
    dictionary_format, output=True, replace=True, debug=False, log_type="Log"
)

PII Detected: Message Logged Anyways
