In [None]:
!pip install chromadb



In [None]:
import json
import chromadb
import logging
from pydantic import BaseModel, Field
from typing import Optional, List
import uuid
from enum import Enum

In [None]:
class DocType(str, Enum):
    """Closed set of document-type labels a chunk can carry.

    The ``str`` mix-in makes each member a string instance equal to its
    lowercase value, so ``DocType("summary")`` resolves to ``DocType.SUMMARY``
    and ``DocType.SUMMARY == "summary"`` is true.
    """
    SUMMARY = "summary"
    PRECEDENT = "precedent"
    DEFINITION = "definition"
    LEGAL_TEXT = "legal_text"

In [None]:
import os
import shutil

# Start from an empty vector store: if a Chroma directory is left over from an
# earlier run, remove it (and everything in it) and say so.
if os.path.exists('./chroma_db'):
    shutil.rmtree('./chroma_db')
    print("Deleted old Chroma DB at ./chroma_db")


In [None]:
# Logger named after the current module (``__main__`` when run as a notebook).
logger = logging.getLogger(__name__)

# Open (or create) the on-disk Chroma store under ./chroma_db and make sure the
# "compliance_rules" collection exists in it.
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection(name="compliance_rules")

class ChunkModel(BaseModel):
    """One retrievable piece of regulatory text together with its metadata.

    Fields are declared as pydantic fields, so instances are created with
    keyword arguments and validated on construction.
    """

    id: str = Field(
        default_factory=lambda: f"chunk_{uuid.uuid4().hex[:8]}",
        description="Auto-generated unique identifier for the chunk"
    )
    content: str = Field(..., description="The main text content of the chunk")
    regulation: str = Field(..., description="Regulation this chunk pertains to")
    jurisdiction: str = Field(..., description="Geographic jurisdiction")
    doc_type: DocType = Field(..., description="Type of document")
    keywords: List[str] = Field(default_factory=list, description="List of keywords for retrieval")
    source: Optional[str] = Field(None, description="Original source of the content")
    # Optional: chunks parsed from raw JSON have no vector yet, while chunks read
    # back from the vector store can carry the one that was stored with them.
    embedding: Optional[List[float]] = Field(
        None, description="Embedding vector of the chunk, when one is available"
    )

    def to_dict(self) -> dict:
        """Return the chunk's descriptive fields as a plain dict.

        ``doc_type`` is emitted as its string value. The embedding vector is
        intentionally not included.
        """
        return {
            "id": self.id,
            "content": self.content,
            "regulation": self.regulation,
            "jurisdiction": self.jurisdiction,
            "doc_type": self.doc_type.value,
            "keywords": self.keywords,
            "source": self.source
        }

    def generateFullText(self) -> str:
        """Render the chunk as one labelled string (the text that gets embedded).

        Regulation, jurisdiction, document type and content always appear;
        keywords and source are appended only when set; the id comes last.
        The parts are joined with ". ".
        """
        lines = [
            f"REGULATION: {self.regulation}",
            f"JURISDICTION: {self.jurisdiction}",
            f"DOCUMENT TYPE: {self.doc_type.value}",
            f"CONTENT: {self.content}"
        ]
        if self.keywords:
            keywords_str = ", ".join(self.keywords)
            lines.append(f"KEYWORDS: {keywords_str}")
        if self.source:
            lines.append(f"SOURCE: {self.source}")
        lines.append(f"ID: {self.id}")
        return ". ".join(lines)

    @classmethod
    def from_json_dict(cls, json_data: dict) -> "ChunkModel":
        """Build a chunk from a plain dict.

        Required keys: ``content``, ``regulation``, ``jurisdiction`` and
        ``doc_type`` (a value accepted by ``DocType``). Optional keys: ``id``,
        ``keywords``, ``source`` and ``embedding``. Any other key is ignored.

        Raises:
            KeyError: if a required key is missing.
            ValueError: if ``doc_type`` is not a valid ``DocType`` value, or the
                data fails pydantic validation.
        """
        fields = {
            "content": json_data['content'],
            "regulation": json_data['regulation'],
            "jurisdiction": json_data['jurisdiction'],
            "doc_type": DocType(json_data['doc_type']),
            # Treat an explicit None the same as a missing key.
            "keywords": json_data.get('keywords') or [],
            "source": json_data.get('source'),
            "embedding": json_data.get('embedding'),
        }
        # Only pass an id when one was supplied; otherwise the field's
        # default_factory generates it, keeping the id format in one place.
        chunk_id = json_data.get('id')
        if chunk_id is not None:
            fields["id"] = chunk_id
        return cls(**fields)

