In [1]:
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
from presidio_analyzer import Pattern, PatternRecognizer

In [2]:



class PIIAgent:
    """
    Mask and unmask Personally Identifiable Information (PII) in text such as an email thread.

    The agent wraps a ``PresidioReversibleAnonymizer``. Email addresses, locations,
    person names, date/times and IBAN codes are handled through the analyzer's
    built-in entities; credit card numbers, account numbers, CIF numbers, UAE phone
    numbers and Emirates IDs are handled through custom regex recognizers that are
    registered on construction.

    Every instance owns its own, freshly created anonymizer.
    """

    def __init__(self):
        # Entities detected by the analyzer's built-in recognizers.
        builtin_fields = [
            "IBAN_CODE",
            "EMAIL_ADDRESS",
            "PERSON",
            "LOCATION",
            "DATE_TIME",
        ]
        self.anonymizer = PresidioReversibleAnonymizer(
            analyzed_fields=builtin_fields,
            add_default_faker_operators=False,
        )
        # Custom, regex-based entities are layered on top of the built-in ones.
        self._add_recognizers()

    def _add_recognizers(self):
        """Register one regex-based recognizer per custom entity on the anonymizer."""
        # (entity, pattern name, regex) -- listed in registration order.
        custom_entities = (
            # Exactly 16 consecutive digits.
            ("CREDIT_CARD_NUMBER", "credit_card_number_pattern", r"\b\d{16}\b"),
            # "120" followed by exactly 8 digits (11 digits in total).
            # NOTE(review): the 10-digit sample "1201234567" used elsewhere in this
            # file does not match this pattern -- confirm the intended length.
            ("ACCOUNT_NUMBER", "account_number_pattern", r"\b120\d{8}\b"),
            # Any standalone 6-digit number.
            ("CIF_NUMBER", "cif_number_pattern", r"\b\d{6}\b"),
            # +971 / 00971 / 971 prefix, then 5X XXX XXXX with optional space/hyphen separators.
            (
                "UAE_PHONE_NUMBER",
                "phone_number_pattern",
                r"(?:\+971|00971|971)[\s\-]?5[\s\-]?\d{1}[\s\-]?\d{3}[\s\-]?\d{4}",
            ),
            # 784-YYYY-NNNNNNN-C with optional hyphens.
            ("EMIRATES_ID", "emirates_id_pattern", r"784-?\d{4}-?\d{7}-?\d"),
        )

        for entity, pattern_name, expression in custom_entities:
            recognizer = PatternRecognizer(
                supported_entity=entity,
                patterns=[Pattern(name=pattern_name, regex=expression, score=1)],
            )
            self.anonymizer.add_recognizer(recognizer)

    def mask(self, text):
        """Return ``text`` as anonymized by the underlying anonymizer."""
        masked = self.anonymizer.anonymize(text)
        return masked

    def unmask(self, anonymized_text):
        """Return ``anonymized_text`` as deanonymized by the underlying anonymizer."""
        restored = self.anonymizer.deanonymize(anonymized_text)
        return restored


In [None]:
import hashlib
import os
from typing import Dict, List, Optional, Tuple

from dotenv import load_dotenv
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_google_genai import ChatGoogleGenerativeAI

In [None]:


class RAGWithPIIApplication:
    """
    A RAG application that handles PII data safely by anonymizing content before storage
    and deanonymizing it during retrieval, using Google's Gemini API for generation.

    Document text is masked with ``PIIAgent`` before it is added to the Chroma vector
    store, queries are masked before they reach the retrieval chain, and the generated
    answer is unmasked before it is returned.

    The Gemini API key is deliberately not embedded in source. It is expected to be
    available through the ``GOOGLE_API_KEY`` environment variable (for example loaded
    from a ``.env`` file with ``load_dotenv()``) before the class is instantiated.
    Any key that was previously committed in this file should be revoked.
    """

    def __init__(
        self,
        persist_directory: str = "./chroma_db",
        model_name: str = "gemini-pro",
        temperature: float = 0.1,
        top_p: float = 0.8,
        top_k: int = 40,
        max_output_tokens: int = 2048,
    ):
        """
        Initialize the RAG application with PII handling capabilities.

        Args:
            persist_directory (str): Directory to persist the vector store
            model_name (str): Gemini model name to use
            temperature (float): Sampling temperature for generation
            top_p (float): Nucleus sampling parameter
            top_k (int): Top-k sampling parameter
            max_output_tokens (int): Maximum number of tokens in the generated response
        """
        self.pii_agent = PIIAgent()
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # No api_key argument on purpose: the client picks the key up from the
        # GOOGLE_API_KEY environment variable, so no secret lives in the source.
        self.llm = ChatGoogleGenerativeAI(
            model=model_name,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            max_output_tokens=max_output_tokens,
            convert_system_message_to_human=True,
        )

        self.persist_directory = persist_directory
        self.vector_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=self.embeddings,
        )

        # doc_id -> (original text, anonymized text). Held in memory only.
        self.text_mappings: Dict[str, Tuple[str, str]] = {}

        self.qa_prompt = PromptTemplate(
            input_variables=["context", "question"],
            template="""
            Context: {context}

            Question: {question}

            Please provide a clear and concise answer based on the context above. If the information isn't available in the context, please say so.

            Answer:"""
        )

    @staticmethod
    def _doc_id(content: str) -> str:
        """Return a stable identifier for ``content`` (SHA-256 hex digest of its UTF-8 bytes).

        The built-in ``hash()`` is salted per interpreter process for strings, so it
        cannot be used for ids that end up in a persisted vector store.
        """
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def _anonymize_document(self, content: str, metadata: Dict) -> Document:
        """Mask ``content``, record the mapping, and wrap the masked text in a ``Document``."""
        anonymized_text = self.pii_agent.mask(content)
        doc_id = self._doc_id(content)
        self.text_mappings[doc_id] = (content, anonymized_text)
        return Document(
            page_content=anonymized_text,
            metadata={**metadata, "doc_id": doc_id},
        )

    def _prompt_for(self, system_prompt: Optional[str]) -> PromptTemplate:
        """Return the QA prompt, prefixed with ``system_prompt`` when one is given."""
        if not system_prompt:
            return self.qa_prompt
        # Escape braces so literal "{" / "}" in the system prompt are not treated
        # as template variables by the f-string style PromptTemplate.
        escaped = system_prompt.replace("{", "{{").replace("}", "}}")
        return PromptTemplate(
            template=f"{escaped}\n\n{self.qa_prompt.template}",
            input_variables=list(self.qa_prompt.input_variables),
        )

    def process_documents(self, documents_dir: str, batch_size: int = 100) -> None:
        """
        Process documents from a directory, anonymize PII, and store in vector database.

        Args:
            documents_dir (str): Directory searched recursively for ``*.txt`` files
            batch_size (int): Number of chunks anonymized and stored per batch

        Raises:
            ValueError: If ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        loader = DirectoryLoader(
            documents_dir,
            glob="**/*.txt",
            loader_cls=TextLoader,
        )
        documents = loader.load()

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        splits = text_splitter.split_documents(documents)

        for start in range(0, len(splits), batch_size):
            self._process_batch(splits[start:start + batch_size])

        self.vector_store.persist()

    def _process_batch(self, documents: List[Document]) -> None:
        """Process a batch of documents by anonymizing PII and storing in vector store."""
        anonymized_docs = [
            self._anonymize_document(doc.page_content, doc.metadata)
            for doc in documents
        ]
        if not anonymized_docs:
            # Nothing to store; avoid handing an empty list to the vector store.
            return
        self.vector_store.add_documents(anonymized_docs)

    def query(
        self,
        query: str,
        k: int = 4,
        system_prompt: Optional[str] = None,
    ) -> str:
        """
        Query the RAG system with automatic PII handling using Gemini.

        Args:
            query (str): User query
            k (int): Number of relevant documents to retrieve
            system_prompt (str, optional): Instructions placed in front of the QA
                prompt for this call only. It is sent as-is (not anonymized).

        Returns:
            str: Generated response with deanonymized content
        """
        anonymized_query = self.pii_agent.mask(query)

        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": k}
            ),
            chain_type_kwargs={
                "prompt": self._prompt_for(system_prompt),
                "verbose": True,
            },
        )

        response = qa_chain.run(anonymized_query)

        return self.pii_agent.unmask(response)

    def add_single_document(self, content: str, metadata: Optional[Dict] = None) -> None:
        """
        Add a single document to the RAG system.

        Args:
            content (str): Raw document text; it is anonymized before storage
            metadata (Dict, optional): Extra metadata stored next to the document
        """
        doc = self._anonymize_document(content, metadata or {})
        self.vector_store.add_documents([doc])
        self.vector_store.persist()

    def set_custom_prompt(self, template: str, input_variables: Optional[List[str]] = None) -> None:
        """
        Set a custom prompt template for the QA chain.

        Args:
            template (str): The prompt template string
            input_variables (List[str], optional): List of input variables in the template.
                          Defaults to ["context", "question"]
        """
        if input_variables is None:
            input_variables = ["context", "question"]

        self.qa_prompt = PromptTemplate(
            template=template,
            input_variables=input_variables,
        )


# Demo: index one sample e-mail, install a custom prompt, and ask one question.
if __name__ == "__main__":
    load_dotenv()  # load variables from a .env file into the process environment
    
    
    # Build the application with its default settings.
    rag_app = RAGWithPIIApplication()
    
    # NOTE(review): this sample looks like a real e-mail header (names and
    # addresses) -- consider replacing it with synthetic data.
    sample_doc = """
   From: Sanjay Wariyar <sanjay.wariyar@silco.ae<mailto:sanjay.wariyar@silco.ae>>
