In [None]:
'''%pip install -U langchain-cohere
%pip install langchain-huggingface
%pip install datasets
%pip install transformers huggingface_hub sentence
%pip install -U sentence-transformers
%pip install sentencepiece
%pip install "transformers[sentencepiece]"

%pip install pdf2image pytesseract'''

In [None]:
import os
#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
#new token 13 jul 2025
# Set your Hugging Face token (keep it secret!)
os.environ["HUGGINGFACE_TOKEN"] = "HF_Mistral"

In [None]:
import logging
import os
from datetime import datetime

# Configure logging
log_dir = "data/logs"
os.makedirs(log_dir, exist_ok=True)  # Create logs directory if it doesn't exist
log_file = os.path.join(log_dir, f"hmr_generator_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(log_file),  # Write logs to file
        logging.StreamHandler()         # Print logs to console
    ]
)
logger = logging.getLogger(__name__)

In [None]:
import torch 

# Device setup
def setup_device():
    if torch.backends.cuda.is_built():
        if torch.cuda.is_available():
            device = torch.device("cuda")
            print("Device set to CUDA (NVIDIA GPU)")
        else:
            device = torch.device("cpu")
            print("CUDA is available but no GPU detected, falling back to CPU")
    else:
        device = torch.device("cpu")
        print("CUDA is not built, using CPU")

    return device
device = setup_device()

In [None]:
#Load Documents
from langchain.document_loaders import PyPDFLoader,PyPDFDirectoryLoader,DirectoryLoader,TextLoader

#dont use pdf part as it will have image,Please use only text pdf 
'''loader_pdf = DirectoryLoader("data/pdf/" , glob= '**/*.pdf' , show_progress=True ,loader_cls=PyPDFLoader)
loader_pdf
pdf = loader_pdf.load()
len(pdf)'''

loader_text = DirectoryLoader("data/text/" , glob= '**/*' , show_progress=True ,loader_cls=TextLoader)
text = loader_text.load()
print("Total numbers of Files",len(text))



In [None]:
from langchain.document_loaders import DirectoryLoader, PyPDFLoader


path="data/pdf"
glob="**/*.pdf"

loader = DirectoryLoader(
    path="data/pdf",            # Replace with your actual folder path
    glob="**/*.pdf",            # Recursive match for PDFs
    loader_cls=PyPDFLoader      # Use PyPDFLoader for each file
)

import os
from langchain.schema import Document
from pdf2image import convert_from_path
from langchain.document_loaders import PyPDFLoader
from PIL import Image
import pytesseract

folder_path ="data/pdf"
def hybrid_pdf_loader(doc):
    print(f"[INFO] Loading: {doc}")
    try:
        for doc in folder_path.glob("*.pdf"):
            loader = PyPDFLoader(doc)
            docs = loader.load()
            if all(doc.page_content.strip() == "" for doc in docs):
                print(f"[OCR Fallback] Using OCR for: {doc}")
                images = convert_from_path(doc)
                ocr_docs = []
                for i, image in enumerate(images):
                    text = pytesseract.image_to_string(image)
                    ocr_docs.append(Document(page_content=text, metadata={"page": i + 1, "source": doc}))
                return ocr_docs
            return docs
    except Exception as e:
        print(f"[ERROR] Failed loading {doc}: {e}")
        return []

# Load all PDFs from folder
def load_all_pdfs_from_folder(filepath):
    all_docs = []
    for filename in os.listdir(folder_path):
        if filename.lower().endswith(".pdf"):
            full_path = os.path.join(folder_path, filename)
            all_docs.extend(hybrid_pdf_loader(full_path))
    return all_docs

# Usage
pdf = load_all_pdfs_from_folder("data/pdf")  # pdf path

# Preview output
for i, doc in enumerate(pdf[:3]):
    print(f"\n[Document {i+1} Preview]:\n{doc.page_content[:300]}")

documents = pdf + text

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# Step 3: Chunking Function
def split_documents(documents: list[Document], chunk_size=300, chunk_overlap=50):
    # Combine all page_content fields into one string
    full_text = "\n".join([doc.page_content for doc in documents if doc.page_content.strip()])

    if not full_text.strip():
        print("[WARNING] No content found in the documents.")
        return []

    # Wrap into a single Document object
    combined_doc = [Document(page_content=full_text)]

    # Initialize the text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=[
            "\nheader-rules\n",     # HMR block separators
            "\nelement-rules\n",
            "\nsip-manipulation\n",
            "\n\n",
            "\n",
            " ",
            "",
            "\nmime-sdp-rulesn",
             "\nmime-isup-rules\n",
             "\nmime-header-rules\n",
              "\nmime-rules\n",
              "\nisup-param-rules\n",
              "\nsdp-session-rules\n",
              "\nsdp-line-rules\n",
              "\nsdp-media-rules\n",

        ]
        #separators=["\n\n", "\n", " ",""]  # Prioritize bigger splits----old code, not used
    )

    # Split into chunks
    chunks = text_splitter.split_documents(combined_doc)
    print(f"[INFO] Total chunks generated: {len(chunks)}")

    # Preview first 3 chunks(just for check)
    for i, chunk in enumerate(chunks[:3]):
        print(f"\n--- Chunk {i+1} ---\n{chunk.page_content[:300]}\n")

    return chunks

# Step 4: Run the chunking
chunks = split_documents(documents)

logger.info(f"Processed {len(documents)} documents")

In [None]:
#Generate Vector Embeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma


# Use a pre-trained sentence embedding model
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Step 2: Ensure chunks are valid Document objects
# (Assuming you already have `chunks_val` from your splitter function)
print(f"[INFO] Chunks to embed: {len(chunks)}")


vector_db = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db"
)
vector_db.persist()


#for testing
'''def retrieve_relevant_documents(query, k=3):
    """
    Retrieve the top k most relevant document chunks based on the query.
    """
    docs = vector_db.similarity_search(query, k=k)
    
    retrieved_texts = [doc.page_content for doc in docs]
    return retrieved_texts

# Example usage:
query = "How to modify the SIP header in Oracle SBC?"
retrieved_docs = retrieve_relevant_documents(query)

print("Top Retrieved Documents:")
for idx, doc in enumerate(retrieved_docs):
    print(f"\nDocument {idx+1}:")
    print(doc)'''


#Perfect till abovesteps,Please refer chatgpt for next steps

