<a href="https://colab.research.google.com/github/ahmedellaboudy/IBMwatsonxRAGpipeline/blob/main/FinalIBMWatsonxPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# CELL 1: Installations
!pip install -q chromadb sentence-transformers transformers torch langchain langchain-community langchain-huggingface accelerate bitsandbytes

In [None]:
# CELL 2: Import Libraries
import json
import torch
import warnings
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFacePipeline
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_core.documents import Document
from google.colab import files
warnings.filterwarnings('ignore')

In [None]:
# CELL 3: RAG Pipeline Class
class WatsonXRAGPipeline:
    """Retrieval-augmented question answering over pre-chunked documentation.

    The pipeline reads text chunks from a JSON file, indexes them in a Chroma
    vector store using Hugging Face sentence embeddings, and answers questions
    by feeding the retrieved chunks to a locally loaded causal language model
    through a LangChain ``LLMChain``.

    Expected call order: ``load_chunks_from_json`` ->
    ``setup_embeddings_and_vectorstore`` -> ``setup_llm`` -> ``query``.
    """

    # Number of chunks retrieved to build the prompt context in ``query``.
    _CONTEXT_K = 4
    # Number of top-ranked chunks whose sources are reported back to the user.
    _MAX_SOURCES = 3
    # Lead-in phrases removed from generated answers by ``clean_response``.
    _CLEANUP_PHRASES = (
        "Based on the context", "According to the context",
        "The context mentions", "From the provided information",
        "The provided context shows", "Based on the information provided",
    )

    def __init__(self, json_file_path: str, persist_directory: str = "./watsonx_chroma_db"):
        """Store configuration; no model or index is loaded here.

        Args:
            json_file_path: Path of the JSON file holding the chunks.
            persist_directory: Directory handed to Chroma for persistence.
        """
        self.json_file_path = json_file_path
        self.persist_directory = persist_directory
        self.embedding_model = None
        self.vectorstore = None
        self.llm_chain = None

    def load_chunks_from_json(self):
        """Read the chunk file and return ``(chunks, metadata)``.

        The file must contain a JSON object; its ``'chunks'`` entry (default
        ``[]``) and ``'metadata'`` entry (default ``{}``) are returned.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        print("Loading chunks from JSON file...")

        with open(self.json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        chunks = data.get('chunks', [])
        metadata = data.get('metadata', {})

        print(f"Loaded {len(chunks)} chunks")
        print(f"Sources: {metadata.get('unique_sources', 'Unknown')}")

        return chunks, metadata

    def setup_embeddings_and_vectorstore(self, chunks):
        """Embed the chunks and build the Chroma vector store.

        Args:
            chunks: Iterable of dicts, each providing ``'text'``, ``'id'`` and
                a ``'metadata'`` dict with ``'source'`` and ``'url'``.

        Returns:
            The created vector store (also kept on ``self.vectorstore``).

        Raises:
            KeyError: If a chunk lacks one of the required keys.
        """
        print("Setting up embeddings and vectorstore...")

        self.embedding_model = HuggingFaceEmbeddings(
            model_name="all-MiniLM-L6-v2",
            model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

        documents = [
            Document(
                page_content=chunk['text'],
                metadata={
                    'source': chunk['metadata']['source'],
                    'url': chunk['metadata']['url'],
                    'chunk_id': chunk['id']
                }
            )
            for chunk in chunks
        ]

        # NOTE(review): presumably re-running this against an existing
        # persist_directory adds the same chunks to the stored collection
        # again -- confirm, and clear the directory between runs if so.
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embedding_model,
            persist_directory=self.persist_directory
        )

        print(f"Vectorstore created with {len(documents)} documents")
        return self.vectorstore

    def setup_llm(self):
        """Load the language model and build the prompt/LLM chain.

        Loads ``Qwen/Qwen2.5-3B-Instruct`` in bfloat16 with automatic device
        placement when CUDA is available, otherwise in float32. If that load
        raises, one retry is made in float16.

        Returns:
            The ``LLMChain`` (also kept on ``self.llm_chain``).
        """
        print("Loading language model...")

        model_name = "Qwen/Qwen2.5-3B-Instruct"
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        try:
            if torch.cuda.is_available():
                model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.bfloat16,
                    device_map="auto",
                    trust_remote_code=True
                )
                device_info = "GPU with bfloat16"
            else:
                model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.float32,
                    trust_remote_code=True
                )
                device_info = "CPU with float32"
        except Exception as e:
            # Deliberately broad: any failure of the preferred configuration
            # triggers a single retry with float16; a second failure propagates.
            print(f"Primary loading failed, using fallback: {e}")
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16,
                device_map="auto" if torch.cuda.is_available() else None,
                trust_remote_code=True
            )
            device_info = "fallback with float16"

        print(f"Model loaded on {device_info}")

        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            return_full_text=False,  # only the generated continuation, not the prompt
            max_new_tokens=300,
            do_sample=True,
            temperature=0.1,
            repetition_penalty=1.15,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id
        )

        llm = HuggingFacePipeline(pipeline=pipe)

        prompt_template = PromptTemplate(
            input_variables=["context", "question"],
            template="""You are an IBM watsonx documentation assistant. Provide accurate, concise answers based on the provided context.

