In [None]:
!pip install openai==0.28.1
!pip install tiktoken==0.6.0
!pip install langchain==0.1.20
!pip install langchain-openai
!pip install PyPDF2
from google.colab import userdata
userdata.get('OPENAI_API_KEY')

API_KEY = userdata.get('OPENAI_API_KEY')

print(API_KEY)

Collecting openai==0.28.1
  Downloading openai-0.28.1-py3-none-any.whl.metadata (11 kB)
Downloading openai-0.28.1-py3-none-any.whl (76 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.0/77.0 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: openai
  Attempting uninstall: openai
    Found existing installation: openai 1.52.2
    Uninstalling openai-1.52.2:
      Successfully uninstalled openai-1.52.2
Successfully installed openai-0.28.1
Collecting tiktoken==0.6.0
  Downloading tiktoken-0.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Downloading tiktoken-0.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tiktoken
Successfully installed tiktoken-0.6.0
Collecting langchain==0.1.20
  Downloading langchain-0.1.20-py3-none-any.whl.m

In [2]:
# First, install required packages
!pip install openai PyPDF2 chromadb langchain langchain-community langchain-openai tqdm

import os
from typing import List, Dict
import PyPDF2
from openai import OpenAI
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from google.colab import drive
from google.colab import userdata
import io
from tqdm.notebook import tqdm

def setup_api():
    api_key = userdata.get('OPENAI_API_KEY')  # Replace with your actual API key
    if not api_key:
        raise ValueError("API key not found")
    os.environ["OPENAI_API_KEY"] = api_key
    client = OpenAI(api_key=api_key)
    return api_key, client

class ElixirRAGSystem:
    def __init__(self, persist_directory: str = "/content/drive/MyDrive/vector_db"):
        """Initialize the RAG system with vector store configuration."""
        # Mount Google Drive
        drive.mount('/content/drive')

        # Set up API key and client
        self.api_key, self.client = setup_api()

        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings(openai_api_key=self.api_key)
        self.vector_store = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

    def load_existing_vectorstore(self) -> bool:
        """Try to load existing vector store."""
        try:
            if os.path.exists(self.persist_directory) and os.listdir(self.persist_directory):
                print("Found existing vector store, loading...")
                self.vector_store = Chroma(
                    persist_directory=self.persist_directory,
                    embedding_function=self.embeddings
                )
                print("Vector store loaded successfully!")
                return True
            return False
        except Exception as e:
            print(f"Error loading vector store: {str(e)}")
            return False

    def initialize_system(self, base_path: str = "/content/drive/MyDrive/ZLibrary", force_rebuild: bool = False):
        """Initialize the system, optionally forcing rebuild of vector store."""
        if not force_rebuild and self.load_existing_vectorstore():
            print("System initialized with existing vector store")
            return True

        print("No existing vector store found or rebuild forced. Creating new vector store...")
        try:
            # Process documents
            documents = self.process_pdfs(base_path)
            if not documents:
                print("No documents to process!")
                return False

            # Create vector store
            self.create_vector_store(documents)
            return True

        except Exception as e:
            print(f"Error initializing system: {str(e)}")
            return False

    def scan_elixir_pdfs(self, base_path: str) -> List[Dict]:
        """Scan for PDF files containing 'elixir' in their name or path."""
        elixir_pdfs = []

        if not os.path.exists(base_path):
            raise ValueError(f"Path not found: {base_path}")

        print("Scanning for Elixir-related PDFs...")
        for root, dirs, files in os.walk(base_path):
            for file in files:
                if file.lower().endswith('.pdf'):
                    full_path = os.path.join(root, file)
                    if 'elixir' in file.lower() or 'elixir' in full_path.lower():
                        file_info = {
                            'path': full_path,
                            'name': file,
                            'size': os.path.getsize(full_path) / (1024 * 1024)
                        }
                        elixir_pdfs.append(file_info)
                        print(f"Found Elixir PDF: {file} ({file_info['size']:.2f} MB)")

        print(f"\nFound {len(elixir_pdfs)} Elixir-related PDF files")
        return elixir_pdfs

    def parse_pdf(self, file_info: Dict) -> str:
        """Extract text from a PDF file."""
        try:
            text = ""
            with open(file_info['path'], 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                total_pages = len(pdf_reader.pages)
                print(f"Processing {file_info['name']} ({total_pages} pages)")

                for i, page in enumerate(pdf_reader.pages, 1):
                    text += page.extract_text()
                    if i % 10 == 0:
                        print(f"Processed {i}/{total_pages} pages")
            return text
        except Exception as e:
            print(f"Error processing {file_info['name']}: {str(e)}")
            return ""

    def process_pdfs(self, base_path: str) -> List[str]:
        """Process all Elixir PDF documents and split them into chunks."""
        documents = []
        pdf_files = self.scan_elixir_pdfs(base_path)

        if not pdf_files:
            print("No Elixir PDF files found!")
            return documents

        for file_info in tqdm(pdf_files, desc="Processing PDFs"):
            try:
                print(f"\nProcessing: {file_info['name']}")
                text = self.parse_pdf(file_info)
                if text:
                    chunks = self.text_splitter.split_text(text)
                    documents.extend(chunks)
                    print(f"Successfully processed: {file_info['name']}")
                    print(f"Created {len(chunks)} chunks from this document")
            except Exception as e:
                print(f"Error processing {file_info['name']}: {str(e)}")
                continue

        print(f"\nTotal chunks created: {len(documents)}")
        return documents

    def create_vector_store(self, documents: List[str]):
        """Create or update the vector store with document chunks."""
        if not documents:
            raise ValueError("No documents to process")

        print("Creating vector store...")
        self.vector_store = Chroma.from_texts(
            texts=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        self.vector_store.persist()
        print("Vector store created and persisted successfully")

    def query_vector_store(self, query: str, k: int = 3) -> List[Dict]:
        """Retrieve relevant documents based on a query."""
        if not self.vector_store:
            raise ValueError("Vector store not initialized. Please process documents first.")

        results = self.vector_store.similarity_search_with_relevance_scores(query, k=k)
        return [{"content": doc.page_content, "score": score} for doc, score in results]

    def generate_response(self, query: str, context_docs: List[Dict]) -> str:
        """Generate a response using OpenAI with retrieved context."""
        context = "\n\n".join([doc["content"] for doc in context_docs])

        prompt = f"""Given the following context from Elixir documentation:
        {context}

        Please answer the following question about Elixir:
        {query}
        """

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant specializing in Elixir programming. Provide accurate and technical answers based on the provided documentation context."},
                {"role": "user", "content": prompt}
            ]
        )

        return response.choices[0].message.content

    def process_query(self, query: str) -> Dict:
        """Process a query end-to-end: retrieve context and generate response."""
        context_docs = self.query_vector_store(query)
        response = self.generate_response(query, context_docs)
        return {
            "response": response,
            "context": context_docs
        }

def create_chat_interface(rag_system):
    """Create an interactive chat interface for the RAG system."""

    def print_separator():
        print("\n" + "="*80 + "\n")

    def print_help():
        print("\nAvailable commands:")
        print("- 'exit': Exit the chat")
        print("- 'help': Show this help message")
        print("- 'sources': Toggle showing sources")
        print("- 'clear': Clear the screen")
        print("- 'rebuild': Rebuild the vector store from documents")

    show_sources = True
    print("\nElixir Documentation Chat Interface")
    print("Ask questions about Elixir and get answers from your documents")
    print_help()

    while True:
        print_separator()
        query = input("Your question: ").strip()

        if not query:
            continue

        if query.lower() == 'exit':
            print("\nGoodbye!")
            break

        if query.lower() == 'help':
            print_help()
            continue

        if query.lower() == 'sources':
            show_sources = not show_sources
            print(f"\nShowing sources: {show_sources}")
            continue

        if query.lower() == 'clear':
            print("\n" * 50)
            continue

        if query.lower() == 'rebuild':
            print("\nRebuilding vector store...")
            rag_system.initialize_system(force_rebuild=True)
            continue

        try:
            result = rag_system.process_query(query)

            print_separator()
            print("Answer:", result["response"])

            if show_sources:
                print_separator()
                print("Sources:")
                for idx, ctx in enumerate(result["context"], 1):
                    print(f"\nSource {idx} (Relevance: {ctx['score']:.2f}):")
                    print(f"{ctx['content'][:200]}...")

        except Exception as e:
            print(f"\nError: {str(e)}")
            print("Please try rephrasing your question or check if the system is properly initialized.")

def start_chat():
    try:
        print("Initializing RAG system...")
        rag = ElixirRAGSystem()

        if not rag.initialize_system():
            print("Failed to initialize the system. Please check your setup.")
            return

        create_chat_interface(rag)

    except Exception as e:
        print(f"Error initializing the system: {str(e)}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    print("Starting Elixir Documentation Chat System...")
    start_chat()

Collecting langchain-community
  Downloading langchain_community-0.3.5-py3-none-any.whl.metadata (2.9 kB)
Collecting SQLAlchemy<3,>=1.4 (from langchain)
  Downloading SQLAlchemy-2.0.35-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting httpx-sse<0.5.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting langchain
  Downloading langchain-0.3.7-py3-none-any.whl.metadata (7.1 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.6.1-py3-none-any.whl.metadata (3.5 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading marshmallow-3.23.1-py3-none-any.whl.metadata (7.5 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain

  self.vector_store = Chroma(


Vector store loaded successfully!
System initialized with existing vector store

Elixir Documentation Chat Interface
Ask questions about Elixir and get answers from your documents

Available commands:
- 'exit': Exit the chat
- 'help': Show this help message
- 'sources': Toggle showing sources
- 'clear': Clear the screen
- 'rebuild': Rebuild the vector store from documents


Your question: usage of pipe operators 


Answer: In Elixir, the pipe operator (`|>`) is used to take the result of one operation and pass (or "pipe") it along as the first argument to a function in the next operation. This is often used to chain together multiple operations in a clear and concise way.

For example, if you have the functions `prev(arg1, arg2)` and `next(arg3, arg4)`, you can use the pipe operator to pass the result of `prev` as the first argument to `next`, like so:

`prev(arg1, arg2) |> next(arg3, arg4)`

Which is equivalent to:

`next(prev(arg1, arg2), arg3, arg4)`

The pipe operator is especially

KeyboardInterrupt: Interrupted by user