In [None]:
# HMR Generation using Ollama
class HMRGenerator:
    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.setup_llm()
        self.setup_examples()
    
    def setup_llm(self):
        """Initialize Ollama LLM"""
        from langchain.llms import Ollama
        try:
            self.llm = Ollama(model="llama3.2")  # Make sure this model is installed
            logger.info("Ollama LLM initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Ollama: {e}")
            self.llm = None
    
    def setup_examples(self):
        """Setup few-shot examples for HMR generation"""
        self.examples = [
            {
                "intent": "Add P-Asserted-Identity using From header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>",
                "hmr": """header-rules
  name                                    addPAI
  header-name                             p-asserted-identity
  action                                  add
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  new-value                               "<sip:"+$From.$From_er.$0+"@telco.com>\""""
            },
            {
                "intent": "Remove Diversion Header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nDiversion: <sip:olduser@domain.com>",
                "hmr": """header-rules
  name                                    removeDiversion
  header-name                             diversion
  action                                  remove
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE"""
            },
            {
                "intent": "Replace domain in From header to newdomain.com",
                "sip_msg": "From: <sip:user@olddomain.com>",
                "hmr": """header-rules
  name                                    replaceFromDomain
  header-name                             from
  action                                  replace
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  match-value                             "olddomain.com"
  new-value                               "newdomain.com\""""
            },
            {
        "intent": "Modify Content-Type in INVITE with SDP",
        "sip_msg": "",
        "hmr": """header-rules
name add_fmtp
header-name Content-Type
action manipulate
comparison-type case-sensitive
msg-type any
methods INVITE
match-value
new-value
element-rules
    name addf
    parameter-name application/sdp
    type mime
    action find-replace-all
    match-val-type any
    comparison-type pattern-rule
    match-value \Rm=audio.*()[[:1:]]
    new-value $CRLF+"a=fmtp:18 annexb=no"
        
"""
    }
        ]

    def create_prompt(self, intent: str, sip_msg: str, context: str = "") -> str:

            """Create a comprehensive prompt for HMR generation"""
            examples_text = "\n\n".join([
                f"Example {i+1}:\nIntent: {ex['intent']}\nSIP Message:\n{ex['sip_msg']}\nHMR:\n{ex['hmr']}"
                for i, ex in enumerate(self.examples)
            ])
            
            prompt = f"""You are an expert in Oracle Session Border Controller (SBC) Header Manipulation Rules (HMR).
    Your task is to generate accurate HMR CLI configuration based on the user's intent and SIP message.

    Context from knowledge base:
    {context}

    Examples:
    {examples_text}

    Now generate HMR for:
    Intent: {intent}
    SIP Message:
    {sip_msg}

    Generate ONLY the Oracle SBC HMR CLI configuration. Follow the exact format shown in examples.

    HMR:"""
            return prompt
    
    def retrieve_context(self, query: str, k: int = 5) -> str:
            """Retrieve relevant context from vector database"""
            if not self.vector_db:
                return ""
            
            try:
                docs = self.vector_db.similarity_search(query, k=k)
                context = "\n\n".join([doc.page_content for doc in docs])
                return context
            except Exception as e:
                logger.error(f"Error retrieving context: {e}")
                return ""
    
    def generate_hmr(self, intent: str, sip_msg: str) -> str:
            """Generate HMR configuration.intent : str - user intent or description of requirement
            sip_msg : str - SIP message content"""
            if not self.llm:
                return "Error: LLM not initialized. Please check Ollama installation."
            
            if not intent.strip():
                return "Error: Please provide an intent/requirement."
            
            try:
                # Retrieve relevant context
                query = f"{intent} {sip_msg}"
                context = self.retrieve_context(query)
                
                # Create prompt
                prompt = self.create_prompt(intent, sip_msg, context)
                
                # Generate response
                response = self.llm(prompt)
                
                # Clean up response
                if "HMR:" in response:
                    hmr_part = response.split("HMR:")[-1].strip()
                else:
                    hmr_part = response.strip()
                
                return hmr_part
                
            except Exception as e:
                logger.error(f"Error generating HMR: {e}")
                return f"Error generating HMR: {str(e)}"
        
hmr_generator = HMRGenerator(vector_db)


In [None]:
def generate_hmr_wrapper(intent: str, sip_msg: str) -> str:
        """Wrapper function for Gradio"""
        if not hmr_generator:
            return "Error: Application not properly initialized"
        hmr_val = hmr_generator.generate_hmr(intent, sip_msg)
        if not hmr_val:
            return "Error: No HMR generated. Please check your input."
        return hmr_val
        

In [None]:
import gradio as gr

with gr.Blocks(theme=gr.themes.Soft(primary_hue="cyan", secondary_hue="gray")) as demo:
    gr.Markdown("# Oracle SBC Header Manipulation Rules (HMR) Generator")
    gr.Markdown("Generate Oracle SBC HMR CLI configurations based on your requirements and SIP messages.")
    
    with gr.Column():
        intent_input = gr.Textbox(label="Intent / Requirement", lines=3, placeholder="e.g., Add P-Asserted-Identity using From header")
        with gr.Row():
            sip_msg_input = gr.Textbox(label="SIP Message", lines=15, placeholder="e.g., INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>")
            output = gr.Textbox(label="Oracle SBC HMR CLI Output", lines=15, interactive=False,show_copy_button=True)
        submit_btn = gr.Button("Generate HMR", variant="primary")

        # Bind the function with the llm object
        submit_btn.click(
            fn=generate_hmr_wrapper,
              # Pass llm explicitly
            inputs=[intent_input, sip_msg_input],
            outputs=output
        )
        
    demo.launch(share=True)

In [None]:
import os
#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
# Set your Hugging Face token (keep it secret!)
os.environ["HUGGINGFACE_TOKEN"] = "HF_Mistral"

In [None]:
# FT dependies
# %pip install peft accelerate bitsandbytes datasets transformers

Collecting peft
  Downloading peft-0.17.0-py3-none-any.whl.metadata (14 kB)
Downloading peft-0.17.0-py3-none-any.whl (503 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m503.9/503.9 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: peft
Successfully installed peft-0.17.0
Note: you may need to restart the kernel to use updated packages.


#RAG all above has good accuracy but below is FT LORA------Testing phase

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer,DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from datasets import Dataset
import json

# 1. Prepare your HMR training dataset
def create_hmr_dataset():
    """Create training dataset from your existing HMR examples"""
    training_data = []
    
    # Convert your examples to instruction-response format

    examples = [
            {
                "intent": "Add P-Asserted-Identity using From header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>",
                "hmr": """header-rules
  name                                    addPAI
  header-name                             p-asserted-identity
  action                                  add
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  new-value                               "<sip:"+$From.$From_er.$0+"@telco.com>\""""
            },
            {
                "intent": "Remove Diversion Header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nDiversion: <sip:olduser@domain.com>",
                "hmr": """header-rules
  name                                    removeDiversion
  header-name                             diversion
  action                                  remove
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE"""
            },
            {
                "intent": "Replace domain in From header to newdomain.com",
                "sip_msg": "From: <sip:user@olddomain.com>",
                "hmr": """header-rules
  name                                    replaceFromDomain
  header-name                             from
  action                                  replace
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  match-value                             "olddomain.com"
  new-value                               "newdomain.com\""""
            },
            {
        "intent": "Modify Content-Type in INVITE with SDP",
        "sip_msg": "",
        "hmr": """header-rules
name add_fmtp
header-name Content-Type
action manipulate
comparison-type case-sensitive
msg-type any
methods INVITE
match-value
new-value
element-rules
    name addf
    parameter-name application/sdp
    type mime
    action find-replace-all
    match-val-type any
    comparison-type pattern-rule
    match-value \Rm=audio.*()[[:1:]]
    new-value $CRLF+"a=fmtp:18 annexb=no"
        
"""
    }
        ]

    # Format for training
    for example in examples:
        prompt = f"""### Instruction:
{example['instruction']}

### Input:
{example['input']}

### Response:
{example['output']}"""
        training_data.append({"text": prompt})
    
    return Dataset.from_list(training_data)

# 2. Setup LoRA configuration
def setup_lora_config():
    return LoraConfig(
        r=16,                    # Rank of adaptation
        lora_alpha=32,           # LoRA scaling parameter
        target_modules=[         # Target modules for LoRA
            "q_proj", "k_proj", "v_proj", "o_proj",
            "gate_proj", "up_proj", "down_proj"
        ],
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM"
    )

# 3. Fine-tuning function
def fine_tune_llama_for_hmr():
    model_name = "meta-llama/Llama-3.2-3B-Instruct"  # Use smaller model for local training
    
    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token
    
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
        device_map="auto",
        load_in_4bit=True  # Use 4-bit quantization
    )
    
    # Prepare model for LoRA
    model = prepare_model_for_kbit_training(model)
    lora_config = setup_lora_config()
    model = get_peft_model(model, lora_config)
    
    # Prepare dataset
    dataset = create_hmr_dataset()
    
    def tokenize_function(examples):
        return tokenizer(
            examples["text"],
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors="pt"
        )
    
    tokenized_dataset = dataset.map(tokenize_function, batched=True)
    
    # Training arguments
    training_args = TrainingArguments(
        output_dir="./hmr-llama-lora",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,
        warmup_steps=100,
        logging_steps=10,
        save_steps=500,
        evaluation_strategy="no",
        save_strategy="steps",
        load_best_model_at_end=False,
        report_to=None,  # Disable wandb
        remove_unused_columns=False,
    )
    
    # Data collator
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False
    )
    
    # Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=data_collator,
    )
    
    # Start training
    trainer.train()
    trainer.save_model()
    
    return model, tokenizer

# Usage
# fine_tuned_model, tokenizer = fine_tune_llama_for_hmr()

### Option B: Full Fine-Tuning (Resource Intensive)
# For full fine-tuning with more computational resources
def full_fine_tune():
    # Similar setup but without LoRA
    # Requires significant GPU memory (24GB+ recommended)
    pass

## 2. Fine-Tuning the Retrieval System

### Improve Chunking Strategy