Guidelines:
- Use only information from the provided context
- Be direct and specific
- Include relevant URLs when mentioned in the context
- If information is not available, state this clearly

Context: {context}

Question: {question}

Answer:"""
        )

        self.llm_chain = LLMChain(llm=llm, prompt=prompt_template, verbose=False)
        print("Language model setup complete")

        return self.llm_chain

    @staticmethod
    def _format_context(docs) -> str:
        """Render retrieved documents as the text block placed in the prompt.

        Each source gets a ``Source:`` header (plus ``URL:`` when present) the
        first time it appears; every document's stripped text follows, with
        blank lines as separators. Returns ``""`` when ``docs`` is empty.
        """
        if not docs:
            return ""

        context_parts = []
        seen_sources = set()

        for doc in docs:
            source = doc.metadata.get('source', 'Unknown Source')
            url = doc.metadata.get('url', '')

            if source not in seen_sources:
                context_parts.append(f"Source: {source}")
                if url:
                    context_parts.append(f"URL: {url}")
                seen_sources.add(source)
                context_parts.append("")

            context_parts.append(doc.page_content.strip())
            context_parts.append("")

        return "\n".join(context_parts)

    def retrieve_context(self, query: str, k: int = 4) -> str:
        """Return formatted context for ``query`` from the ``k`` nearest chunks.

        Returns ``""`` when the vector store has not been set up or when the
        search yields no documents.
        """
        if self.vectorstore is None:
            return ""

        return self._format_context(self.vectorstore.similarity_search(query, k=k))

    def clean_response(self, response: str) -> str:
        """Normalise a generated answer for display.

        Accepts either the answer text or a chain output dict (its ``"text"``
        entry is used, falling back to the dict's string form). Known lead-in
        phrases are removed and all whitespace runs collapse to single spaces.
        """
        if isinstance(response, dict):
            response = response.get("text", str(response))

        response = response.strip()

        for phrase in self._CLEANUP_PHRASES:
            # Try the punctuated forms first so that removing
            # "Based on the context, ..." does not leave a dangling ", ...".
            for lead_in in (phrase + ",", phrase + ":", phrase):
                response = response.replace(lead_in, "")

        return " ".join(response.split())

    def query(self, question: str) -> dict:
        """Answer ``question`` from the indexed documentation.

        Returns:
            A dict that always has ``"answer"``. On success it also carries
            ``"question"``, ``"sources"`` (de-duplicated ``{"title", "url"}``
            dicts from the top-ranked chunks) and ``"context_used"``. When
            the pipeline is not initialised or processing fails, an
            ``"error"`` entry describes the problem instead.
        """
        if self.vectorstore is None or self.llm_chain is None:
            return {
                "error": "Pipeline not properly initialized",
                "answer": "System not ready. Please initialize the pipeline first."
            }

        try:
            # Retrieve once; the same ranked list feeds both the prompt
            # context and the reported sources.
            docs = self.vectorstore.similarity_search(question, k=self._CONTEXT_K)
            context = self._format_context(docs)

            if not context:
                return {
                    "question": question,
                    "answer": "I don't have information about that in the IBM watsonx documentation.",
                    "sources": []
                }

            response = self.llm_chain.invoke({"question": question, "context": context})
            # clean_response accepts the chain's dict output as well as plain text.
            answer = self.clean_response(response)

            sources = []
            seen_titles = set()

            for doc in docs[:self._MAX_SOURCES]:
                title = doc.metadata.get('source', 'Unknown Source')
                url = doc.metadata.get('url', '')

                if title not in seen_titles:
                    sources.append({"title": title, "url": url})
                    seen_titles.add(title)

            return {
                "question": question,
                "answer": answer,
                "sources": sources,
                "context_used": True
            }

        except Exception as e:
            # Boundary for the chat loop: report the failure in the result
            # instead of raising.
            return {
                "question": question,
                "error": f"Error processing query: {str(e)}",
                "answer": "I encountered an error while processing your question. Please try again."
            }


In [None]:
# CELL 4: Helper Functions
def upload_json_file():
    """Prompt for a file upload in Colab and return the uploaded file's name.

    Only the first entry of the upload result is used.

    Raises:
        ValueError: If the upload result is empty.
    """
    print("Please upload your IBM_WatsonX_chunks JSON file:")
    received = files.upload()

    if received:
        # The upload result is keyed by file name; take the first key.
        first_name = next(iter(received.keys()))
        print(f"Uploaded file: {first_name}")
        return first_name

    raise ValueError("No file uploaded!")

def initialize_pipeline():
    """Upload the chunk file and build a ready-to-query pipeline.

    Runs, in order: file upload, chunk loading, embedding/vector-store setup
    and language-model setup, printing progress along the way.

    Returns:
        The initialised ``WatsonXRAGPipeline``.

    Raises:
        ValueError: If no file was uploaded (from ``upload_json_file``).
    """
    import os  # local import: the notebook's import cell does not provide os

    print("=" * 60)
    print("IBM watsonx RAG Pipeline Setup")
    print("=" * 60)

    json_filename = upload_json_file()
    # The upload is saved relative to the current working directory, which is
    # /content only by default; resolve the path from where the kernel
    # actually is instead of assuming that location.
    json_file_path = os.path.abspath(json_filename)
    print(f"JSON file ready at: {json_file_path}")

    print("\nInitializing RAG Pipeline...")
    print("-" * 40)

    rag_pipeline = WatsonXRAGPipeline(json_file_path)
    # The setup methods store their results on the pipeline, so only the
    # chunk list has to be passed along here.
    chunks, _ = rag_pipeline.load_chunks_from_json()
    rag_pipeline.setup_embeddings_and_vectorstore(chunks)
    rag_pipeline.setup_llm()

    print("\nPipeline initialization complete!")
    print("Ready to answer questions about IBM watsonx.")

    return rag_pipeline

def start_chat(rag_pipeline):
    """Run an interactive question/answer loop on standard input.

    Blank lines are ignored. The loop ends when the user types one of
    ``exit``, ``quit``, ``stop`` or ``q`` (case-insensitive), presses Ctrl-C,
    or when the input stream reaches end of file.

    Args:
        rag_pipeline: Object whose ``query(question)`` returns a dict with an
            ``'answer'`` entry and optionally ``'error'`` and ``'sources'``
            (a list of dicts with ``'title'`` and ``'url'``).
    """
    print("\n" + "=" * 60)
    print("IBM watsonx Documentation Assistant")
    print("=" * 60)
    print("Ask questions about IBM watsonx documentation.")
    print("Commands: 'exit' to quit")
    print("-" * 60)

    while True:
        try:
            user_input = input("\nYou: ").strip()

            if not user_input:
                continue

            if user_input.lower() in {'exit', 'quit', 'stop', 'q'}:
                print("\nGoodbye! Thank you for using the IBM watsonx Documentation Assistant.")
                break

            result = rag_pipeline.query(user_input)

            if result.get('error'):
                print(f"Error: {result['answer']}")
            else:
                print(f"Answer: {result['answer']}")

                if result.get('sources'):
                    print("\nRelevant documentation:")
                    for i, source in enumerate(result['sources'], 1):
                        print(f"{i}. {source['title']}")
                        if source['url']:
                            print(f"   {source['url']}")

            print("-" * 50)

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the session too: input() raises it on every
            # call once stdin is exhausted, so treating it as a retryable
            # error would spin forever.
            print("\n\nSession interrupted. Goodbye!")
            break
        except Exception as e:
            # Keep the session alive when a single question fails.
            print(f"Unexpected error: {str(e)}")
            print("Please try again.")


In [None]:
# CELL 5: Main Execution Pipeline
def main():
    """Build the pipeline and hand it to the chat loop.

    Any exception raised by either step is reported on standard output
    rather than propagated.
    """
    try:
        start_chat(initialize_pipeline())
    except Exception as exc:
        # Top-level boundary: show a readable message instead of a traceback.
        print(f"Error in main execution: {str(exc)}")
        print("Please check your setup and try again.")

In [None]:
# CELL 6: Run the application
# Entry point: start the application only when this code runs as the main
# program, not when it is imported as a module.
if __name__ == "__main__":
    main()