In [1]:
import os
import shutil
import signal
import atexit
import pandas as pd
import docx
import fitz
import cv2
import pytesseract
import numpy as np
import ipywidgets as widgets
from IPython.display import display, Image

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

from dotenv import load_dotenv
from groq import Groq

# Read the Groq credential from the process environment; load_dotenv() is
# called first so values from a local .env file are available to the lookup.
load_dotenv()
GROQ_API_KEY = os.environ.get("Groq_API_KEY")
if GROQ_API_KEY is None or GROQ_API_KEY == "":
    raise ValueError("Groq_API_KEY not found in .env file")


In [39]:
def load_document(file_name):
    """Classify a file by its extension.

    Only the extension of ``file_name`` is inspected (case-insensitively);
    nothing is read from disk.

    Args:
        file_name: file name or path whose extension decides the type.

    Returns:
        One of ``"pdf"``, ``"docx"``, ``"csv"``, ``"excel"`` or ``"image"``.

    Raises:
        ValueError: if the extension is not a supported type.
    """
    kinds_by_suffix = {
        ".pdf": "pdf",
        ".docx": "docx",
        ".csv": "csv",
        ".xls": "excel",
        ".xlsx": "excel",
        ".png": "image",
        ".jpg": "image",
        ".jpeg": "image",
    }
    suffix = os.path.splitext(file_name)[1].lower()
    kind = kinds_by_suffix.get(suffix)
    if kind is None:
        raise ValueError(f"Unsupported file type: {suffix}")
    return kind

def _frame_to_text(df):
    """Flatten a DataFrame into comma-separated lines, column names first."""
    # The header line keeps the column names in the indexed text; emitting only
    # df.values would drop them and leave the cell values without labels.
    header = ", ".join(map(str, df.columns))
    rows = [", ".join(map(str, row)) for row in df.values]
    return "\n".join([header, *rows])


def _ocr_image(file_path):
    """Run Tesseract OCR on an image file after grayscale/blur/Otsu preprocessing."""
    img = cv2.imread(file_path)
    if img is None:
        raise FileNotFoundError(f"Image not found: {file_path}")

    # Binarise before OCR: grayscale -> light Gaussian blur -> Otsu threshold.
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (3, 3), 0)
    _, thresh = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    return pytesseract.image_to_string(thresh, config='--oem 3 --psm 6')


def read_file_as_text(file_path):
    """Extract the textual content of a supported file.

    The file type is chosen from the extension via ``load_document``.

    Args:
        file_path: path to a PDF, DOCX, CSV, Excel or image file.

    Returns:
        The extracted text as a single string. For CSV/Excel input the first
        line holds the column names, followed by one comma-separated line per
        row. For images the text is whatever OCR recognises.

    Raises:
        ValueError: if the extension is unsupported (raised by ``load_document``).
        FileNotFoundError: if an image file cannot be loaded by OpenCV.
    """
    doc_type = load_document(file_path)

    if doc_type == "pdf":
        with fitz.open(file_path) as pdf:
            return "".join(page.get_text() for page in pdf)

    if doc_type == "docx":
        # Only body paragraphs are collected here.
        return "\n".join(p.text for p in docx.Document(file_path).paragraphs)

    if doc_type == "csv":
        return _frame_to_text(pd.read_csv(file_path))

    if doc_type == "excel":
        # NOTE(review): read_excel is called with its defaults, so presumably
        # only one sheet of a multi-sheet workbook is read — confirm.
        return _frame_to_text(pd.read_excel(file_path))

    # load_document() raises for anything else, so only "image" remains.
    return _ocr_image(file_path)



In [3]:
# Directory that holds the on-disk FAISS index; cleanup_embeddings() removes it wholesale.
VECTOR_DIR = 'vectorstore'
# Path passed to FAISS save_local()/load_local() in convert_to_vector().
VECTOR_DB = os.path.join(VECTOR_DIR, 'db_faiss')