Date: Thursday, 6 June 2024 at 12:05 PM
To: "Afroze.Naseem" <Afroze.Naseem@nbf.ae<mailto:Afroze.Naseem@nbf.ae>>
Cc: Anupam Paul <anupam.paul@silco.ae<mailto:anupam.paul@silco.ae>>, Saiu George <saju@silco.ae<mailto:saju@silco.ae>>
Subject: Re: Secure Email Message



Dear Afroze



Attached Invoice for which payment US$ 262,845 has be remitted .







Best Regards




    """
    
    
    # The document is anonymized inside add_single_document before it is stored.
    rag_app.add_single_document(sample_doc)
    
    
    # Replace the default QA prompt; {context} and {question} are the template variables.
    custom_prompt = """
    Context: {context}
    
    Question: {question}
    
    Please provide a professional and concise response based on the given context.
    Focus only on the relevant information and maintain a helpful tone.
    
    Answer:
    """
    rag_app.set_custom_prompt(custom_prompt)
    
    
    # Ask a question about the indexed document and print the answer.
    query = "What is payment amount for the attached invoice?"
    response = rag_app.query(query)
    print(f"Query: {query}")
    print(f"Response: {response}")





[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3m
    Context: 
   From: <PERSON> <<EMAIL_ADDRESS><mailto:<EMAIL_ADDRESS>>>
Date: <DATE_TIME>, <DATE_TIME_2> at <DATE_TIME_3>
To: "<PERSON_2>.<PERSON_3>" <<PERSON_2>.<PERSON_3>@nbf.ae<mailto:<PERSON_2>.<PERSON_3>@nbf.ae>>
Cc: <PERSON_4> <<EMAIL_ADDRESS_3><mailto:<EMAIL_ADDRESS_3>>>, <PERSON_5>>>
Subject: Re: Secure Email Message



Dear <PERSON_2>



Attached Invoice for which payment US$ 262,845 has be remitted .







Best Regards




    


    Customer Support Ticket #12345
    Name: <PERSON>Email: <EMAIL_ADDRESS>
    Phone: <UAE_PHONE_NUMBER>
    Emirates ID: <EMIRATES_ID>
    Account: 1201234567
    
    Customer reported issues with recent transactions on their credit card <CREDIT_CARD_NUMBER>.
    Follow-up required by <DATE_TIME>.
    


    Customer Support Ticket #12345
    Name: <PERSON>Email: <EMAIL_ADDRESS>
    Phone: <UAE_PHONE_NUMBER>
  




[1m> Finished chain.[0m

[1m> Finished chain.[0m
Query: What is payment amount for the attached invoice?
Response: The payment amount for the attached invoice is US$ 262,845.


In [None]:
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
from presidio_analyzer import Pattern, PatternRecognizer
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain

class PIIAgent:
    """
    Anonymize and deanonymize PII in free text.

    Wraps a ``PresidioReversibleAnonymizer`` configured for the built-in IBAN, email,
    person, location and date/time entities, and extends it with regex recognizers
    for credit card numbers, account numbers, CIF numbers, UAE phone numbers and
    Emirates IDs.
    """

    def __init__(self):
        analyzed = ["IBAN_CODE", "EMAIL_ADDRESS", "PERSON", "LOCATION", "DATE_TIME"]
        self.anonymizer = PresidioReversibleAnonymizer(
            analyzed_fields=analyzed,
            add_default_faker_operators=False,
        )
        self._add_recognizers()

    def _add_recognizers(self):
        """Attach the custom regex recognizers to the anonymizer, in a fixed order."""
        # Each entry: supported entity, pattern name, regular expression.
        specs = [
            ("CREDIT_CARD_NUMBER", "credit_card_number_pattern", r"\b\d{16}\b"),
            # NOTE(review): requires 11 digits ("120" + 8); the 10-digit sample
            # account used in this file would not match -- confirm the format.
            ("ACCOUNT_NUMBER", "account_number_pattern", r"\b120\d{8}\b"),
            ("CIF_NUMBER", "cif_number_pattern", r"\b\d{6}\b"),
            (
                "UAE_PHONE_NUMBER",
                "phone_number_pattern",
                r"(?:\+971|00971|971)[\s\-]?5[\s\-]?\d{1}[\s\-]?\d{3}[\s\-]?\d{4}",
            ),
            ("EMIRATES_ID", "emirates_id_pattern", r"784-?\d{4}-?\d{7}-?\d"),
        ]

        for entity_name, pattern_name, pattern_regex in specs:
            pattern = Pattern(name=pattern_name, regex=pattern_regex, score=1)
            self.anonymizer.add_recognizer(
                PatternRecognizer(supported_entity=entity_name, patterns=[pattern])
            )

    def mask(self, text):
        """Anonymize ``text`` and return the result."""
        anonymized = self.anonymizer.anonymize(text)
        return anonymized

    def unmask(self, anonymized_text):
        """Deanonymize ``anonymized_text`` and return the result."""
        original = self.anonymizer.deanonymize(anonymized_text)
        return original

class RAGPIIChatBot:
    """
    Conversational retrieval chatbot that masks PII in the user's question before it
    is sent to the LLM chain and unmasks the chain's answer before returning it.

    The Gemini API key is deliberately not embedded in source; it is expected in the
    ``GOOGLE_API_KEY`` environment variable before the class is instantiated. Any
    key that was previously committed in this file should be revoked.
    """

    def __init__(self):
        # Initialize LLM with Google Gemini. No api_key argument on purpose: the
        # client reads the key from the GOOGLE_API_KEY environment variable.
        self.llm = ChatGoogleGenerativeAI(model="gemini-pro")

        # Initialize open-source embeddings (e.g., SentenceTransformer)
        self.embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

        # Load or create FAISS index for vector storage
        try:
            self.vectorstore = FAISS.load_local("faiss_index", embeddings=self.embeddings)
        except Exception:
            # Best-effort fallback when the saved index cannot be loaded.
            # NOTE(review): building a FAISS store from an empty document list
            # likely fails because there is no embedding to size the index from --
            # confirm, and seed real documents here if so.
            # NOTE(review): the broad except also hides unrelated load errors;
            # narrow it once the exception raised for a missing index is known.
            documents = []  # Initialize an empty list if no documents are preloaded
            self.vectorstore = FAISS.from_documents(documents, self.embeddings)
            self.vectorstore.save_local("faiss_index")  # Save the index for future use

        # Initialize retriever
        self.retriever = self.vectorstore.as_retriever()

        # Initialize conversation chain with the LLM and retriever
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm, retriever=self.retriever
        )

        # PII Agent for masking and unmasking PII data
        self.pii_agent = PIIAgent()

    def run(self, query, chat_history=None):
        """
        Answer ``query`` through the retrieval chain with PII masking.

        Args:
            query: The user's question; it is masked before it reaches the chain.
            chat_history: Prior conversation turns forwarded to the chain unchanged.
                Defaults to a fresh empty list on every call.

        Returns:
            The chain's ``"answer"`` value after unmasking.
        """
        # A new list per call: a mutable default would be shared across calls.
        if chat_history is None:
            chat_history = []

        masked_query = self.pii_agent.mask(query)
        response = self.qa_chain({"question": masked_query, "chat_history": chat_history})
        return self.pii_agent.unmask(response["answer"])

# Example usage
if __name__ == "__main__":
    rag_bot = RAGPIIChatBot()
    user_query = "What is the status of the payment for the account 1201234567?"
    response = rag_bot.run(user_query)
    print("Response:", response)