In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class EmbeddingGenerator:
    """Wraps a SentenceTransformer model to embed chunks and query strings."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the SentenceTransformer model identified by ``model_name``."""
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)

    def generate_embedding(self, chunk: ChunkModel):
        """Encode the chunk's ``generateFullText()`` rendering and return the result."""
        full_text = chunk.generateFullText()
        return self.model.encode(full_text)

    def generate_embedding_query(self, query: str):
        """Encode a raw query string with the same model and return the result."""
        encoded_query = self.model.encode(query)
        return encoded_query


In [None]:
import json
from typing import List, Dict, Any
from pydantic import BaseModel
from langchain.text_splitter import RecursiveCharacterTextSplitter
class RegulationParser:
    """Turn a JSON knowledge-base file into a flat list of ``ChunkModel`` objects."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """Create the text splitter used for both content and obligations.

        Args:
            chunk_size: forwarded to ``RecursiveCharacterTextSplitter``.
            chunk_overlap: forwarded to ``RecursiveCharacterTextSplitter``.
        """
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def parse(self, file_path: str) -> List[ChunkModel]:
        """Read ``file_path`` and split every document into chunks.

        The file must contain a JSON object; its ``"documents"`` list is
        processed (a missing key yields no chunks). For each document the
        ``content`` text and every entry of ``key_obligations`` are split
        separately. Every chunk inherits the document's regulation,
        jurisdiction, doc_type and ``trigger_keywords``, and records the
        document id as its source.

        Chunk ids are deterministic (``<doc id>_content_<n>`` and
        ``<doc id>_obligation_<i>_<j>``), so parsing the same file twice and
        upserting the result overwrites records instead of duplicating them.

        Raises:
            KeyError: if a document lacks ``id``, ``content``, ``regulation``,
                ``jurisdiction`` or ``doc_type``.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        chunks: List[ChunkModel] = []

        for doc in data.get("documents", []):
            doc_id = doc["id"]

            # Main body of the document.
            for idx, content_chunk in enumerate(self.splitter.split_text(doc["content"])):
                chunks.append(self._make_chunk(doc, f"{doc_id}_content_{idx}", content_chunk))

            # Each obligation is split on its own so pieces never straddle two
            # obligations. `or []` tolerates an explicit null in the JSON.
            for i, obligation in enumerate(doc.get("key_obligations") or []):
                for j, chunk_text in enumerate(self.splitter.split_text(obligation)):
                    chunks.append(
                        self._make_chunk(doc, f"{doc_id}_obligation_{i}_{j}", chunk_text)
                    )

        return chunks

    @staticmethod
    def _make_chunk(doc: dict, chunk_id: str, text: str) -> ChunkModel:
        """Build one chunk holding ``text`` with the metadata copied from ``doc``."""
        return ChunkModel.from_json_dict({
            "id": chunk_id,
            "content": text,
            "regulation": doc["regulation"],
            "jurisdiction": doc["jurisdiction"],
            "doc_type": doc["doc_type"],
            "keywords": doc.get("trigger_keywords") or [],
            "source": doc["id"]
        })


