In [8]:
import ollama
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import pdfplumber
import pytesseract
from PIL import Image
from transformers import LlamaTokenizer
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision
from langchain_ollama import OllamaLLM
from langchain.embeddings import HuggingFaceEmbeddings  # Local embeddings
from typing import List, Dict, Union, Tuple
import logging


In [9]:
# 1. PDF Processing and Chunking with OCR
class DataPrivacyProcessor:
    def __init__(self):
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.tokenizer = LlamaTokenizer.from_pretrained("meta-llama/Llama-2-7b")
        
    def extract_text_from_pdfs(self, pdf_paths: List[str]) -> List[str]:
        all_chunks = []
        for pdf_path in pdf_paths:
            try:
                with pdfplumber.open(pdf_path) as pdf:
                    for page in pdf.pages:
                        text = page.extract_text()
                        if text and len(text.strip()) > 10:
                            chunks = [chunk.strip() for chunk in text.split('\n\n') if len(chunk) > 50]
                            all_chunks.extend(chunks)
                        else:
                            img = page.to_image(resolution=300).original
                            text = pytesseract.image_to_string(img)
                            chunks = [chunk.strip() for chunk in text.split('\n\n') if len(chunk) > 50]
                            all_chunks.extend(chunks)
            except Exception as e:
                print(f"Error processing {pdf_path}: {e}")
        return all_chunks

    def vectorize_chunks(self, chunks: List[str]) -> np.ndarray:
        try:
            embeddings = self.embedder.encode(chunks, show_progress_bar=True)
            return embeddings
        except Exception as e:
            raise ValueError(f"Vectorization failed: {e}")

In [10]:
class PrivacyRetriever:
    def __init__(self, embeddings: np.ndarray, chunks: List[str], embedder: SentenceTransformer):
        self.chunks = chunks
        self.embedder = embedder
        # Normalize embeddings for cosine similarity
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        self.index = faiss.IndexFlatIP(embeddings.shape[1])  # Inner product for cosine similarity
        self.index.add(embeddings.astype(np.float32))  # FAISS requires float32
        
    def retrieve(self, query: str, top_k: int = 5, min_similarity: float = 0.5) -> List[Tuple[str, float]]:
        """
        Retrieve relevant chunks with similarity scores
        
        Args:
            query: Search query string
            top_k: Number of results to return
            min_similarity: Minimum similarity score (0-1)
            
        Returns:
            List of tuples containing (chunk_text, similarity_score)
            
        Raises:
            ValueError: If query encoding fails
            RuntimeError: If retrieval fails
        """
        try:
            # Encode and normalize query
            query_embedding = self.embedder.encode([query], show_progress_bar=False)
            query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
            query_embedding = query_embedding.astype(np.float32)
            
            # Search index
            similarities, indices = self.index.search(query_embedding, top_k)
            
            # Process results
            results = []
            for sim, idx in zip(similarities[0], indices[0]):
                if sim >= min_similarity and 0 <= idx < len(self.chunks):
                    results.append((self.chunks[idx], float(sim)))
            
            return results
            
        except ValueError as ve:
            raise ValueError(f"Query encoding failed: {ve}")
        except IndexError as ie:
            raise RuntimeError(f"FAISS index error: {ie}")
        except Exception as e:
            raise RuntimeError(f"Retrieval failed unexpectedly: {e}")
        


In [11]:
# 3. RAG with LLaMA3:8B (Updated to handle tuples)
class PrivacyRAG:
    def __init__(self, retriever: PrivacyRetriever, model_name='llama3:8b', max_tokens=4096):
        self.retriever = retriever
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.tokenizer = LlamaTokenizer.from_pretrained("meta-llama/Llama-2-7b")
        self.system_prompt = """You are a data privacy expert. Using the provided context from data privacy laws and guidelines, answer the query accurately and concisely in formal legal language."""
    
    def _truncate_context(self, context: str, query: str) -> str:
        prompt_tokens = len(self.tokenizer.encode(self.system_prompt))
        query_tokens = len(self.tokenizer.encode(query))
        available_tokens = self.max_tokens - prompt_tokens - query_tokens - 100
        context_tokens = self.tokenizer.encode(context)
        if len(context_tokens) > available_tokens:
            context_tokens = context_tokens[:available_tokens]
            return self.tokenizer.decode(context_tokens)
        return context
    
    def generate_response(self, query: str) -> str:
        try:
            context_chunks_with_scores = self.retriever.retrieve(query)
            # Extract only the chunks (first element of each tuple)
            context_chunks = [chunk for chunk, _ in context_chunks_with_scores]
            context = "\n\n".join(context_chunks)
            context = self._truncate_context(context, query)
            full_prompt = f"{self.system_prompt}\n\nContext:\n{context}\n\nQuery:\n{query}"
            response = ollama.generate(model=self.model_name, prompt=full_prompt)
            return response['response']
        except Exception as e:
            raise RuntimeError(f"Generation failed: {e}")