# Better chunking for HMR-specific content
def advanced_chunking(documents):
    """Improved chunking strategy for HMR documents"""
    
    # Custom splitter for HMR configuration blocks
    hmr_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,  # Increased for complete HMR blocks
        chunk_overlap=100,
        separators=[
            "\nheader-rules\n",     # HMR block separators
            "\nelement-rules\n",
            "\nsip-manipulation\n",
            "\n\n",
            "\n",
            " ",
            "",
            "\nmime-sdp-rulesn",
             "\nmime-isup-rules\n",
             "\nmime-header-rules\n",
              "\nmime-rules\n",
              "\nisup-param-rules\n",
              "\nsdp-session-rules\n",
              "\nsdp-line-rules\n",
              "\nsdp-media-rules\n",

        ]
    )
    
    return hmr_splitter.split_documents(documents)

# Enhanced metadata extraction
def extract_hmr_metadata(chunk_text):
    """Extract structured metadata from HMR chunks"""
    metadata = {}
    
    if "header-rules" in chunk_text:
        metadata["type"] = "header-rule"
    elif "element-rules" in chunk_text:
        metadata["type"] = "element-rule"
    elif "sip-manipulation" in chunk_text:
        metadata["type"] = "sip-manipulation"
    elif "mime-sdp-rules" in chunk_text:
        metadata["type"] = "mime-sdp-rule"
    elif "mime-isup-rules" in chunk_text:
        metadata["type"] = "mime-isup-rule"
    elif "mime-header-rules" in chunk_text:
        metadata["type"] = "mime-header-rule"
    elif "mime-rules" in chunk_text:
        metadata["type"] = "mime-rule"
    elif "isup-param-rules" in chunk_text:
        metadata["type"] = "isup-param-rule"
    elif "sdp-session-rules" in chunk_text:
        metadata["type"] = "sdp-session-rule"
    elif "sdp-line-rules" in chunk_text:
        metadata["type"] = "sdp-line-rule"
    elif "sdp-media-rules" in chunk_text:
        metadata["type"] = "sdp-media-rule"

    # Extract action types
    actions = ["add", "delete", "find-replace-all", "manipulate", "store","log","none","monitor","reject","sip-manip","store","delete-element","delete-header"]
    for action in actions:
        if action in chunk_text.lower():
            metadata["action"] = action
            break
    
    return metadata


### Optimize Embedding Model

# Option 1: Use domain-specific embedding model
def setup_better_embeddings():
    # Try telecommunications-specific models or train your own
    embedding_models = [
        "sentence-transformers/all-MiniLM-L6-v2",           # Current
        "sentence-transformers/all-mpnet-base-v2",          # Better quality
        "sentence-transformers/multi-qa-mpnet-base-dot-v1", # QA-optimized
        "BAAI/bge-small-en-v1.5"                           # Latest SOTA
    ]
    
    # Benchmark different models
    for model_name in embedding_models:
        embeddings = HuggingFaceEmbeddings(model_name=model_name)
        # Test retrieval accuracy with your queries
        
# Option 2: Fine-tune embedding model on your domain
def fine_tune_embeddings():
    """Fine-tune sentence transformer on HMR domain"""
    from sentence_transformers import SentenceTransformer, InputExample, losses
    from torch.utils.data import DataLoader
    
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    
    # Create training examples from your HMR data
    train_examples = []
    # Add positive pairs: intent <-> correct HMR
    # Add negative pairs: intent <-> incorrect HMR
    
    train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
    train_loss = losses.CosineSimilarityLoss(model)
    
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=4,
        warmup_steps=100,
        output_path="./hmr-tuned-embeddings"
    )

## 3. Prompt Engineering Optimization

### Enhanced Prompt Templates

def create_advanced_prompt_template():
    """More sophisticated prompt engineering"""
    
    system_prompt = """You are an Oracle SBC expert specializing in Header Manipulation Rules (HMR). 
    Your responses must be:
    1. Syntactically correct Oracle SBC CLI format
    2. Logically sound for the given SIP scenario
    3. Following Oracle SBC best practices
    4. Include proper error handling when applicable"""
    
    few_shot_template = """
### Context from Knowledge Base:
{context}

### Examples of Correct HMR Generation:
{examples}

### Current Task:
Intent: {intent}
SIP Message: {sip_msg}

### Analysis Steps:
1. Identify the SIP headers/elements that need modification
2. Determine the appropriate HMR action (add/remove/replace/manipulate)
3. Consider message type and method restrictions
4. Apply proper pattern matching and value substitution"""

### Generate Oracle SBC HMR Configuration:

### Dynamic Example Selection

def select_relevant_examples(intent, available_examples, k=3):
    """Dynamically select most relevant examples based on intent"""

    from sentence_transformers import SentenceTransformer
    from sklearn.metrics.pairwise import cosine_similarity

    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    
    # Encode intent and example intents
    intent_embedding = model.encode([intent])
    example_embeddings = model.encode([ex["intent"] for ex in available_examples])
    
    # Calculate similarities
    similarities = cosine_similarity(intent_embedding, example_embeddings)[0]
    
    # Select top k most similar examples
    top_indices = similarities.argsort()[-k:][::-1]
    return [available_examples[i] for i in top_indices]


## 4. Retrieval System Optimization

### Hybrid Search Strategy

def hybrid_retrieval(query, vector_db, k=10):
    """Combine semantic and keyword-based search"""
    
    # Semantic search
    semantic_docs = vector_db.similarity_search(query, k=k//2)
    
    # Keyword search for exact matches
    keywords = extract_hmr_keywords(query)
    keyword_docs = vector_db.similarity_search(
        " ".join(keywords), 
        k=k//2,
        filter={"contains_keywords": True}
    )
    
    # Combine and deduplicate
    all_docs = semantic_docs + keyword_docs
    unique_docs = remove_duplicates(all_docs)
    
    return unique_docs[:k]

def extract_hmr_keywords(query):
    """Extract HMR-specific keywords"""
    hmr_keywords = [
        "header-rules", "element-rules", "sip-manipulation",
        "add", "remove", "replace", "manipulate", "store",
        "from", "to", "contact", "p-asserted-identity", "diversion"
    ]
    
    found_keywords = []
    query_lower = query.lower()
    for keyword in hmr_keywords:
        if keyword in query_lower:
            found_keywords.append(keyword)
    
    return found_keywords


### Re-ranking Retrieved Documents
def rerank_documents(query, retrieved_docs):
    """Re-rank documents based on HMR-specific criteria"""
    
    scores = []
    for doc in retrieved_docs:
        score = 0
        content = doc.page_content.lower()
        
        # Boost documents with complete HMR blocks
        if "header-rules" in content and "name" in content:
            score += 2.0
        
        # Boost documents with similar actions
        if any(action in query.lower() and action in content 
               for action in ["add", "remove", "replace", "modify"]):
            score += 1.5
        
        # Boost documents with similar headers
        headers = ["from", "to", "contact", "p-asserted-identity", "diversion"]
        if any(header in query.lower() and header in content 
               for header in headers):
            score += 1.0
        
        scores.append(score)
    
    # Sort by score
    ranked_docs = [doc for _, doc in sorted(zip(scores, retrieved_docs), reverse=True)]
    return ranked_docs


## 5. Training Data Enhancement
### Automated Data Augmentation
def augment_training_data():
    """Generate more training examples from existing data"""
    
    # Extract patterns from existing HMR files
    hmr_patterns = extract_hmr_patterns("data/text/")
    
    # Generate variations
    augmented_examples = []
    
    for pattern in hmr_patterns:
        # Vary header names
        headers = ["from", "to", "contact", "p-asserted-identity", "diversion"]
        for header in headers:
            if header != pattern["original_header"]:
                new_example = create_variation(pattern, new_header=header)
                augmented_examples.append(new_example)
        
        # Vary actions
        actions = ["add", "remove", "replace", "manipulate"]
        for action in actions:
            if action != pattern["original_action"]:
                new_example = create_variation(pattern, new_action=action)
                augmented_examples.append(new_example)
    
    return augmented_examples

def extract_hmr_patterns(data_dir):
    """Extract reusable patterns from existing HMR files"""
    patterns = []
    
    for file_path in Path(data_dir).glob("*.txt"):
        content = file_path.read_text()
        
        # Parse HMR blocks
        if "header-rules" in content:
            pattern = parse_hmr_block(content)
            patterns.append(pattern)
    
    return patterns


### Validation Dataset Creation

def create_validation_dataset():
    """Create test cases for evaluating HMR generation quality"""
    
    validation_cases = [
        {
            "intent": "Block anonymous calls",
            "sip_msg": "From: \"Anonymous\" <sip:anonymous@anonymous.invalid>",
            "expected_hmr_elements": ["comparison-type", "match-value", "anonymous"]
        },
        {
            "intent": "Add emergency routing",
            "sip_msg": "INVITE sip:911@emergency.com SIP/2.0",
            "expected_hmr_elements": ["911", "emergency", "add"]
        }
    ]
    
    return validation_cases

## 6. Model Performance Optimization

### Quantization for Faster Inference

def setup_quantized_model():
    """Setup 4-bit quantized model for faster inference"""
    from transformers import BitsAndBytesConfig
    
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True
    )
    
    model = AutoModelForCausalLM.from_pretrained(
        "meta-llama/Llama-3.2-3B-Instruct",
        quantization_config=quantization_config,
        device_map="auto"
    )
    
    return model


