In [9]:
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import fitz
import re

def chunk_text(text, max_tokens=1000, overlap=200):
    """Split *text* into overlapping chunks of whitespace-separated words.

    Despite the parameter name, "tokens" here are words produced by
    ``str.split()``, not model tokens.

    Args:
        text: The text to split.
        max_tokens: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared between consecutive chunks; must
            satisfy ``0 <= overlap < max_tokens``.

    Returns:
        A list of chunk strings (empty when *text* contains no words).

    Raises:
        ValueError: If *max_tokens* or *overlap* is out of range.
    """
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {max_tokens}")
    # overlap >= max_tokens would give a zero/negative stride (crash or a
    # silently empty result); a negative overlap would skip words entirely.
    if not 0 <= overlap < max_tokens:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_tokens, got overlap={overlap}, max_tokens={max_tokens}"
        )

    step = max_tokens - overlap
    words = text.split()
    return [' '.join(words[i:i + max_tokens]) for i in range(0, len(words), step)]

class DocumentLoader:
    """Load the textual content of a document from disk."""

    def load(self, path):
        """Return the text of the file at *path*.

        Paths ending in ``.txt`` are read as UTF-8 text. Any other path is
        opened with ``fitz`` (PyMuPDF) and the text of every page is joined
        with newlines.

        Args:
            path: Filesystem path of the document, as a string.

        Returns:
            The document text as a single string.
        """
        if path.endswith('.txt'):
            # Context manager guarantees the handle is closed after reading.
            with open(path, 'r', encoding='utf-8') as handle:
                return handle.read()

        # Close the PyMuPDF document once the page text has been extracted.
        with fitz.open(path) as pdf:
            return '\n'.join(page.get_text() for page in pdf)

class VectorDB:
    """In-memory store of embedded text chunks with keyword + embedding search.

    Each stored entry is a dict with the keys ``"embedding"``, ``"metadata"``
    and ``"text"``.
    """

    def __init__(self, client):
        """Create an empty store.

        Args:
            client: Object exposing ``embeddings.create(input=..., model=...)``
                (the file passes an ``OpenAI`` client).
        """
        self.client = client
        self.data = []
        # English stop words ignored when extracting query keywords.
        self.stop_words = {
            'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd",
            'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers',
            'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which',
            'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been',
            'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if',
            'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between',
            'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out',
            'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why',
            'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not',
            'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't",
            'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn',
            "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't",
            'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't",
            'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't",
        }

    def add(self, docs, metadata):
        """Chunk, embed and store each document.

        Args:
            docs: Iterable of document texts.
            metadata: Iterable of labels, paired with *docs* via ``zip``.

        Chunks whose embedding could not be computed are skipped.
        """
        for doc, meta in zip(docs, metadata):
            for i, chunk in enumerate(chunk_text(doc)):
                embedding = self.embed(chunk)
                if embedding:
                    self.data.append({"embedding": embedding, "metadata": f"{meta} (chunk {i+1})", "text": chunk})

    def embed(self, text):
        """Return the embedding vector for *text*, or ``[]`` on failure.

        Best-effort by design: any error from the client is printed and
        reported to the caller as an empty list rather than raised.
        """
        try:
            return self.client.embeddings.create(input=text, model="text-embedding-ada-002").data[0].embedding
        except Exception as e:
            print(f"Embedding error: {e}")
            return []

    def extract_keywords(self, text):
        """Return the lower-cased words of *text* that are not stop words."""
        words = re.findall(r'\b\w+\b', text.lower())
        return [word for word in words if word not in self.stop_words]

    def search(self, query, k=3):
        """Return up to *k* ``(metadata, text, score)`` tuples for *query*.

        Chunks are first scored by the fraction of query keywords that occur
        (as substrings) in the chunk text. If fewer than *k* chunks match,
        the remainder is filled with the chunks most similar to the query by
        embedding cosine similarity.

        NOTE(review): keyword fractions and cosine similarities are sorted
        together although they are on different scales; kept as in the
        original ranking.

        Args:
            query: The user query string.
            k: Maximum number of results.

        Returns:
            A list of tuples sorted by score, highest first.
        """
        # Nothing stored: avoid an embedding call and an empty similarity matrix.
        if not self.data:
            return []

        keywords = self.extract_keywords(query)
        relevant_chunks = []
        seen = set()  # (metadata, text) pairs already selected

        # A query made only of stop words has no keywords; skip this pass
        # instead of dividing by zero.
        if keywords:
            for d in self.data:
                haystack = d["text"].lower()
                score = sum(keyword in haystack for keyword in keywords) / len(keywords)
                if score > 0:
                    relevant_chunks.append((d["metadata"], d["text"], score))
                    seen.add((d["metadata"], d["text"]))

        if len(relevant_chunks) < k:
            q_embed = self.embed(query)
            if q_embed:
                sims = cosine_similarity([q_embed], [d["embedding"] for d in self.data])[0]
                for i in sims.argsort()[::-1]:
                    if len(relevant_chunks) >= k:
                        break
                    entry = self.data[i]
                    key = (entry["metadata"], entry["text"])
                    # Compare on identity of the chunk, not on its score, so a
                    # keyword hit is never added a second time.
                    if key in seen:
                        continue
                    seen.add(key)
                    relevant_chunks.append((entry["metadata"], entry["text"], float(sims[i])))

        return sorted(relevant_chunks, key=lambda x: x[2], reverse=True)[:k]