In [12]:
# 4. RAGAS Evaluation with Ollama
def evaluate_rag(rag: PrivacyRAG, test_data: List[dict]) -> dict:
    """Evaluate RAG system using RAGAS with Ollama and local embeddings."""
    # Set up Ollama as the evaluation LLM
    ollama_llm = OllamaLLM(model="llama3:8b")
    
    # Set up local embeddings
    local_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

    # Prepare dataset
    questions = [item['question'] for item in test_data]
    ground_truths = [item['ground_truth'] for item in test_data]
    contexts = []
    answers = []
    
    for q in questions:
        context_chunks = rag.retriever.retrieve(q)
        contexts.append(context_chunks)
        answers.append(rag.generate_response(q))
    
    dataset = {
        "question": questions,
        "answer": answers,
        "contexts": contexts,
        "ground_truth": ground_truths
    }
    
    # Evaluate with Ollama LLM and local embeddings
    result = evaluate(
        dataset=dataset,
        metrics=[faithfulness, answer_relevancy, context_precision],
        llm=ollama_llm,
        embeddings=local_embeddings  # Override default OpenAI embeddings
    )
    return result

In [None]:
# Example Usage
if __name__ == "__main__":
    processor = DataPrivacyProcessor()
    pdf_paths = ["./data-privacy-pdf/RBI-Guidelines.pdf"]
    chunks = processor.extract_text_from_pdfs(pdf_paths)
    embeddings = processor.vectorize_chunks(chunks)
    
    retriever = PrivacyRetriever(embeddings, chunks, processor.embedder)
    rag = PrivacyRAG(retriever)
    
    query = "How to plan an IS Audit as per the RBI guidelines"
    response = rag.generate_response(query)
    print("Response:", response)
    
'''    test_data = [
        {
            "question": "What are the penalties for data privacy violations in the updated IT act?",
            "ground_truth": "Under DPDPA 2023, penalties can include fines up to INR 2 lakhs per instance."
        }
    ]
    evaluation_results = evaluate_rag(rag, test_data)
    print("RAGAS Results:", evaluation_results)
    '''

Batches: 100%|██████████| 6/6 [00:01<00:00,  3.57it/s]


Response: Based on the provided context from data privacy laws and guidelines, here's how to plan an IS Audit as per the RBI guidelines:

**Step 1: Define the Audit Universe**
 Identify the IT resources that are in scope for the audit based on the risk assessment process.

**Step 2: Prepare the Audit Plan**
Document the audit plan in a formal document, approved by the Audit Committee initially and during any subsequent major changes. The plan should include:

* Internal Audit Subject (name of the audit subject)
* Nature of Audit (compliance with legal, regulatory or standards, performance metrics assessment or security configuration testing)
* Schedule (period of audit and its expected duration)
* Scoped Systems (identified IT resources that are in scope based on the risk assessment process)
* System Overview (details of system environment based on the risk assessment process)
* Audit Details (details of risks and controls identified, based on the risk assessment process)
* Nature and 

'    test_data = [\n        {\n            "question": "What are the penalties for data privacy violations in the updated IT act?",\n            "ground_truth": "Under DPDPA 2023, penalties can include fines up to INR 2 lakhs per instance."\n        }\n    ]\n    evaluation_results = evaluate_rag(rag, test_data)\n    print("RAGAS Results:", evaluation_results)\n    '