### Inference Optimization

def optimize_generation_params():
    """Optimized generation parameters for HMR"""
    
    generation_config = {
        "max_new_tokens": 300,
        "temperature": 0.1,        # Low temperature for consistent output
        "top_p": 0.9,
        "top_k": 50,
        "repetition_penalty": 1.1,
        "do_sample": True,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id
    }
    
    return generation_config


## 7. RAG System Fine-Tuning

### Improved Context Selection

def context_aware_retrieval(intent, sip_msg, vector_db):
    """Enhanced retrieval with context awareness"""
    
    # Multi-query retrieval
    queries = [
        intent,
        f"SIP {intent}",
        f"Oracle SBC {intent}",
        extract_sip_method(sip_msg) + " " + intent,
        extract_headers(sip_msg) + " manipulation"
    ]
    
    all_docs = []
    for query in queries:
        docs = vector_db.similarity_search(query, k=3)
        all_docs.extend(docs)
    
    # Remove duplicates and re-rank
    unique_docs = remove_duplicates(all_docs)
    reranked_docs = rerank_documents(intent, unique_docs)
    
    return reranked_docs[:5]  # Top 5 most relevant

def extract_sip_method(sip_msg):
    """Extract SIP method from message"""
    methods = ["INVITE", "REGISTER", "BYE", "CANCEL", "ACK", "OPTIONS"]
    for method in methods:
        if method in sip_msg:
            return method
    return "INVITE"  # Default

def extract_headers(sip_msg):
    """Extract header names from SIP message"""
    headers = []
    lines = sip_msg.split('\n')
    for line in lines:
        if ':' in line:
            header = line.split(':')[0].strip().lower()
            headers.append(header)
    return " ".join(headers)


## 8. Evaluation and Metrics

### HMR Quality Assessment

def evaluate_hmr_quality(generated_hmr, expected_elements):
    """Evaluate generated HMR quality"""
    
    metrics = {
        "syntax_score": check_syntax_correctness(generated_hmr),
        "completeness_score": check_completeness(generated_hmr, expected_elements),
        "oracle_compliance": check_oracle_format(generated_hmr),
        "logical_coherence": check_logical_flow(generated_hmr)
    }
    
    return metrics

def check_syntax_correctness(hmr):
    """Check if HMR follows Oracle SBC syntax"""
    required_elements = ["name", "header-name", "action", "comparison-type", "msg-type"]
    score = 0
    
    for element in required_elements:
        if element in hmr:
            score += 0.2
    
    return score

def automated_testing_suite():
    """Automated testing of generated HMRs"""
    
    test_cases = load_validation_dataset()
    results = []
    
    for case in test_cases:
        generated = hmr_generator.generate_hmr(case["intent"], case["sip_msg"])
        metrics = evaluate_hmr_quality(generated, case["expected_elements"])
        
        results.append({
            "case": case["intent"],
            "metrics": metrics,
            "generated_hmr": generated
        })
    
    return results


## 9. Continuous Improvement Pipeline

### Feedback Loop Implementation

def implement_feedback_loop():
    """Collect and incorporate user feedback"""
    
    # Add to Gradio interface
    def gradio_with_feedback():
        with gr.Blocks() as demo:
            # ... existing interface ...
            
            with gr.Row():
                rating = gr.Slider(1, 5, label="Rate HMR Quality")
                feedback = gr.Textbox(label="Feedback/Corrections")
                submit_feedback = gr.Button("Submit Feedback")
            
            def collect_feedback(intent, sip_msg, generated_hmr, rating, feedback):
                # Store feedback for retraining
                feedback_data = {
                    "timestamp": datetime.now().isoformat(),
                    "intent": intent,
                    "sip_msg": sip_msg,
                    "generated_hmr": generated_hmr,
                    "rating": rating,
                    "feedback": feedback
                }
                
                # Save to feedback database
                save_feedback(feedback_data)
                return "Feedback submitted successfully!"
            
            submit_feedback.click(
                collect_feedback,
                inputs=[intent_input, sip_msg_input, output, rating, feedback],
                outputs=gr.Textbox(label="Status")
            )


### Model Update Pipeline

def periodic_model_update():
    """Periodically retrain with new feedback data"""
    
    # 1. Collect feedback data
    feedback_data = load_feedback_database()
    
    # 2. Filter high-quality corrections
    good_corrections = [f for f in feedback_data if f["rating"] >= 4]
    
    # 3. Augment training dataset
    new_training_data = convert_feedback_to_training_data(good_corrections)
    
    # 4. Retrain with LoRA
    retrain_model_with_new_data(new_training_data)
    
    # 5. Evaluate improvements
    performance_metrics = evaluate_model_performance()
    
    return performance_metrics


## 10. Advanced Features

### Multi-Modal Input Support

def support_sip_trace_files():
    """Support SIP trace file uploads"""
    
    def parse_sip_trace(file_content):
        """Parse Wireshark/tcpdump SIP traces"""
        # Extract SIP messages from trace files
        # Identify problematic scenarios
        # Suggest appropriate HMRs
        pass

def intelligent_hmr_chaining():
    """Generate multiple related HMRs for complex scenarios"""
    
    def generate_hmr_sequence(complex_intent):
        """Break down complex intents into HMR sequence"""
        # Analyze dependencies between HMR rules
        # Generate ordered sequence of HMRs
        # Validate rule interactions
        pass


## Implementation Priority

### Phase 1: Quick Wins (1-2 weeks)
1. Fix the missing `generate_hmr` method
2. Optimize chunking strategy for HMR blocks
3. Improve prompt engineering with better examples
4. Add input validation and error handling

### Phase 2: Performance Optimization (2-4 weeks)
1. Implement LoRA fine-tuning on your HMR dataset
2. Upgrade to better embedding models
3. Add evaluation metrics and automated testing
4. Implement feedback collection system

### Phase 3: Advanced Features (1-2 months)
1. Full model fine-tuning if resources allow
2. Multi-modal input support
3. HMR validation and testing integration
4. Continuous learning pipeline

## Getting Started

To begin fine-tuning immediately:

1. **Fix Current Issues**: Start with the missing method implementation
2. **Data Preparation**: Convert your 124 text files into structured training format
3. **LoRA Setup**: Implement the LoRA fine-tuning pipeline above
4. **Evaluation**: Create validation metrics for HMR quality
5. **Iterate**: Use feedback to continuously improve the system

Would you like me to help implement any specific part of this fine-tuning strategy?

In [16]:
import requests
import json
import os
from pathlib import Path
import gradio as gr
from datetime import datetime
import numpy as np
from typing import List, Dict, Any

# Langchain imports for RAG
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import TextLoader
from langchain.schema import Document
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