def cleanup_embeddings():
    """Remove the vector store directory (and everything in it) if it exists."""
    if not os.path.exists(VECTOR_DIR):
        return  # nothing on disk, nothing to report
    shutil.rmtree(VECTOR_DIR)
    print("Old embeddings deleted.")

def handle_signal(signum, frame):
    """Signal handler: delete the vector store, then exit with status 0.

    Args:
        signum: number of the signal being handled (echoed in the message).
        frame: current stack frame; unused, required by the handler signature.
    """
    # `sys` is not imported in the notebook's visible import cell, so it is
    # brought into scope here; without it the exit call fails with NameError.
    import sys

    print(f"\nSignal {signum} received! Cleaning up embeddings...")
    cleanup_embeddings()
    sys.exit(0)

# Delete the on-disk index on SIGINT/SIGTERM and at normal interpreter exit.
# NOTE(review): installing a SIGINT handler replaces Python's default
# KeyboardInterrupt behaviour; inside a Jupyter kernel this presumably changes
# what "interrupt kernel" does — confirm this is intended.
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
atexit.register(cleanup_embeddings)

def convert_to_vector(all_content, chunk_size=3000, chunk_overlap=50):
    """Return a FAISS vector store, loading the saved index if one exists.

    If ``VECTOR_DB`` is already on disk it is loaded and returned as-is;
    ``all_content`` is NOT consulted in that case. Otherwise the documents are
    split into chunks, embedded, indexed and saved to ``VECTOR_DB``.

    Args:
        all_content: iterable of objects exposing a ``page_content`` string.
        chunk_size: maximum chunk length passed to the text splitter.
        chunk_overlap: overlap between consecutive chunks.

    Returns:
        The loaded or newly built FAISS vector store.

    Raises:
        ValueError: if a new index must be built but the documents yield no
            text chunks (e.g. every document is empty).
    """
    os.makedirs(VECTOR_DIR, exist_ok=True)

    # One embeddings object serves both the load and the build path.
    # NOTE(review): this sentence-transformers model presumably truncates long
    # inputs; verify that chunks of `chunk_size` characters are embedded in full.
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={'device': 'cpu'}
    )

    if os.path.exists(VECTOR_DB):
        print("Loading existing vectorstore...")
        # SECURITY: allow_dangerous_deserialization opts in to loading
        # serialized data from disk. Acceptable only because this notebook
        # wrote the file itself; never point VECTOR_DB at untrusted data.
        return FAISS.load_local(
            VECTOR_DB,
            embeddings,
            allow_dangerous_deserialization=True
        )

    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " "],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    all_chunks = [
        chunk
        for doc in all_content
        for chunk in splitter.split_text(doc.page_content)
    ]
    if not all_chunks:
        # Fail with a clear message instead of handing FAISS an empty list.
        raise ValueError("No text to index: the supplied documents produced no chunks.")

    vector_store = FAISS.from_texts(all_chunks, embeddings)
    vector_store.save_local(VECTOR_DB)
    print("Vector store created and saved.")
    return vector_store

def find_relevant_chunk(query, vector_store, top_k=100):
    """Run a similarity search for ``query`` against ``vector_store``.

    Args:
        query: the search text.
        vector_store: object exposing ``similarity_search(query, k=...)``.
        top_k: maximum number of matches to request.

    Returns:
        Whatever ``vector_store.similarity_search`` returns for the query.
    """
    matches = vector_store.similarity_search(query, k=top_k)
    return matches


In [62]:
# Groq API client, authenticated with the key read from the environment.
mikey = Groq(api_key=GROQ_API_KEY)

# Prompt that tells the model to answer from the supplied context only and to
# admit when the answer is not there; {context} and {question} are filled in
# when the template is formatted.
prompt_template_string = """
You are a lightweight assistant. Ground answers in the context provided. 
If answer not found, say: “I don't know from the current knowledge base.” 
Context: {context} 
Question: {question} 
Answer:
"""