class SystemRolePrompt:
    """Holds a fixed system prompt and renders it as a chat message."""

    def __init__(self, template):
        self.template = template

    def create_message(self):
        """Return the stored template as a ``system``-role message dict."""
        message = dict(role="system", content=self.template)
        return message

class UserRolePrompt:
    """Holds a ``str.format`` template and renders it as a user chat message."""

    def __init__(self, template):
        self.template = template

    def create_message(self, **kwargs):
        """Fill the template with *kwargs* and return a ``user``-role message dict."""
        rendered = self.template.format(**kwargs)
        return dict(role="user", content=rendered)

# System prompt: instructs the model to answer only from the supplied context.
# Used verbatim (no placeholders) as the system message.
RAG_PROMPT_TEMPLATE = """
You are a helpful AI assistant tasked with answering questions based on the provided context.
Use the following guidelines:
1. Only use information from the given context to answer the query.
2. If the context doesn't contain relevant information, respond with "I don't have enough information to answer this question."
3. Provide concise and accurate answers, citing the source (metadata) when possible.
4. If the information is ambiguous or contradictory, explain the discrepancies.
5. Do not make up or infer information that is not present in the context.
"""

# User prompt: a str.format template with two placeholders,
# {context} and {user_query}.
USER_PROMPT_TEMPLATE = """
Context:
{context}

User Query: {user_query}

Please provide a clear and concise answer based on the above context.
"""

# Module-level prompt objects built from the two templates above.
rag_prompt = SystemRolePrompt(RAG_PROMPT_TEMPLATE)
user_prompt = UserRolePrompt(USER_PROMPT_TEMPLATE)