class OllamaHMRRAG:
    def __init__(self, model_name="llama3.2:3b", ollama_host="http://localhost:11434"):
        self.model_name = model_name
        self.ollama_host = ollama_host
        self.vector_db = None
        self.embeddings = None
        
    def check_ollama_connection(self):
        """Check if Ollama is running and model is available"""
        try:
            response = requests.get(f"{self.ollama_host}/api/tags")
            if response.status_code == 200:
                models = response.json().get('models', [])
                available_models = [model['name'] for model in models]
                
                if self.model_name not in available_models:
                    print(f"Model {self.model_name} not found. Available models: {available_models}")
                    print(f"To download the model, run: ollama pull {self.model_name}")
                    return False
                    
                print(f"✓ Connected to Ollama. Using model: {self.model_name}")
                return True
            else:
                print("✗ Cannot connect to Ollama. Make sure it's running.")
                return False
                
        except requests.exceptions.ConnectionError:
            print("✗ Ollama not running. Start it with: ollama serve")
            return False

    def generate_with_ollama(self, prompt: str, temperature: float = 0.1) -> str:
        """Generate response using Ollama API"""
        payload = {
            "model": self.model_name,
            "prompt": prompt,
            "temperature": temperature,
            "stream": False,
            "options": {
                "num_predict": 500,
                "top_p": 0.9,
                "repeat_penalty": 1.1
            }
        }
        
        try:
            response = requests.post(f"{self.ollama_host}/api/generate", json=payload)
            response.raise_for_status()
            return response.json()['response']
            
        except requests.exceptions.RequestException as e:
            return f"Error calling Ollama: {str(e)}"

    def create_training_examples(self) -> List[Dict]:
        """Create training examples for fine-tuning context"""
        return [
            {
                "intent": "Add P-Asserted-Identity using From header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>",
                "hmr": """header-rules
  name                                    addPAI
  header-name                             p-asserted-identity
  action                                  add
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  new-value                               "<sip:"+$From.$From_er.$0+"@telco.com>\""""
            },
            {
                "intent": "Remove Diversion Header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nDiversion: <sip:olduser@domain.com>",
                "hmr": """header-rules
  name                                    removeDiversion
  header-name                             diversion
  action                                  remove
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE"""
            },
            {
                "intent": "Replace domain in From header to newdomain.com",
                "sip_msg": "From: <sip:user@olddomain.com>",
                "hmr": """header-rules
  name                                    replaceFromDomain
  header-name                             from
  action                                  replace
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  match-value                             "olddomain.com"
  new-value                               "newdomain.com\""""
            },
            {
                "intent": "Add Contact header with custom domain",
                "sip_msg": "REGISTER sip:example.com SIP/2.0\nFrom: <sip:user@example.com>",
                "hmr": """header-rules
  name                                    addContact
  header-name                             contact
  action                                  add
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 REGISTER
  new-value                               "<sip:user@newdomain.com>\""""
            },
            {
                "intent": "Modify Content-Type for SDP manipulation",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nContent-Type: application/sdp",
                "hmr": """header-rules
  name                                    modify_sdp
  header-name                             Content-Type
  action                                  manipulate
  comparison-type                         case-sensitive
  msg-type                                request
  methods                                 INVITE
  element-rules
    name                                  add_fmtp
    parameter-name                        application/sdp
    type                                  mime
    action                                find-replace-all
    match-val-type                        any
    comparison-type                       pattern-rule
    match-value                           m=audio.*
    new-value                             $0+$CRLF+"a=fmtp:18 annexb=no\""""
            }
        ]

    def advanced_chunking(self, documents: List[Document]) -> List[Document]:
        """Advanced chunking strategy for HMR documents"""
        hmr_splitter = RecursiveCharacterTextSplitter(
            chunk_size=600,
            chunk_overlap=150,
            separators=[
                "\nheader-rules\n",
                "\nelement-rules\n", 
                "\nsip-manipulation\n",
                "\nmime-sdp-rules\n",
                "\nmime-isup-rules\n",
                "\nmime-header-rules\n",
                "\nmime-rules\n",
                "\nisup-param-rules\n",
                "\nsdp-session-rules\n",
                "\nsdp-line-rules\n",
                "\nsdp-media-rules\n",
                "\n\n",
                "\n",
                " ",
                ""
            ]
        )
        
        return hmr_splitter.split_documents(documents)

    def extract_hmr_metadata(self, chunk_text: str) -> Dict[str, str]:
        """Extract metadata from HMR chunks"""
        metadata = {}
        
        # Rule type detection
        rule_types = {
            "header-rules": "header-rule",
            "element-rules": "element-rule", 
            "sip-manipulation": "sip-manipulation",
            "mime-sdp-rules": "mime-sdp-rule",
            "mime-isup-rules": "mime-isup-rule",
            "mime-header-rules": "mime-header-rule",
            "mime-rules": "mime-rule",
            "isup-param-rules": "isup-param-rule",
            "sdp-session-rules": "sdp-session-rule",
            "sdp-line-rules": "sdp-line-rule",
            "sdp-media-rules": "sdp-media-rule"
        }
        
        for rule_pattern, rule_type in rule_types.items():
            if rule_pattern in chunk_text:
                metadata["rule_type"] = rule_type
                break
        
        # Action detection
        actions = ["add", "delete", "find-replace-all", "manipulate", "store", 
                  "log", "none", "monitor", "reject", "sip-manip", "remove",
                  "delete-element", "delete-header", "replace"]
        
        for action in actions:
            if f"action {' ' * 10}{action}" in chunk_text or f"action{' ' * 20}{action}" in chunk_text:
                metadata["action"] = action
                break
        
        # Header detection
        headers = ["from", "to", "contact", "p-asserted-identity", "diversion", 
                  "content-type", "via", "route", "record-route"]
        for header in headers:
            if f"header-name {' ' * 10}{header}" in chunk_text.lower():
                metadata["header"] = header
                break
        
        return metadata

    def setup_vector_database(self, data_dir: str = "data/text/") -> FAISS:
        """Setup vector database from HMR documents"""
        print("🔍 Setting up vector database...")
        
        # Load documents
        documents = []
        data_path = Path(data_dir)
        
        if data_path.exists():
            for file_path in data_path.glob("*.txt"):
                try:
                    loader = TextLoader(str(file_path), encoding='utf-8')
                    docs = loader.load()
                    documents.extend(docs)
                    print(f"✓ Loaded {file_path.name}")
                except Exception as e:
                    print(f"✗ Error loading {file_path}: {e}")
        else:
            print(f"Directory {data_dir} not found. Creating sample documents...")
        
        # Add training examples as documents
        training_examples = self.create_training_examples()
        for example in training_examples:
            doc_content = f"Intent: {example['intent']}\nSIP Message: {example['sip_msg']}\nHMR:\n{example['hmr']}"
            documents.append(Document(page_content=doc_content, metadata={"source": "training_example"}))
        
        if not documents:
            print("No documents found. Creating minimal sample data...")
            sample_docs = self.create_sample_documents()
            documents.extend(sample_docs)
        
        # Advanced chunking
        chunks = self.advanced_chunking(documents)
        
        # Add metadata
        for chunk in chunks:
            hmr_metadata = self.extract_hmr_metadata(chunk.page_content)
            chunk.metadata.update(hmr_metadata)
        
        # Create embeddings
        print("🔗 Creating embeddings...")
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={'device': 'cpu'}
        )
        
        # Create vector database
        self.vector_db = FAISS.from_documents(chunks, self.embeddings)
        
        print(f"✅ Vector database created with {len(chunks)} chunks")
        return self.vector_db

    def create_sample_documents(self) -> List[Document]:
        """Create sample documents for testing"""
        sample_data = [
            """header-rules
  name                                    addFromDomain  
  header-name                             from
  action                                  add
  comparison-type                         pattern-rule
  msg-type                                request
  methods                                 INVITE
  new-value                               "domain.com\"""",
            
            """header-rules
  name                                    removePAI
  header-name                             p-asserted-identity  
  action                                  remove
  comparison-type                         case-sensitive
  msg-type                                request
  methods                                 INVITE""",
            
            """element-rules
  name                                    modifySDPCodec
  type                                    mime
  action                                  find-replace-all
  match-val-type                          any
  comparison-type                         pattern-rule
  match-value                             "m=audio.*"
  new-value                               "m=audio 5004 RTP/AVP 0 18\""""
        ]
        
        return [Document(page_content=content, metadata={"source": "sample"}) for content in sample_data]

    def extract_hmr_keywords(self, query: str) -> List[str]:
        """Extract HMR-specific keywords"""
        hmr_keywords = [
            "header-rules", "element-rules", "sip-manipulation",
            "add", "remove", "replace", "manipulate", "store", "delete",
            "from", "to", "contact", "p-asserted-identity", "diversion", "via",
            "invite", "register", "bye", "cancel", "ack", "options",
            "content-type", "route", "record-route"
        ]
        
        found_keywords = []
        query_lower = query.lower()
        for keyword in hmr_keywords:
            if keyword in query_lower:
                found_keywords.append(keyword)
        
        return found_keywords

    def hybrid_retrieval(self, query: str, k: int = 8) -> List[Document]:
        """Enhanced retrieval combining semantic and keyword search"""
        if not self.vector_db:
            raise ValueError("Vector database not initialized. Call setup_vector_database() first.")
        
        # Semantic search
        semantic_docs = self.vector_db.similarity_search(query, k=k//2)
        
        # Keyword-enhanced search
        keywords = self.extract_hmr_keywords(query)
        if keywords:
            keyword_query = " ".join(keywords)
            keyword_docs = self.vector_db.similarity_search(keyword_query, k=k//2)
        else:
            keyword_docs = []
        
        # Combine and remove duplicates
        all_docs = semantic_docs + keyword_docs
        unique_docs = self.remove_duplicates(all_docs)
        
        # Re-rank documents
        reranked_docs = self.rerank_documents(query, unique_docs)
        
        return reranked_docs[:k]

    def remove_duplicates(self, docs: List[Document]) -> List[Document]:
        """Remove duplicate documents"""
        seen = set()
        unique_docs = []
        
        for doc in docs:
            content_hash = hash(doc.page_content)
            if content_hash not in seen:
                seen.add(content_hash)
                unique_docs.append(doc)
        
        return unique_docs

    def rerank_documents(self, query: str, docs: List[Document]) -> List[Document]:
        """Re-rank documents based on HMR-specific criteria"""
        if not docs:
            return docs
            
        scores = []
        query_lower = query.lower()
        
        for doc in docs:
            score = 1.0  # Base score
            content = doc.page_content.lower()
            
            # Boost complete HMR blocks
            if "header-rules" in content and "name" in content:
                score += 3.0
            elif "element-rules" in content and "name" in content:
                score += 2.5
            
            # Boost similar actions
            actions = ["add", "remove", "replace", "modify", "manipulate", "delete"]
            for action in actions:
                if action in query_lower and action in content:
                    score += 2.0
                    break
            
            # Boost similar headers
            headers = ["from", "to", "contact", "p-asserted-identity", "diversion", "via", "content-type"]
            for header in headers:
                if header in query_lower and header in content:
                    score += 1.5
                    break
            
            # Boost SIP methods
            methods = ["invite", "register", "bye", "cancel", "options"]
            for method in methods:
                if method in query_lower and method in content:
                    score += 1.0
                    break
            
            scores.append(score)
        
        # Sort by score (descending)
        scored_docs = list(zip(scores, docs))
        scored_docs.sort(key=lambda x: x[0], reverse=True)
        
        return [doc for score, doc in scored_docs]

    def create_hmr_prompt(self, intent: str, sip_msg: str, context_docs: List[Document]) -> str:
        """Create optimized prompt for HMR generation"""
        
        # Prepare context from retrieved documents
        context_examples = []
        for i, doc in enumerate(context_docs[:3]):  # Use top 3 most relevant
            context_examples.append(f"Example {i+1}:\n{doc.page_content}\n")
        
        context_str = "\n".join(context_examples) if context_examples else "No specific examples found."
        
        # Get training examples for few-shot learning
        training_examples = self.create_training_examples()
        few_shot_examples = []
        
        # Select most relevant training examples
        for example in training_examples[:2]:  # Use 2 best examples
            few_shot_examples.append(f"""
Intent: {example['intent']}
SIP Message: {example['sip_msg']}
Generated HMR:
{example['hmr']}
""")
        
        few_shot_str = "\n---\n".join(few_shot_examples)
        
        prompt = f"""You are an Oracle Session Border Controller (SBC) expert specializing in Header Manipulation Rules (HMR).

Your task is to generate syntactically correct Oracle SBC HMR configuration based on the provided intent and SIP message.

CONTEXT FROM KNOWLEDGE BASE:
{context_str}

EXAMPLES OF CORRECT HMR GENERATION:
{few_shot_str}

REQUIREMENTS:
1. Generate valid Oracle SBC CLI format
2. Use appropriate rule types (header-rules, element-rules, etc.)
3. Include all required parameters (name, action, comparison-type, msg-type, etc.)
4. Follow Oracle SBC best practices
5. Handle the specific SIP scenario correctly

CURRENT TASK:
Intent: {intent}
SIP Message: {sip_msg if sip_msg else "Not provided"}

ANALYSIS:
1. Identify the SIP headers/elements that need modification
2. Determine the appropriate HMR action (add/remove/replace/manipulate)
3. Consider message type and method restrictions
4. Apply proper pattern matching and value substitution

Generate the Oracle SBC HMR configuration:"""

        return prompt

    def generate_hmr(self, intent: str, sip_msg: str = "") -> str:
        """Generate HMR using Ollama with RAG"""
        if not self.check_ollama_connection():
            return "Error: Cannot connect to Ollama. Please ensure it's running and the model is available."
        
        try:
            # Retrieve relevant context
            query = f"{intent} {sip_msg}".strip()
            relevant_docs = self.hybrid_retrieval(query, k=5)
            
            # Create optimized prompt
            prompt = self.create_hmr_prompt(intent, sip_msg, relevant_docs)
            
            # Generate with Ollama
            response = self.generate_with_ollama(prompt, temperature=0.1)
            
            # Clean up the response
            cleaned_response = self.clean_hmr_response(response)
            
            return cleaned_response
            
        except Exception as e:
            return f"Error generating HMR: {str(e)}"

    def clean_hmr_response(self, response: str) -> str:
        """Clean and format the HMR response"""
        # Remove any leading/trailing whitespace
        response = response.strip()
        
        # If the response contains the prompt echo, remove it
        if "Generate the Oracle SBC HMR configuration:" in response:
            response = response.split("Generate the Oracle SBC HMR configuration:")[-1].strip()
        
        # Remove any explanatory text after the HMR block
        lines = response.split('\n')
        hmr_lines = []
        in_hmr_block = False
        
        for line in lines:
            # Start of HMR block
            if any(rule_type in line for rule_type in ['header-rules', 'element-rules', 'sip-manipulation']):
                in_hmr_block = True
                hmr_lines.append(line)
            # Continue HMR block
            elif in_hmr_block and (line.strip().startswith(' ') or line.strip() == '' or 
                                 any(param in line for param in ['name', 'action', 'header-name', 'comparison-type', 'msg-type', 'methods', 'match-value', 'new-value'])):
                hmr_lines.append(line)
            # End of HMR block
            elif in_hmr_block and line.strip() and not line.strip().startswith(' '):
                break
            # Before HMR block
            elif not in_hmr_block:
                continue
        
        return '\n'.join(hmr_lines) if hmr_lines else response

    def evaluate_hmr_quality(self, generated_hmr: str, intent: str = "") -> Dict[str, float]:
        """Evaluate generated HMR quality"""
        metrics = {
            "syntax_score": self.check_syntax_correctness(generated_hmr),
            "completeness_score": self.check_completeness(generated_hmr),
            "oracle_compliance": self.check_oracle_format(generated_hmr),
            "intent_alignment": self.check_intent_alignment(generated_hmr, intent)
        }
        
        # Overall score
        metrics["overall_score"] = sum(metrics.values()) / len(metrics)
        
        return metrics

    def check_syntax_correctness(self, hmr: str) -> float:
        """Check if HMR follows Oracle SBC syntax"""
        required_elements = ["name", "action", "comparison-type", "msg-type"]
        optional_elements = ["header-name", "methods", "match-value", "new-value"]
        
        score = 0
        hmr_lower = hmr.lower()
        
        # Check required elements
        for element in required_elements:
            if element in hmr_lower:
                score += 0.2
        
        # Check for proper indentation (Oracle SBC uses spaces)
        lines = hmr.split('\n')
        proper_indentation = sum(1 for line in lines[1:] if line.startswith('  ') or line.strip() == '')
        if proper_indentation > 0:
            score += 0.1
        
        # Check for proper rule block start
        if any(rule_type in hmr_lower for rule_type in ['header-rules', 'element-rules']):
            score += 0.1
        
        return min(score, 1.0)

    def check_completeness(self, hmr: str) -> float:
        """Check completeness of HMR configuration"""
        score = 0
        hmr_lower = hmr.lower()
        
        # Check for rule name
        if "name" in hmr_lower:
            score += 0.3
        
        # Check for action
        actions = ["add", "remove", "replace", "manipulate", "delete", "store"]
        if any(action in hmr_lower for action in actions):
            score += 0.3
        
        # Check for target (header-name or similar)
        if any(target in hmr_lower for target in ["header-name", "parameter-name"]):
            score += 0.2
        
        # Check for message context
        if "msg-type" in hmr_lower:
            score += 0.2
        
        return min(score, 1.0)

    def check_oracle_format(self, hmr: str) -> float:
        """Check Oracle SBC format compliance"""
        score = 0
        
        # Check for proper rule block declaration
        if any(rule_type in hmr for rule_type in ['header-rules', 'element-rules', 'sip-manipulation']):
            score += 0.4
        
        # Check for proper parameter spacing (Oracle uses specific spacing)
        lines = hmr.split('\n')
        proper_format_lines = 0
        for line in lines[1:]:  # Skip first line (rule declaration)
            if line.strip() and ('  ' in line[:20] or line.strip().startswith('name') or line.strip().startswith('action')):
                proper_format_lines += 1
        
        if proper_format_lines > 0:
            score += 0.4
        
        # Check for reasonable rule name (no special characters, reasonable length)
        if "name" in hmr.lower():
            score += 0.2
        
        return min(score, 1.0)

    def check_intent_alignment(self, hmr: str, intent: str) -> float:
        """Check if HMR aligns with the stated intent"""
        if not intent:
            return 0.5  # Neutral score if no intent provided
        
        score = 0
        intent_lower = intent.lower()
        hmr_lower = hmr.lower()
        
        # Check action alignment
        if "add" in intent_lower and "add" in hmr_lower:
            score += 0.3
        elif "remove" in intent_lower and ("remove" in hmr_lower or "delete" in hmr_lower):
            score += 0.3
        elif "replace" in intent_lower and "replace" in hmr_lower:
            score += 0.3
        elif "modify" in intent_lower and ("manipulate" in hmr_lower or "find-replace" in hmr_lower):
            score += 0.3
        
        # Check header alignment
        headers = ["from", "to", "contact", "p-asserted-identity", "diversion", "via", "content-type"]
        for header in headers:
            if header in intent_lower and header in hmr_lower:
                score += 0.4
                break
        
        # Check method alignment
        methods = ["invite", "register", "bye", "cancel", "options","message","notify","prack","subscribe","publish","refer","update"]
        for method in methods:
            if method in intent_lower and method in hmr_lower:
                score += 0.3
                break
        
        return min(score, 1.0)

    def create_gradio_interface(self):
        """Create enhanced Gradio interface"""
        def generate_and_evaluate(intent, sip_msg):
            if not intent.strip():
                return "Please provide an intent.", "", ""
            
            try:
                # Generate HMR
                hmr = self.generate_hmr(intent, sip_msg)
                
                # Evaluate quality
                metrics = self.evaluate_hmr_quality(hmr, intent)
                
                # Format metrics
                metrics_str = f"""📊 Quality Metrics:
• Overall Score: {metrics['overall_score']:.2f}/1.0
• Syntax Score: {metrics['syntax_score']:.2f}/1.0
• Completeness: {metrics['completeness_score']:.2f}/1.0  
• Oracle Compliance: {metrics['oracle_compliance']:.2f}/1.0
• Intent Alignment: {metrics['intent_alignment']:.2f}/1.0

💡 Score Guide:
• 0.8+ : Excellent
• 0.6-0.8 : Good  
• 0.4-0.6 : Fair
• <0.4 : Needs improvement"""
                
                # Add recommendations
                recommendations = []
                if metrics['syntax_score'] < 0.6:
                    recommendations.append("• Check parameter names and indentation")
                if metrics['completeness_score'] < 0.6:
                    recommendations.append("• Verify all required parameters are present")
                if metrics['oracle_compliance'] < 0.6:
                    recommendations.append("• Review Oracle SBC format requirements")
                if metrics['intent_alignment'] < 0.6:
                    recommendations.append("• Ensure HMR matches the stated intent")
                
                if recommendations:
                    recommendations_str = "\n🔧 Recommendations:\n" + "\n".join(recommendations)
                else:
                    recommendations_str = "\n✅ HMR looks good!"
                
                return hmr, metrics_str, recommendations_str
                
            except Exception as e:
                return f"Error: {str(e)}", "Error in evaluation", ""
                '''
        
        '''def load_example(example_name):
            examples = {
                "Add P-Asserted-Identity": (
                    "Add P-Asserted-Identity header using From header value",
                    "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>"
                ),
                "Remove Diversion": (
                    "Remove Diversion header from INVITE requests", 
                    "INVITE sip:bob@oracle.com SIP/2.0\nDiversion: <sip:olduser@domain.com>"
                ),
                "Replace Domain": (
                    "Replace domain in From header to newdomain.com",
                    "INVITE sip:user@example.com SIP/2.0\nFrom: <sip:user@olddomain.com>"
                ),
                "Add Contact": (
                    "Add Contact header for REGISTER requests",
                    "REGISTER sip:example.com SIP/2.0\nFrom: <sip:user@example.com>"
                )
            }
            
            if example_name in examples:
                return examples[example_name][0], examples[example_name][1]
            return "", ""
        '''
        
        with gr.Blocks(title="Oracle SBC HMR Generator") as interface:
            gr.Markdown("""
            # 🔧 Oracle SBC Header Manipulation Rule Generator
            
            Generate Oracle Session Border Controller Header Manipulation Rules using AI and RAG technology.
            Powered by Ollama and optimized for Oracle SBC configurations.
            """, elem_classes=["main-header"])
            
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("Input Configuration")
                    
                    intent_input = gr.Textbox(
                        label="Intent",
                        placeholder="Describe what you want the HMR to do...\nExample: Add P-Asserted-Identity header using From header",
                        lines=3,
                        info="Clearly describe the desired HMR functionality"
                    )
                    
                    sip_msg_input = gr.Textbox(
                        label="SIP Message (Optional)",
                        placeholder="Provide relevant SIP message content...\nExample: INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>",
                        lines=6,
                        info="Include relevant SIP headers and message content"
                    )
                    
                    with gr.Row():
                        generate_btn = gr.Button("🚀 Generate HMR", variant="primary", size="lg")
                        
                    
                    '''gr.Markdown("### 📚 Quick Examples")
                    example_dropdown = gr.Dropdown(
                        choices=["Add P-Asserted-Identity", "Remove Diversion", "Replace Domain", "Add Contact"],
                        label="Load Example",
                        info="Select a pre-defined example to get started"
                    )'''
                    
                    '''# Model Status
                    with gr.Accordion("🔧 System Status", open=False):
                        status_text = gr.Textbox(
                            value="Click 'Check Status' to verify Ollama connection",
                            label="Ollama Status",
                            interactive=False
                        )
                        check_status_btn = gr.Button("Check Status")'''

                with gr.Column(scale=1):
                    gr.Markdown("### 🎯 Generated HMR Configuration")
                    
                    hmr_output = gr.Code(
                        label="Generated HMR",
                        lines=20,
                        show_label=True
                    )
                    
                    with gr.Row():
                        copy_btn = gr.Button("📋 Copy to Clipboard", size="sm")
                        save_btn = gr.Button("💾 Save HMR", size="sm")
                    
                    '''metrics_output = gr.Textbox(
                        label="📊 Quality Assessment",
                        lines=12,
                        elem_classes=["metrics-output"],
                        interactive=False
                    )
                    
                    recommendations_output = gr.Textbox(
                        label="💡 Recommendations",
                        lines=5,
                        interactive=False
                    )'''

            # Event handlers
            generate_btn.click(
                generate_and_evaluate,
                inputs=[intent_input, sip_msg_input],
                outputs=[hmr_output, metrics_output, recommendations_output]
            )
            
            example_dropdown.change(
                load_example,
                inputs=[example_dropdown],
                outputs=[intent_input, sip_msg_input]
            )
            
            '''def check_system_status():
                if self.check_ollama_connection():
                    return "✅ Ollama connected successfully! Model ready for use."
                else:
                    return "❌ Cannot connect to Ollama. Please ensure it's running and model is available."
            
            check_status_btn.click(
                check_system_status,
                outputs=[status_text]
            )
            
            # Add footer information
            gr.Markdown("""
            ---
            ### 📖 How to Use:
            1. **Describe your intent** - What should the HMR accomplish?
            2. **Provide SIP context** - Include relevant SIP message parts (optional but helpful)
            3. **Generate HMR** - Click the generate button to create the configuration
            4. **Review quality** - Check the metrics and recommendations
            5. **Copy & Deploy** - Use the generated HMR in your Oracle SBC
            
            ### 🔧 Prerequisites:
            - Ollama must be running (`ollama serve`)
            - Llama 3.2 model must be available (`ollama pull llama3.2:3b`)
            
            ### 💡 Tips:
            - Be specific in your intent description
            - Include relevant SIP headers in the message field
            - Use the quality metrics to validate the generated HMR
            """)
        '''
        return interface

    def save_hmr_to_file(self, hmr_content: str, filename: str = None) -> str:
        """Save generated HMR to file"""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"generated_hmr_{timestamp}.txt"
        
        try:
            output_dir = Path("generated_hmrs")
            output_dir.mkdir(exist_ok=True)
            
            file_path = output_dir / filename
            file_path.write_text(hmr_content)
            
            return f"HMR saved to: {file_path}"
        except Exception as e:
            return f"Error saving file: {str(e)}"

    def fine_tune_with_ollama(self, training_data_path: str = None):
        """
        Fine-tune Ollama model with custom HMR data
        Note: This requires creating a Modelfile and using ollama create
        """
        print("🔧 Setting up Ollama fine-tuning...")
        
        # Create training examples in Ollama format
        training_examples = self.create_training_examples()
        
        # Generate Modelfile content
        modelfile_content = f"""FROM {self.model_name}

# Set parameters for HMR generation
PARAMETER temperature 0.1
PARAMETER top_p 0.9
PARAMETER repeat_penalty 1.1

# System message for HMR generation
SYSTEM \"\"\"You are an Oracle Session Border Controller (SBC) expert specializing in Header Manipulation Rules (HMR). 
Generate syntactically correct Oracle SBC CLI format configurations based on user intents and SIP message context.
Always follow Oracle SBC best practices and proper parameter formatting.\"\"\"

"""
        
        # Add training examples as few-shot prompts
        for i, example in enumerate(training_examples):
            modelfile_content += f"""
# Example {i+1}
TEMPLATE \"\"\"### Instruction:
Generate Oracle SBC Header Manipulation Rule based on the intent and SIP message

### Input:
Intent: {example['intent']}
SIP Message: {example['sip_msg']}

### Response:
{example['hmr']}
\"\"\"

"""
        
        # Save Modelfile
        modelfile_path = Path("Modelfile_HMR")
        modelfile_path.write_text(modelfile_content)
        
        print(f"✅ Modelfile created: {modelfile_path}")
        print("To create the custom model, run:")
        print(f"ollama create hmr-specialist -f {modelfile_path}")
        
        return str(modelfile_path)

    def benchmark_model_performance(self) -> Dict[str, Any]:
        """Benchmark the model performance on test cases"""
        test_cases = [
            {
                "intent": "Add P-Asserted-Identity from From header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>",
                "expected_elements": ["p-asserted-identity", "add", "from"]
            },
            {
                "intent": "Remove Diversion header",
                "sip_msg": "INVITE sip:bob@oracle.com SIP/2.0\nDiversion: <sip:olduser@domain.com>",
                "expected_elements": ["diversion", "remove"]
            },
            {
                "intent": "Replace domain in Contact header",
                "sip_msg": "REGISTER sip:example.com SIP/2.0\nContact: <sip:user@olddomain.com>",
                "expected_elements": ["contact", "replace", "domain"]
            }
        ]
        
        results = []
        total_score = 0
        
        print("🧪 Running performance benchmark...")
        
        for i, test_case in enumerate(test_cases):
            print(f"Test {i+1}/{len(test_cases)}: {test_case['intent']}")
            
            # Generate HMR
            generated_hmr = self.generate_hmr(test_case['intent'], test_case['sip_msg'])
            
            # Evaluate
            metrics = self.evaluate_hmr_quality(generated_hmr, test_case['intent'])
            
            # Check for expected elements
            expected_found = sum(1 for element in test_case['expected_elements'] 
                               if element.lower() in generated_hmr.lower())
            expected_score = expected_found / len(test_case['expected_elements'])
            
            test_result = {
                "test_case": test_case['intent'],
                "generated_hmr": generated_hmr,
                "metrics": metrics,
                "expected_score": expected_score,
                "overall_score": (metrics['overall_score'] + expected_score) / 2
            }
            
            results.append(test_result)
            total_score += test_result['overall_score']
        
        benchmark_summary = {
            "individual_results": results,
            "average_score": total_score / len(test_cases),
            "total_tests": len(test_cases),
            "timestamp": datetime.now().isoformat()
        }
        
        print(f"📊 Benchmark completed. Average score: {benchmark_summary['average_score']:.2f}")
        
        return benchmark_summary


def main():
    """Main function to run the HMR RAG system"""
    print("🚀 Initializing Oracle SBC HMR Generator with Ollama...")
    
    # Initialize the system
    hmr_system = OllamaHMRRAG(model_name="llama3.2:3b")
    
    # Check Ollama connection
    if not hmr_system.check_ollama_connection():
        print("❌ Cannot proceed without Ollama connection.")
        print("Please ensure:")
        print("1. Ollama is running: ollama serve")
        print("2. Model is available: ollama pull llama3.2:3b")
        return
    
    # Setup vector database
    print("\n📚 Setting up knowledge base...")
    try:
        hmr_system.setup_vector_database("data/text/")
        print("✅ Knowledge base ready!")
    except Exception as e:
        print(f"⚠️  Warning: Knowledge base setup failed: {e}")
        print("Continuing with basic examples...")
        hmr_system.setup_vector_database()
    
    # Test generation
    print("\n🧪 Testing HMR generation...")
    test_intent = "Add P-Asserted-Identity header using From header"
    test_sip = "INVITE sip:bob@oracle.com SIP/2.0\nFrom: <sip:alice@telco.com>"
    
    try:
        result = hmr_system.generate_hmr(test_intent, test_sip)
        print("Generated HMR:")
        print("-" * 50)
        print(result)
        print("-" * 50)
        
        # Evaluate quality
        metrics = hmr_system.evaluate_hmr_quality(result, test_intent)
        print(f"\nQuality Score: {metrics['overall_score']:.2f}/1.0")
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
    
    # Launch Gradio interface
    print("\n🌐 Launching web interface...")
    try:
        interface = hmr_system.create_gradio_interface()
        interface.launch()
    except Exception as e:
        print(f"❌ Failed to launch interface: {e}")


# Additional utility functions
def create_modelfile_for_hmr():
    """Create a specialized Modelfile for HMR generation"""
    hmr_system = OllamaHMRRAG()
    modelfile_path = hmr_system.fine_tune_with_ollama()
    print(f"Modelfile created at: {modelfile_path}")


def run_benchmark():
    """Run performance benchmark"""
    hmr_system = OllamaHMRRAG()
    hmr_system.setup_vector_database()
    results = hmr_system.benchmark_model_performance()
    
    # Save results
    import json
    with open("benchmark_results.json", "w") as f:
        json.dump(results, f, indent=2)
    
    print("Benchmark results saved to benchmark_results.json")


if __name__ == "__main__":
    # You can run different modes:
    # main()                    # Full system with GUI
    # create_modelfile_for_hmr() # Create Ollama Modelfile
    # run_benchmark()           # Run performance tests
    
    main()

IndentationError: unindent does not match any outer indentation level (<string>, line 795)