In [None]:
class ChunkManager:
    """Store ``ChunkModel`` objects in a persistent Chroma collection and read them back."""

    def __init__(self, db_path: str = "./chroma_db", collection_name: str = "regulations",
                 embedding_generator: Optional["EmbeddingGenerator"] = None):
        """Open the Chroma store at ``db_path`` and get or create ``collection_name``.

        Args:
            db_path: directory of the persistent Chroma database.
            collection_name: collection to read from and write to.
            embedding_generator: generator used by ``add_chunks``. When omitted,
                one is created on first use and then reused.
        """
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_or_create_collection(name=collection_name)
        self._embedding_generator = embedding_generator

    def _get_embedding_generator(self) -> "EmbeddingGenerator":
        """Return the shared embedding generator, creating it on first use."""
        if self._embedding_generator is None:
            self._embedding_generator = EmbeddingGenerator()
        return self._embedding_generator

    @staticmethod
    def _as_list(vector):
        """Convert an embedding (numpy array or plain sequence) to a Python list."""
        return vector.tolist() if hasattr(vector, "tolist") else list(vector)

    @staticmethod
    def _to_metadata(chunk: ChunkModel) -> dict:
        """Flatten a chunk's metadata into scalar values for Chroma."""
        metadata = {
            "regulation": chunk.regulation,
            "jurisdiction": chunk.jurisdiction,
            # Store the enum *value* ("summary"), not str(member) ("DocType.SUMMARY"),
            # so DocType(...) can parse it again when the chunk is read back.
            "doc_type": chunk.doc_type.value,
            "keywords": ", ".join(chunk.keywords) if chunk.keywords else "",
        }
        # Leave the key out instead of storing None.
        # NOTE(review): some Chroma versions reject None metadata values - confirm.
        if chunk.source is not None:
            metadata["source"] = chunk.source
        return metadata

    @staticmethod
    def _from_metadata(meta) -> dict:
        """Undo ``_to_metadata``: restore the keyword list and a parseable doc_type."""
        restored = dict(meta or {})
        keywords = restored.get("keywords")
        if isinstance(keywords, str):
            # Inverse of the ", ".join(...) used when writing.
            restored["keywords"] = keywords.split(", ") if keywords else []
        doc_type = restored.get("doc_type")
        if isinstance(doc_type, str) and doc_type.startswith("DocType."):
            # Records written by the earlier str(doc_type) form, e.g. "DocType.SUMMARY".
            restored["doc_type"] = DocType[doc_type.split(".", 1)[1]].value
        return restored

    def add_chunks(self, chunks: List[ChunkModel]):
        """Embed and upsert ``chunks`` into the collection, keyed by chunk id.

        Does nothing for an empty list. All chunks are embedded with a single
        generator instance rather than a new one per chunk.
        """
        if not chunks:
            return

        generator = self._get_embedding_generator()
        self.collection.upsert(
            documents=[c.content for c in chunks],
            ids=[c.id for c in chunks],
            embeddings=[self._as_list(generator.generate_embedding(c)) for c in chunks],
            metadatas=[self._to_metadata(c) for c in chunks]
        )

    def query_chunks(self, query_embedding: List[float], k: int = 5):
        """Return up to ``k`` stored chunks nearest to ``query_embedding``.

        Returns:
            ``{"results": [...]}`` where each entry has ``metadata``,
            ``document``, ``distance`` and ``embedding`` (a list, or None when
            the store returned no embeddings). Any error is printed and an
            empty result list is returned (best-effort lookup).
        """
        try:
            results = self.collection.query(
                query_embeddings=[self._as_list(query_embedding)],
                n_results=k,
                include=["documents", "metadatas", "distances", "embeddings"]
            )
            if not results["metadatas"] or not results["metadatas"][0]:
                return {"results": []}

            # Only one query embedding was sent, so index 0 holds its hits.
            metadatas = results["metadatas"][0]
            documents = results["documents"][0]
            distances = results["distances"][0]

            # Compare against None / length rather than truthiness: the value
            # may be a numpy array, whose truth value is ambiguous.
            embeddings = results["embeddings"]
            vectors = embeddings[0] if embeddings is not None and len(embeddings) > 0 else None

            return {
                "results": [
                    {
                        "metadata": metadatas[i],
                        "document": documents[i],
                        "distance": distances[i],
                        "embedding": self._as_list(vectors[i]) if vectors is not None else None
                    }
                    for i in range(len(metadatas))
                ]
            }

        except Exception as e:
            print(f"Error querying chunks: {e}")
            return {"results": []}

    def get_all_chunks(self) -> List[ChunkModel]:
        """Retrieve all chunks from the collection"""
        # Ids are always part of a get() result; "ids" is not a valid `include` entry.
        results = self.collection.get(
            include=["documents", "metadatas", "embeddings"]
        )
        ids = results["ids"]
        documents = results["documents"]
        metadatas = results["metadatas"]
        embeddings = results["embeddings"]

        all_chunks = []
        for index, cid in enumerate(ids):
            embedding = embeddings[index] if embeddings is not None else None
            chunk_data = {
                "id": cid,
                "content": documents[index],
                **self._from_metadata(metadatas[index]),  # spread metadata fields directly
                "embedding": self._as_list(embedding) if embedding is not None else None
            }
            all_chunks.append(ChunkModel.from_json_dict(chunk_data))

        return all_chunks