# Template object whose format(**inputs) is called by GroqLLM.__call__.
prompt = PromptTemplate(
    input_variables=["context", "question"], 
    template=prompt_template_string
)

class GroqLLM:
    """Callable wrapper that formats a prompt and sends it to a Groq chat model.

    Args:
        client: object exposing ``chat.completions.create(...)``.
        prompt_template: object whose ``format(**inputs)`` returns the prompt text.
        model: model identifier sent with every request.
        max_tokens: completion-length cap forwarded to the API.
        temperature: sampling temperature forwarded to the API.
    """

    def __init__(self, client, prompt_template, model="llama-3.1-8b-instant",
                 max_tokens=500, temperature=0.7):
        self.client = client
        self.model = model
        self.prompt_template = prompt_template
        self.max_tokens = max_tokens
        self.temperature = temperature

    def __call__(self, inputs: dict) -> str:
        """Format the prompt from ``inputs`` and return the model's reply.

        Args:
            inputs: mapping with the variables the prompt template expects.

        Returns:
            The first choice's message text with surrounding whitespace
            removed, or ``""`` when the response carries no text content.
        """
        prompt_text = self.prompt_template.format(**inputs)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt_text}],
            max_tokens=self.max_tokens,
            temperature=self.temperature,
        )
        content = response.choices[0].message.content
        # content may be None; treat that as an empty answer instead of
        # failing on None.strip().
        return (content or "").strip()

# Wrap the Groq client and prompt, then pipe the result through a string parser.
# NOTE(review): GroqLLM defines no `__or__`, so this `|` presumably relies on
# StrOutputParser's reflected operator wrapping the callable — confirm.
llm = GroqLLM(mikey, prompt)
rag_chain = llm | StrOutputParser()

def call_llm(query: str, context: str) -> str:
    """Ask the RAG chain to answer ``query`` using ``context``.

    Args:
        query: the user's question.
        context: text the answer should be grounded in.

    Returns:
        The chain's answer, or a fixed fallback message if the call raised.
    """
    payload = dict(context=context, question=query)
    try:
        answer = rag_chain.invoke(input=payload)
    except Exception as exc:
        # Best effort: report the failure and hand back a placeholder answer.
        print("Groq API error:", exc)
        return "Error generating response."
    return answer

import tempfile
import os
import ipywidgets as widgets
from IPython.display import display

# Upload widget; the accepted extensions mirror those recognised by load_document().
uploader = widgets.FileUpload(
    accept='.pdf,.docx,.csv,.xls,.xlsx,.png,.jpg,.jpeg', 
    multiple=True
)
display(uploader)

# Documents extracted from the current upload; cleared and refilled by process_upload().
all_content = []

def process_upload(change):
    """Observer for the upload widget: extract text from every uploaded file.

    ``all_content`` is emptied and rebuilt on each call. Every upload is
    written to a temporary file (keeping its extension so the type can be
    detected), read with ``read_file_as_text`` and then deleted. A file that
    fails to process is reported and skipped.

    Args:
        change: change notification from the widget; unused, the widget's
            current ``value`` is read directly.
    """
    all_content.clear()
    for uploaded_file in uploader.value:
        filename = uploaded_file.name
        print(f"Processing: {filename}")
        ext = os.path.splitext(filename)[1]
        # delete=False: the file is reopened by path after this handle is
        # closed, and removed explicitly in the `finally` below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
            tmp.write(uploaded_file.content)
            tmp_path = tmp.name
        try:
            text = read_file_as_text(tmp_path)
            all_content.append(Document(page_content=text))
            # Message previously read "ext extracted" (first letter missing).
            print("Text extracted (first 500 chars):\n", text[:500])
        except Exception as e:
            print(f"Error processing {filename}: {e}")
        finally:
            os.remove(tmp_path)

