In [None]:
import os
import json
import uuid
import numpy as np
import torch
import faiss
from dotenv import load_dotenv
from typing import Optional
from rich import print
from sentence_transformers import SentenceTransformer, CrossEncoder
from rank_bm25 import BM25Okapi
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from cerebras.cloud.sdk import Cerebras

# Pull configuration (e.g. CEREBRAS_API_KEY) from a .env file into the process
# environment before anything reads it.
load_dotenv()

# Run the models on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# AgenticChunker class (from agentic_chunker.py with minor optimizations)
class AgenticChunker:
    """Group propositions into topical chunks with the help of an LLM.

    Chunks live in ``self.chunks``, a dict keyed by chunk id. Every chunk is
    itself a dict with the keys ``chunk_id``, ``propositions``, ``title``,
    ``summary`` and ``chunk_index``. For each incoming proposition the LLM is
    asked whether it belongs to an existing chunk; if not, a new chunk is
    created and the LLM writes its summary and title.
    """

    def __init__(self, cerebras_api_key: Optional[str] = None):
        """Set up chunk storage and the Cerebras client.

        Args:
            cerebras_api_key: API key for Cerebras. When omitted (or empty),
                the ``CEREBRAS_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no non-empty API key is available.
        """
        self.chunks = {}
        # Chunk ids are the first N characters of a uuid4 string.
        self.id_truncate_limit = 5
        # When True, summary and title are regenerated after every addition.
        self.generate_new_metadata_ind = True
        self.print_logging = True

        # An empty string is as unusable as a missing key, so treat both alike.
        cerebras_api_key = cerebras_api_key or os.getenv("CEREBRAS_API_KEY")
        if not cerebras_api_key:
            raise ValueError("CEREBRAS_API_KEY not provided or found in environment variables")

        self.client = Cerebras(api_key=cerebras_api_key)
        self.model = "llama-4-scout-17b-16e-instruct"

    def _llm_invoke(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        This is deliberately best-effort: any failure is logged and an empty
        string is returned so that one bad call does not abort a chunking run.
        """
        try:
            response = self.client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.model,
            )
            content = response.choices[0].message.content
            return (content or "").strip()
        except Exception as e:
            print(f"[Error] LLM invocation failed: {e}")
            return ""

    def add_propositions(self, propositions: list):
        """Add every proposition in ``propositions``, in order."""
        for proposition in propositions:
            self.add_proposition(proposition)

    def add_proposition(self, proposition: str):
        """Attach ``proposition`` to the best matching chunk or start a new one."""
        if self.print_logging:
            print(f"\nAdding: '{proposition}'")

        # Nothing to compare against yet, so skip the LLM lookup entirely.
        if not self.chunks:
            if self.print_logging:
                print("No chunks, creating a new one")
            self._create_new_chunk(proposition)
            return

        chunk_id = self._find_relevant_chunk(proposition)
        if chunk_id is None:
            if self.print_logging:
                print("No chunks found")
            self._create_new_chunk(proposition)
            return

        if self.print_logging:
            print(f"Chunk Found ({self.chunks[chunk_id]['chunk_id']}), adding to: {self.chunks[chunk_id]['title']}")
        self.add_proposition_to_chunk(chunk_id, proposition)

    def add_proposition_to_chunk(self, chunk_id: str, proposition: str):
        """Append ``proposition`` to chunk ``chunk_id`` and refresh its metadata.

        Raises:
            KeyError: If ``chunk_id`` is not an existing chunk.
        """
        chunk = self.chunks[chunk_id]
        chunk['propositions'].append(proposition)
        if not self.generate_new_metadata_ind:
            return

        # _llm_invoke returns "" on failure; keep the previous metadata in that
        # case instead of wiping a good summary/title.
        new_summary = self._update_chunk_summary(chunk)
        if new_summary:
            chunk['summary'] = new_summary
        new_title = self._update_chunk_title(chunk)
        if new_title:
            chunk['title'] = new_title

    def _update_chunk_summary(self, chunk: dict) -> str:
        """Ask the LLM for a refreshed one-sentence summary of ``chunk``."""
        propositions = "\n".join(chunk['propositions'])
        prompt = (
            "You are the steward of a group of chunks representing groups of sentences on a similar topic.\n"
            "A new proposition was added to a chunk. Generate a brief 1-sentence summary for the chunk.\n"
            f"Chunk's propositions:\n{propositions}"
            f"\nCurrent chunk summary:\n{chunk['summary']}"
        )
        return self._llm_invoke(prompt)

    def _update_chunk_title(self, chunk: dict) -> str:
        """Ask the LLM for a refreshed title of ``chunk``."""
        propositions = "\n".join(chunk['propositions'])
        prompt = (
            "You are the steward of a group of chunks representing groups of sentences on a similar topic.\n"
            "A new proposition was added to a chunk. Generate a brief updated chunk title.\n"
            f"Chunk's propositions:\n{propositions}"
            f"\nChunk summary:\n{chunk['summary']}\n"
            f"Current chunk title:\n{chunk['title']}"
        )
        return self._llm_invoke(prompt)

    def _get_new_chunk_summary(self, proposition: str) -> str:
        """Ask the LLM for a one-sentence summary of a chunk seeded by ``proposition``."""
        prompt = (
            "You are the steward of a group of chunks representing groups of sentences on a similar topic.\n"
            "Generate a brief 1-sentence summary for a new chunk based on this proposition:\n" + proposition
        )
        return self._llm_invoke(prompt)

    def _get_new_chunk_title(self, summary: str) -> str:
        """Ask the LLM for a short title matching ``summary``."""
        prompt = (
            "You are the steward of a group of chunks representing groups of sentences on a similar topic.\n"
            "Generate a brief few-word chunk title for this summary:\n" + summary
        )
        return self._llm_invoke(prompt)

    def _new_chunk_id(self) -> str:
        """Return a chunk id that is not yet used in ``self.chunks``.

        Ids are truncated uuid4 strings, so two chunks can draw the same id;
        reusing one would silently overwrite the existing chunk. Retry a
        bounded number of times and fall back to a full uuid if the truncated
        space appears exhausted (or the truncation limit yields an empty id).
        """
        for _ in range(100):
            candidate = str(uuid.uuid4())[:self.id_truncate_limit]
            if candidate and candidate not in self.chunks:
                return candidate
        return str(uuid.uuid4())

    def _create_new_chunk(self, proposition: str):
        """Create a chunk holding only ``proposition`` with LLM-written metadata."""
        new_chunk_id = self._new_chunk_id()
        new_chunk_summary = self._get_new_chunk_summary(proposition)
        new_chunk_title = self._get_new_chunk_title(new_chunk_summary)
        self.chunks[new_chunk_id] = {
            'chunk_id': new_chunk_id,
            'propositions': [proposition],
            'title': new_chunk_title,
            'summary': new_chunk_summary,
            # Position in creation order (number of chunks that existed before).
            'chunk_index': len(self.chunks),
        }
        if self.print_logging:
            print(f"Created new chunk ({new_chunk_id}): {new_chunk_title}")

    def get_chunk_outline(self) -> str:
        """Return id, title and summary of every chunk as one text block."""
        return "".join(
            f"Chunk ({chunk['chunk_id']}): {chunk['title']}\nSummary: {chunk['summary']}\n\n"
            for chunk in self.chunks.values()
        )

    def _match_chunk_id(self, response: str) -> Optional[str]:
        """Extract an existing chunk id from an LLM reply, or return None.

        Besides a bare id, this tolerates replies that wrap the id in
        punctuation or extra words (e.g. ``"Chunk (abc12)"``). A reply that
        contains the phrase "no chunks" is treated as a refusal.
        """
        answer = response.strip()
        if not answer:
            return None
        if answer in self.chunks:
            return answer
        if "no chunks" in answer.lower():
            return None
        for token in answer.split():
            candidate = token.strip("()[]{}<>'\"`*.,:;!?")
            if candidate in self.chunks:
                return candidate
        return None

    def _find_relevant_chunk(self, proposition: str) -> Optional[str]:
        """Ask the LLM which existing chunk ``proposition`` belongs to.

        Returns:
            The id of an existing chunk, or None when the LLM names no known
            chunk (including when the LLM call failed).
        """
        current_chunk_outline = self.get_chunk_outline()
        prompt = (
            "Determine if the 'Proposition' should belong to any existing chunks.\n"
            "If it should join a chunk, return the chunk id.\n"
            "If not, return 'No chunks'.\n"
            f"Current Chunks:\n--Start of current chunks--\n{current_chunk_outline}\n--End of current chunks--\n"
            f"Proposition:\n{proposition}"
        )
        return self._match_chunk_id(self._llm_invoke(prompt))

    def get_chunks(self, get_type: str = 'dict') -> "dict | list":
        """Return the stored chunks.

        Args:
            get_type: ``'dict'`` returns the internal chunk dict itself (not a
                copy); ``'list_of_strings'`` returns one string per chunk made
                of its propositions joined by single spaces.

        Raises:
            ValueError: If ``get_type`` is neither of the supported values.
        """
        if get_type == 'dict':
            return self.chunks
        if get_type == 'list_of_strings':
            return [" ".join(chunk['propositions']) for chunk in self.chunks.values()]
        raise ValueError(
            f"Unsupported get_type {get_type!r}; expected 'dict' or 'list_of_strings'"
        )

    def pretty_print_chunks(self):
        """Print every chunk with its index, id, summary and propositions."""
        print(f"\nYou have {len(self.chunks)} chunks\n")
        for chunk_id, chunk in self.chunks.items():
            print(f"Chunk #{chunk['chunk_index']}")
            print(f"Chunk ID: {chunk_id}")
            print(f"Summary: {chunk['summary']}")
            print("Propositions:")
            for prop in chunk['propositions']:
                print(f"    - {prop}")
            print("\n")

    def pretty_print_chunk_outline(self):
        """Print the outline produced by ``get_chunk_outline``."""
        print("Chunk Outline\n")
        print(self.get_chunk_outline())

# Proposition extraction (simplified from Retriver.ipynb)
def extract_propositions(text: str, chunker: "AgenticChunker") -> list:
    """Split ``text`` into standalone propositions using the chunker's LLM.

    Args:
        text: Source text to decompose.
        chunker: Object whose ``_llm_invoke(prompt)`` method returns the LLM
            reply as a string.

    Returns:
        One cleaned proposition per non-empty line of the LLM reply, with
        leading list markers ("1. ", "2) ", "- ", "* ", "• ") removed. Empty
        or whitespace-only ``text`` yields ``[]`` without calling the LLM.
    """
    # Asking the LLM to decompose nothing only invites made-up propositions.
    if not text or not text.strip():
        return []

    prompt = (
        "You are an expert at extracting key propositions from text.\n"
        "Break down the following text into a list of concise, standalone propositions.\n"
        "Each proposition should be a complete sentence capturing a single idea.\n"
        "Text:\n" + text
    )
    response = chunker._llm_invoke(prompt)
    cleaned = (_strip_list_marker(line) for line in response.splitlines())
    return [proposition for proposition in cleaned if proposition]


def _strip_list_marker(line: str) -> str:
    """Return ``line`` stripped of whitespace and one leading bullet/number marker."""
    stripped = line.strip()
    for bullet in ("- ", "* ", "• "):
        if stripped.startswith(bullet):
            return stripped[len(bullet):].strip()
    # Numbered items look like "12. text" or "12) text".
    head, sep, tail = stripped.partition(" ")
    if sep and len(head) > 1 and head[-1] in ".)" and head[:-1].isdigit():
        return tail.strip()
    return stripped

# Agentic chunking with metadata
def perform_agentic_chunking_with_metadata(documents_json: list) -> list:
    """Chunk every document with ``AgenticChunker`` and attach its metadata.

    Args:
        documents_json: List of dicts. The keys ``text``, ``title`` and ``url``
            are read; missing ones fall back to ``""``, ``"Unknown Title"``
            and ``"Unknown URL"``.

    Returns:
        A list of ``Document`` objects, one per chunk, whose metadata carries
        the originating document's ``title`` and ``url`` plus
        ``"source": "agentic"``. Documents that fail or yield no propositions
        are skipped with a warning.

    Raises:
        ValueError: Propagated from ``AgenticChunker()`` when no API key is
            configured.
    """
    # Constructed once, outside the per-document try, so a missing API key
    # fails fast instead of being reported as one warning per document.
    ac = AgenticChunker()
    chunked_docs = []

    for doc in documents_json:
        text = doc.get("text", "")
        title = doc.get("title", "Unknown Title")
        url = doc.get("url", "Unknown URL")

        # Start every document from a clean slate. Resetting here (rather than
        # after a successful run) guarantees that chunks left behind by a
        # document that failed midway never get attributed to the next one.
        ac.chunks = {}

        try:
            propositions = extract_propositions(text, ac)
            if not propositions:
                print(f"[Warning] No propositions extracted for document: {title}")
                continue
            ac.add_propositions(propositions)

            doc_chunks = [
                Document(
                    page_content=chunk_text,
                    metadata={
                        "title": title,
                        "url": url,
                        "source": "agentic"
                    }
                )
                for chunk_text in ac.get_chunks(get_type="list_of_strings")
            ]
        except Exception as e:
            print(f"[Warning] Skipped document: {title}, error: {e}")
            continue

        chunked_docs.extend(doc_chunks)

    return chunked_docs

# Index construction
def build_index(
    documents: list,
    embedding_model_name: str = 'sentence-transformers/all-mpnet-base-v2',
    persist_directory: str = "./chroma_agentic",
    hnsw_m: int = 32,
    ef_construction: int = 40,
):
    """Build the sparse (BM25), dense (FAISS HNSW) and Chroma indices.

    Args:
        documents: Objects exposing ``page_content`` (text) and ``metadata``.
        embedding_model_name: Sentence-transformers model used for both the
            FAISS embeddings and the Chroma store.
        persist_directory: Directory handed to Chroma for persistence.
        hnsw_m: ``M`` argument passed to ``faiss.IndexHNSWFlat``.
        ef_construction: Value assigned to ``index.hnsw.efConstruction``.

    Returns:
        Tuple ``(bm25, index, dense_embeddings, all_chunks, metadata,
        chroma_db)``.

    Raises:
        ValueError: If ``documents`` is empty (none of the indices can be
            built from an empty corpus).
    """
    if not documents:
        raise ValueError("build_index requires at least one document")

    all_chunks = [doc.page_content for doc in documents]
    metadata = [doc.metadata for doc in documents]

    # Sparse (BM25). Tokenisation is a plain split on single spaces; whatever
    # tokenises queries at search time has to split the same way.
    tokenized_corpus = [chunk.split(" ") for chunk in all_chunks]
    bm25 = BM25Okapi(tokenized_corpus)

    # Dense embeddings. encode() already L2-normalises them, so no second
    # normalisation pass is needed; FAISS wants contiguous float32 input.
    embedding_model = SentenceTransformer(embedding_model_name, device=device)
    dense_embeddings = np.ascontiguousarray(
        embedding_model.encode(all_chunks, convert_to_numpy=True, normalize_embeddings=True),
        dtype=np.float32,
    )
    dim = dense_embeddings.shape[1]

    # FAISS (HNSW). On unit-length vectors, ranking by L2 distance gives the
    # same order as ranking by cosine similarity.
    index = faiss.IndexHNSWFlat(dim, hnsw_m)
    index.hnsw.efConstruction = ef_construction
    index.add(dense_embeddings)

    # Chroma
    # NOTE(review): with a persistent directory, re-running this presumably
    # adds the same texts to the stored collection again - confirm and clear
    # the directory between runs if duplicates matter.
    chroma_db = Chroma.from_texts(
        texts=all_chunks,
        embedding=HuggingFaceEmbeddings(model_name=embedding_model_name),
        metadatas=metadata,
        persist_directory=persist_directory
    )

    return bm25, index, dense_embeddings, all_chunks, metadata, chroma_db

# Hybrid retrieval
def retrieve_context(query: str, bm25, faiss_index, corpus: list, metadata: list, top_k: int = 50, rerank_k: int = 10) -> tuple:
    """Retrieve the best chunks for ``query`` via BM25 + FAISS, then rerank.

    The top ``top_k`` hits of each retriever are merged and every candidate is
    scored against the query by a cross-encoder; the ``rerank_k`` highest
    scoring chunks are returned.

    Args:
        query: Natural-language query.
        bm25: BM25 index exposing ``get_scores(tokens)``.
        faiss_index: FAISS index exposing ``search(vectors, k)``.
        corpus: Chunk texts, index-aligned with both indices.
        metadata: Per-chunk metadata, index-aligned with ``corpus``.
        top_k: Candidates taken from each retriever (capped at corpus size).
        rerank_k: Number of results kept after reranking.

    Returns:
        ``(contexts, docs)``: the selected chunk texts and their metadata, in
        descending cross-encoder score. ``([], [])`` for an empty corpus or a
        non-positive ``top_k``.
    """
    k = min(top_k, len(corpus))
    if k <= 0:
        return [], []

    # Loading a transformer per query is very slow; reuse cached instances.
    cross_encoder = _get_retrieval_model(CrossEncoder, 'cross-encoder/ms-marco-MiniLM-L-6-v2')
    embedding_model = _get_retrieval_model(SentenceTransformer, 'sentence-transformers/all-mpnet-base-v2')

    # Sparse candidates. The split must mirror how the BM25 corpus was tokenised.
    tokenized_query = query.split(" ")
    bm25_scores = bm25.get_scores(tokenized_query)
    bm25_top_idx = np.argsort(bm25_scores)[::-1][:k]

    # Dense candidates. encode() already returns a unit-length vector.
    query_emb = embedding_model.encode(query, convert_to_numpy=True, normalize_embeddings=True)
    query_matrix = np.ascontiguousarray(query_emb, dtype=np.float32).reshape(1, -1)
    _, dense_top_idx = faiss_index.search(query_matrix, k)

    candidate_indices = {int(i) for i in bm25_top_idx}
    # FAISS pads missing neighbours with -1; used as a list index that would
    # silently select the last chunk, so drop those entries.
    candidate_indices.update(int(i) for i in dense_top_idx[0] if i >= 0)
    # Sorted so that the candidate order (and tie-breaking) is deterministic.
    ordered_indices = sorted(candidate_indices)

    pairs = [[query, corpus[i]] for i in ordered_indices]
    scores = cross_encoder.predict(pairs)
    reranked = sorted(zip(scores, ordered_indices), key=lambda item: item[0], reverse=True)[:rerank_k]

    contexts = [corpus[i] for _, i in reranked]
    docs = [metadata[i] for _, i in reranked]
    return contexts, docs


# One model instance per (loader class name, model name), shared across queries.
_RETRIEVAL_MODEL_CACHE = {}


def _get_retrieval_model(loader, model_name: str):
    """Return a cached ``loader(model_name, device=device)`` instance."""
    key = (loader.__name__, model_name)
    if key not in _RETRIEVAL_MODEL_CACHE:
        _RETRIEVAL_MODEL_CACHE[key] = loader(model_name, device=device)
    return _RETRIEVAL_MODEL_CACHE[key]

# Generate response (prompt only)
def generate_response(query: str, bm25, faiss_index, corpus: list, metadata: list) -> str:
    """Assemble the prompt text for ``query`` from its retrieved chunks.

    No LLM is called here: the function retrieves context with
    ``retrieve_context`` and returns the prompt string. Each chunk is preceded
    by the ``source`` value of its metadata.
    """
    contexts, docs = retrieve_context(query, bm25, faiss_index, corpus, metadata)

    sections = []
    for ctx, doc in zip(contexts, docs):
        sections.append(f"{doc['source']}:\n{ctx}")
    combined_context = "\n\n".join(sections)

    return f"Retrieved Chunks:\n{combined_context}\n\nQuery: {query}"

# Main execution
if __name__ == "__main__":
    # End-to-end run: load both knowledge bases, chunk them, build the indices
    # and print the prompt for one example query. The names assigned here stay
    # at module level, so code executed afterwards in the same session can
    # reuse bm25, faiss_index, corpus and metadata.
    try:
        # Load JSON data. The encoding is given explicitly because the
        # platform default is not UTF-8 everywhere and would mis-decode (or
        # fail on) non-ASCII text in the knowledge bases.
        with open("islamic_etiquette_knowledge_base.json", "r", encoding="utf-8") as f1:
            etiquette_data = json.load(f1)
        with open("Quran_app_Documentation.json", "r", encoding="utf-8") as f2:
            quran_app_data = json.load(f2)

        # Both files are expected to hold a JSON list of documents.
        combined_documents = etiquette_data + quran_app_data

        # Perform agentic chunking
        docs = perform_agentic_chunking_with_metadata(combined_documents)

        if not docs:
            raise ValueError("No chunked documents found. Please check your chunking process and input data.")

        # Build indices
        bm25, faiss_index, embeddings, corpus, metadata, chroma = build_index(docs)

        # Example query
        query = "What does the Quran say about Riba?"
        prompt = generate_response(query, bm25, faiss_index, corpus, metadata)
        print("\nGenerated Prompt:\n")
        print(prompt)

    except Exception as e:
        # Top-level boundary: report the failure instead of raising.
        print(f"[Error] Main execution failed: {e}")

In [None]:
# Example query: a piece of user feedback about the Quran app, used to
# exercise retrieval. Relies on bm25, faiss_index, corpus and metadata having
# been created by the main execution block above.
query = "The quran app is good but not helpful in understanding the Quranic verses."
prompt = generate_response(query, bm25, faiss_index, corpus, metadata)
print("\nGenerated Prompt:\n")
print(prompt)


In [3]:
# Example query: user feedback asking about the app's scheduling feature.
# Relies on bm25, faiss_index, corpus and metadata having been created by the
# main execution block above.
query = "The quran app is good but i can't understand how to use the scheduling feature."

prompt = generate_response(query, bm25, faiss_index, corpus, metadata)
print("\nGenerated Prompt:\n")
print(prompt)