In [None]:
import numpy as np

def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """Compute cosine similarity between two vectors.

    Args:
        vec1: first 1-D vector (numpy array or any sequence of numbers).
        vec2: second 1-D vector of the same length.

    Returns:
        The cosine of the angle between the vectors as a plain ``float``.
        If either vector has zero length the similarity is undefined; ``0.0``
        is returned instead of dividing by zero and producing NaN.
    """
    a = np.asarray(vec1, dtype=float)
    b = np.asarray(vec2, dtype=float)
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0.0:
        return 0.0
    return float(np.dot(a, b) / denominator)

In [None]:
from typing import List, Optional
import numpy as np

class ChunkComparator:
    """Brute-force similarity search over every chunk held by a ``ChunkManager``."""

    def __init__(self, chunk_manager: ChunkManager,
                 embedding_generator: Optional["EmbeddingGenerator"] = None):
        """Remember the chunk store and the embedding generator to use.

        Args:
            chunk_manager: source of the chunks to search.
            embedding_generator: generator for query (and, if needed, chunk)
                embeddings. A new ``EmbeddingGenerator`` is created when omitted.
        """
        self.chunk_manager = chunk_manager
        self.embedding_generator = (
            embedding_generator if embedding_generator is not None else EmbeddingGenerator()
        )

    def query_by_text(self, query_text: str, top_k: int = 5, metadata_filter: Optional[dict] = None) -> List[ChunkModel]:
        """Embed ``query_text`` and return the ``top_k`` most similar chunks."""
        query_embedding = self.embedding_generator.generate_embedding_query(query_text)
        return self.query_by_embedding(query_embedding, top_k=top_k, metadata_filter=metadata_filter)

    def query_by_embedding(self, query_embedding: List[float], top_k: int = 5, metadata_filter: Optional[dict] = None) -> List[ChunkModel]:
        """Rank all stored chunks by cosine similarity to ``query_embedding``.

        Args:
            query_embedding: query vector.
            top_k: maximum number of chunks to return; values <= 0 return ``[]``.
            metadata_filter: optional ``{attribute: value}`` mapping. A chunk is
                kept only if every named attribute equals the given value.

        Returns:
            Up to ``top_k`` chunks, most similar first.
        """
        if top_k <= 0:
            return []

        candidates = self.chunk_manager.get_all_chunks()
        if metadata_filter:
            candidates = [
                c for c in candidates
                if all(getattr(c, key) == value for key, value in metadata_filter.items())
            ]

        query_vec = np.asarray(query_embedding, dtype=float)
        scored = []
        for chunk in candidates:
            # Use the vector carried by the chunk when there is one; otherwise
            # embed the chunk now. The chunk model may not define an `embedding`
            # attribute at all, hence getattr with a default.
            stored = getattr(chunk, "embedding", None)
            if stored is None:
                stored = self.embedding_generator.generate_embedding(chunk)
            chunk_vec = np.asarray(stored, dtype=float)
            scored.append((cosine_similarity(query_vec, chunk_vec), chunk))

        # Sort on the score only, so chunk objects never have to be compared.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [chunk for _, chunk in scored[:top_k]]