class RetrievalAugmentedQAPipeline:
    """Answer a query by retrieving context chunks and asking a chat model."""

    def __init__(
        self,
        llm: OpenAI,
        vector_db: VectorDB,
        model: str = "gpt-3.5-turbo",
        k: int = 3,
        max_tokens: int = 300,
        preview_chars: int = 200,
    ):
        """Configure the pipeline.

        Args:
            llm: Client used for ``chat.completions.create``.
            vector_db: Store queried for context via ``search``.
            model: Chat model name sent with each request.
            k: Number of context chunks to retrieve per query.
            max_tokens: Completion token limit sent with each request.
            preview_chars: Maximum length of each context preview returned
                by :meth:`run_pipeline`.
        """
        self.llm = llm
        self.vector_db = vector_db
        self.model = model
        self.k = k
        self.max_tokens = max_tokens
        self.preview_chars = preview_chars

    def run_pipeline(self, user_query: str) -> dict:
        """Retrieve context for *user_query* and generate an answer.

        Args:
            user_query: The question to answer.

        Returns:
            A dict with ``"response"`` (the answer text, or an apology
            string if the chat request failed) and ``"context"`` (a list of
            ``(metadata, text_preview, score)`` tuples).
        """
        context_list = self.vector_db.search(user_query, k=self.k)
        context_prompt = "\n\n".join(
            f"Source: {meta} (Relevance: {sim:.2f})\n{text}" for meta, text, sim in context_list
        )
        formatted_system_prompt = rag_prompt.create_message()
        formatted_user_prompt = user_prompt.create_message(user_query=user_query, context=context_prompt)
        try:
            response = self.llm.chat.completions.create(
                model=self.model,
                messages=[formatted_system_prompt, formatted_user_prompt],
                max_tokens=self.max_tokens,
            )
            answer = response.choices[0].message.content.strip()
        except Exception as e:
            # Best-effort: report the failure and still return the context.
            print(f"Error generating response: {e}")
            answer = "I'm sorry, I encountered an error while processing your query."

        limit = self.preview_chars
        # Only mark a preview with "..." when the text was actually cut.
        previews = [
            (meta, text if len(text) <= limit else text[:limit] + "...", sim)
            for meta, text, sim in context_list
        ]
        return {"response": answer, "context": previews}

def main():
    """Build the vector store from two sample documents and answer one query.

    The API key is taken from the ``OPENAI_API_KEY`` environment variable
    when set; otherwise it is prompted for without echoing it to the screen.
    """
    import getpass
    import os

    # Avoid input(): it would echo the secret into the terminal/notebook output.
    api_key = os.environ.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        api_key = getpass.getpass("Enter OpenAI API key: ").strip()

    client = OpenAI(api_key=api_key)
    loader = DocumentLoader()
    db = VectorDB(client)

    # (path, label) pairs; the label becomes the metadata shown as the source.
    sources = [
        ("/Users/annatucker/AEI4/Week 1/Day 2/data/PMarcaBlogs.txt", "Text File"),
        ("/Users/annatucker/AEI4/Week 1/Day 2/data/The-pmarca-Blog-Archives copy.pdf", "PDF File"),
    ]
    documents = [loader.load(path) for path, _ in sources]
    labels = [label for _, label in sources]
    db.add(documents, labels)

    pipeline = RetrievalAugmentedQAPipeline(client, db)
    query = "What is the 'Michael Eisner Memorial Weak Executive Problem'?"
    result = pipeline.run_pipeline(query)

    print(f"Response: {result['response']}\n")
    print("Context:")
    for meta, text, sim in result['context']:
        print(f"Source: {meta}, Relevance: {sim:.2f}")
        print(f"Text preview: {text}")
        print("=" * 50)

# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()

Response: The 'Michael Eisner Memorial Weak Executive Problem' refers to the phenomenon where a CEO or startup founder hires someone weak into an executive role for a function they previously excelled in, possibly due to a reluctance to let go of that function. This can result in hiring executives who are not as strong or competent as needed for the role, ultimately impacting the company's success.

Context:
Source: Text File (chunk 21), Relevance: 1.00
Text preview: output– accomplishment. Validate it by reference checking peers, reports, and bosses. Along the way, reference check personality and teamwork, but look Xrst and foremost for a pattern of output. Fiah,...
Source: PDF File (chunk 21), Relevance: 1.00
Text preview: succceed without him, then your ideal executive hire is someone who will succeed without you. • Beware hiring a big company executive for a startup.The executive skill sets required for a big company ...
Source: Text File (chunk 24), Relevance: 0.50
Text preview: h