# Re-run text extraction whenever the widget's `value` trait changes.
uploader.observe(process_upload, names='value')


FileUpload(value=(), accept='.pdf,.docx,.csv,.xls,.xlsx,.png,.jpg,.jpeg', description='Upload', multiple=True)

In [61]:
# Set by init_vector_store(); stays None until the "process" button has been used.
vector_store = None

def init_vector_store(_):
    """Button callback: (re)build the vector store from the uploaded documents.

    Args:
        _: the clicked button (unused).
    """
    global vector_store
    if not all_content:
        print("No files uploaded yet.")
        return
    # convert_to_vector() returns any index already saved on disk and ignores
    # its argument in that case. Drop the old index first so that clicking
    # "process" after uploading different files does not keep the stale one.
    cleanup_embeddings()
    vector_store = convert_to_vector(all_content)
    print("Vector store ready.")

# Button that triggers index building for the currently uploaded documents.
btn_vector = widgets.Button(description="process")
btn_vector.on_click(init_vector_store)
display(btn_vector)


Button(description='process', style=ButtonStyle())

In [60]:
import ipywidgets as widgets
from IPython.display import display, HTML

# --- Query box ---
# Single-line text input for the user's question; its value is read by run_query().
query_box = widgets.Text(
    placeholder='Type your question here...',
    description='Query:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='80%')
)

# --- Output widget ---
# Captures everything run_query() prints or displays, and is cleared per query.
output_area = widgets.Output()

# --- Scrollable display function ---
def display_scrollable_answer(answer, width='1000px', height='300px'):
    """Show ``answer`` as literal text inside a fixed-size, scrollable box.

    Args:
        answer: text to show; converted with ``str()`` and HTML-escaped so
            characters such as ``<`` and ``&`` appear verbatim instead of being
            interpreted as markup.
        width: CSS width of the box.
        height: CSS height of the box.
    """
    # Local import keeps this cell self-contained.
    from html import escape

    # The answer comes from an LLM and may contain markup; escape it so it
    # cannot break or inject into the surrounding HTML.
    safe_answer = escape(str(answer))
    display(HTML(f"""
    <div style="
        width:{width};
        height:{height};
        overflow:auto;
        white-space:pre-wrap;
        border:1px solid #ccc;
        padding:10px;
        font-family:monospace;
        background:#f9f9f9;
    ">
    {safe_answer}
    </div>
    """))

# --- Query handling ---
def run_query(change):
    """Observer for the query box: retrieve context, ask the LLM, show the answer.

    Blank queries are ignored. All output goes to ``output_area``, which is
    cleared first. Errors are printed there rather than raised.

    Args:
        change: change notification from the widget; unused, the box's
            current value is read directly.
    """
    query = query_box.value.strip()
    if not query:
        return

    with output_area:
        output_area.clear_output()
        print(f"Query: {query}\n")
        try:
            if vector_store is None:
                # Nothing has been indexed yet; say so instead of failing
                # with an attribute error on None.
                print("Vector store not ready. Upload files and click 'process' first.")
                return
            matches = find_relevant_chunk(query, vector_store)
            # Pass the chunk text itself to the prompt, not the repr of a
            # list of Document objects.
            context = "\n\n".join(doc.page_content for doc in matches)
            answer = call_llm(query, context)
            display_scrollable_answer(answer)  # scrollable output here
        except Exception as e:
            print("Error:", e)

# --- Use 'on_submit' replacement ---
# run_query is attached as an observer of the box's `value` trait.
# NOTE(review): with continuous_update off the value presumably changes only
# when the user commits the text (Enter / focus loss), not per keystroke — confirm.
query_box.continuous_update = False
query_box.observe(run_query, names='value')

# --- Display UI ---
# Render the input box with the output area directly below it.
display(query_box, output_area)



Text(value='', continuous_update=False, description='Query:', layout=Layout(width='80%'), placeholder='Type yo…

Output()