In [None]:
class RAGEngine:
  """Glue between the parser, the vector store and the prompt that is sent to the LLM."""

  def __init__(self, embedding_generator: EmbeddingGenerator, chromaVectorStore: ChunkManager, chunkGenerator: RegulationParser) -> None:
    """Store the collaborators supplied by the caller.

    The passed-in instances are used as given (no second model load or second
    store is created). A default instance is built only for an argument that
    is None.
    """
    self.embedding_generator = embedding_generator if embedding_generator is not None else EmbeddingGenerator()
    self.chromaVectorStore = chromaVectorStore if chromaVectorStore is not None else ChunkManager()
    self.chunkGenerator = chunkGenerator if chunkGenerator is not None else RegulationParser()

  def initialize_database(self, filepath: str):
    """Parse the knowledge-base file at ``filepath`` and add its chunks to the vector store."""
    chunks = self.chunkGenerator.parse(filepath)
    self.chromaVectorStore.add_chunks(chunks)
    print("Chunks added to database...")


  def query_with_context(self, feature_description: str, k: int = 5) -> str:
    """
    Simple RAG query that retrieves relevant compliance context and formats a prompt for LLM.

    Args:
      feature_description: free-text description of the feature to assess.
      k: number of chunks to retrieve from the vector store.

    Returns:
      The prompt text, containing the feature description and the retrieved context.
    """
    query_embedding = self.embedding_generator.generate_embedding_query(feature_description)
    query_result = self.chromaVectorStore.query_chunks(query_embedding, k=k)
    relevant_chunks = query_result["results"]
    context = self._build_context(relevant_chunks)
    prompt = f"""
    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    {feature_description}
    RELEVANT COMPLIANCE CONTEXT:
    {context}
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear reasoning based on the context.
    3. Which specific regulations apply, if any?

    Format your response as:
    Requires Geo Logic: [Yes/No/Maybe]
    Reasoning: [Your reasoning here]
    Related Regulations: [Comma-separated list or None]
    """
    return prompt

  def _build_context(self, chunks: list) -> str:
    """Join the ``document`` text of every retrieved chunk under a numbered header.

    Returns an empty string when ``chunks`` is empty.
    """
    context_lines = []
    for i, chunk in enumerate(chunks):
      context_lines.append(f"--- Chunk {i+1} ---")
      # A stored chunk may have no document text; join() needs strings.
      context_lines.append(chunk['document'] or "")
    # Return only after the loop so that all chunks are included, not just the first.
    return "\n".join(context_lines)

In [None]:
!pip install transformers accelerate sentencepiece huggingface_hub --quiet

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from huggingface_hub import login

# --- 1. Hugging Face login ---

login("Create a hugging Face account")

# --- 2. Load LLaMA 3.2 1B Instruct ---
model_name = "meta-llama/Llama-3.2-1B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    use_auth_token=True,
    device_map="auto",  # or "cpu" if you don't have GPU
)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:

def main():
    """Main function to run the Compliance RAG System.

    Builds the RAG components, loads the knowledge base from
    ``./sample_data/compliance_knowledge_base.json``, then for each sample
    query prints the generated prompt and the model's answer.

    Relies on the notebook-level ``tokenizer`` and ``model`` objects created
    by the model-loading cell; run that cell first.
    """

    # Initialize components
    print("Initializing RAG System Components...")

    embedding_generator = EmbeddingGenerator()
    chroma_vector_store = ChunkManager()
    regulation_parser = RegulationParser()

    # Create RAG engine
    rag_engine = RAGEngine(
        embedding_generator=embedding_generator,
        chromaVectorStore=chroma_vector_store,
        chunkGenerator=regulation_parser
    )

    # Initialize database with compliance knowledge
    print("Loading compliance knowledge base...")
    rag_engine.initialize_database("./sample_data/compliance_knowledge_base.json")
    print("Database initialized successfully!")

    # Test queries
    test_queries = [
        "We need to add a one-click report button for illegal videos",
        "Add autoplay feature to video feed for all users",
        "Create age verification system for new user signups",
        "Implement content download blocking feature",
        "Add parental controls for video viewing"
    ]

    print("\n" + "="*50)
    print("RUNNING COMPLIANCE CHECKS...")
    print("="*50)

    for i, query in enumerate(test_queries, 1):
        print(f"\n{i}. Testing: '{query}'")

        # Generate the prompt with context
        prompt = rag_engine.query_with_context(query)

        print(prompt)
        print(f"   Generated prompt length: {len(prompt)} characters")
        # Count the chunk header markers rather than the bare word "Chunk",
        # which could also occur inside the retrieved text or the query itself.
        print(f"   Context retrieved: {prompt.count('--- Chunk ')} relevant chunks")

        # Prepare input for LLaMA ---
        llama_input = f"{prompt}\nAnswer:"

        # Tokenize
        inputs = tokenizer(llama_input, return_tensors="pt").to(model.device)
        prompt_token_count = inputs["input_ids"].shape[-1]

        # Generate output ---
        outputs = model.generate(
            **inputs,
            max_new_tokens=200,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            # Set explicitly so generate() does not warn on every call.
            pad_token_id=tokenizer.eos_token_id
        )

        # Decode output ---
        # The returned sequence begins with the prompt tokens; decode only the
        # newly generated part so the answer is not buried under an echo of the prompt.
        answer = tokenizer.decode(outputs[0][prompt_token_count:], skip_special_tokens=True)
        print("=== LLaMA Output ===")
        print(answer)


    print("\n" + "="*50)
    print("SYSTEM READY FOR PRODUCTION USE!")
    print("="*50)

    # Interactive mode example
    print("\nTo use interactively, you would:")
    print("1. Call rag_engine.query_with_context('your feature description')")
    print("2. Send the resulting prompt to your LLM (GPT-4, Claude, etc.)")
    print("3. Parse the LLM response for compliance decisions")



if __name__ == "__main__":
    main()

Initializing RAG System Components...
Loading compliance knowledge base...


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Chunks added to database...
Database initialized successfully!

RUNNING COMPLIANCE CHECKS...

1. Testing: 'We need to add a one-click report button for illegal videos'

    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    We need to add a one-click report button for illegal videos
    RELEVANT COMPLIANCE CONTEXT:
    --- Chunk 1 ---
Feature: User content reporting flow. Requirement: The feature must include a one-click reporting mechanism for users to flag illegal content. The feature must also tag reports coming from trusted flagger organizations and prioritize them for review. Technical Implementation: A dedicated backend service 'eu-content-moderation' handles all reports from EU member states. Logic exists to validate the user's geo-location and apply the correct UI strings and processing rules.
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


=== LLaMA Output ===

    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    We need to add a one-click report button for illegal videos
    RELEVANT COMPLIANCE CONTEXT:
    --- Chunk 1 ---
Feature: User content reporting flow. Requirement: The feature must include a one-click reporting mechanism for users to flag illegal content. The feature must also tag reports coming from trusted flagger organizations and prioritize them for review. Technical Implementation: A dedicated backend service 'eu-content-moderation' handles all reports from EU member states. Logic exists to validate the user's geo-location and apply the correct UI strings and processing rules.
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear reasoning based on the context.
    3. Which specific regulations apply, if any?

    Format your response as:
    Requires Geo Logic: [Yes/No/May

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


=== LLaMA Output ===

    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    Add autoplay feature to video feed for all users
    RELEVANT COMPLIANCE CONTEXT:
    --- Chunk 1 ---
Feature: Autoplay for video feed. Requirement: The autoplay feature must be disabled by default for any user identified as a minor or located in California. An age gate or parental consent check must be presented to enable it. Technical Implementation: The client-side app checks a user-age-preference service. If the user's age is under 18 or not verified, and their IP is in California, autoplay is disabled at the firmware level.
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear reasoning based on the context.
    3. Which specific regulations apply, if any?

    Format your response as:
    Requires Geo Logic: [Yes/No/Maybe]
    Reasoning: [Your reasoning here]
    Related R

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


=== LLaMA Output ===

    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    Create age verification system for new user signups
    RELEVANT COMPLIANCE CONTEXT:
    --- Chunk 1 ---
Age Verification: Platforms must use independent, third-party services to verify age.
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear reasoning based on the context.
    3. Which specific regulations apply, if any?

    Format your response as:
    Requires Geo Logic: [Yes/No/Maybe]
    Reasoning: [Your reasoning here]
    Related Regulations: [Comma-separated list or None]
    
Answer: 
    Requires Geo Logic: Yes
    Reasoning: Platforms must use independent, third-party services to verify age. Geo-specific compliance logic is required to ensure age verification is accurate and reliable across different regions, and to prevent age fraud. Independent services help to a

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


=== LLaMA Output ===

    Analyze this feature description for geo-compliance requirements.
    FEATURE DESCRIPTION:
    Implement content download blocking feature
    RELEVANT COMPLIANCE CONTEXT:
    --- Chunk 1 ---
Feature: User content reporting flow. Requirement: The feature must include a one-click reporting mechanism for users to flag illegal content. The feature must also tag reports coming from trusted flagger organizations and prioritize them for review. Technical Implementation: A dedicated backend service 'eu-content-moderation' handles all reports from EU member states. Logic exists to validate the user's geo-location and apply the correct UI strings and processing rules.
    Answer these questions:
    1. Does this feature require geo-specific compliance logic? (Yes/No/Maybe)
    2. Why or why not? Provide clear reasoning based on the context.
    3. Which specific regulations apply, if any?

    Format your response as:
    Requires Geo Logic: [Yes/No/Maybe]
    Reasonin