In [1]:
!pip install -qU transformers[sentencepiece] sentence-transformers faiss-cpu langchain gradio python-multipart pdfminer.six


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.7/93.7 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m77.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m471.5/471.5 kB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.8/156.8 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.2/46.2 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.8/56.8 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m208.3/208.3 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
HF_TOKEN = "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-hftoken"


In [5]:
# GenAI-based Study Assistant (Colab-ready)
# Author: ChatGPT
# Run this entire file in Google Colab (create a new notebook, paste into a cell or upload as a .py and run).
# Features:
# - Upload / load documents (txt, pdf via plain text extraction)
# - Build FAISS vector store using sentence-transformers embeddings
# - Simple Retrieval-Augmented Generation (RAG) using a Hugging Face generation model
# - Utilities: summarize document, generate quizzes, ask questions (Q&A)
# - Lightweight Gradio UI to interact with the assistant in Colab

# --- Instructions before running ---
# 1) (Recommended) In Colab, set your Hugging Face token in the following cell or as an environment variable:
#    HF_TOKEN = "<your_huggingface_token>"
#    You can create one at https://huggingface.co/settings/tokens ("Write" scope not needed for inference).
# 2) Run the notebook. If you get out-of-memory issues, switch to a smaller model (see MODEL_NAME below).

# --- Important notes ---
# - This script uses CPU / GPU depending on Colab runtime. If you have GPU enabled, transformers will use it.
# - Models like 'google/flan-t5-large' are large; consider using 'google/flan-t5-small' or 'tiiuae/falcon-mini' for smaller footprint.

# ----------------- Setup & Installs -----------------

# If running inside Colab, uncomment and run the installs below. If running locally, install equivalent packages.

# !pip install -qU transformers[sentencepiece] sentence-transformers faiss-cpu langchain gradio python-multipart pdfminer.six

# ----------------- Imports -----------------
from typing import List, Tuple
import os
import tempfile
import math
import json

# Text processing
from pathlib import Path

# Transformers for generation
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# Sentence Transformers for embeddings
from sentence_transformers import SentenceTransformer

# Vector store
import faiss
import numpy as np

# PDF reading
from pdfminer.high_level import extract_text

# Simple web UI
import gradio as gr

# ----------------- Configuration -----------------
# Choose models here. If you have GPU, prefer larger models; otherwise select small models.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # small and fast
# Generation models (seq2seq) - flan variants are good. Choose smaller if memory is limited.
GEN_MODEL = "google/flan-t5-small"  # change to 'google/flan-t5-large' if you have GPU and memory

# Number of tokens to generate for answers
MAX_NEW_TOKENS = 256

# Number of retrieved docs
TOP_K = 4

# ----------------- Helpers -----------------

def read_text_file(path: str) -> str:
    return Path(path).read_text(encoding="utf-8")


def read_pdf_file(path: str) -> str:
    # extract_text from pdfminer
    return extract_text(path)


def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split long text into overlapping chunks (approx chunk_size words)."""
    words = text.split()
    chunks = []
    i = 0
    while i < len(words):
        chunk = " ".join(words[i : i + chunk_size])
        chunks.append(chunk)
        i += chunk_size - overlap
    return chunks

# ----------------- Embeddings & FAISS -----------------
class FaissIndex:
    def __init__(self, embedding_model_name: str = EMBEDDING_MODEL):
        self.embedding_model_name = embedding_model_name
        self.embedder = SentenceTransformer(embedding_model_name)
        self.index = None
        self.id2meta = {}
        self.dim = self.embedder.get_sentence_embedding_dimension()

    def build(self, texts: List[str], metadatas: List[dict] = None):
        """Build a FAISS index from texts. metadatas is parallel list of dicts."""
        embs = self.embedder.encode(texts, show_progress_bar=True, convert_to_numpy=True)
        self.index = faiss.IndexFlatIP(self.dim)
        # normalize for cosine similarity
        faiss.normalize_L2(embs)
        self.index.add(embs)
        self.id2meta = {i: (texts[i], metadatas[i] if metadatas else {}) for i in range(len(texts))}

    def add(self, texts: List[str], metadatas: List[dict] = None):
        start = len(self.id2meta)
        embs = self.embedder.encode(texts, show_progress_bar=False, convert_to_numpy=True)
        faiss.normalize_L2(embs)
        if self.index is None:
            self.index = faiss.IndexFlatIP(self.dim)
        self.index.add(embs)
        for i, t in enumerate(texts):
            self.id2meta[start + i] = (t, (metadatas[i] if metadatas else {}))

    def search(self, query: str, k: int = TOP_K) -> List[Tuple[str, float, dict]]:
        q_emb = self.embedder.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(q_emb)
        D, I = self.index.search(q_emb, k)
        results = []
        for score, idx in zip(D[0], I[0]):
            if idx == -1:
                continue
            text, meta = self.id2meta[idx]
            results.append((text, float(score), meta))
        return results

# ----------------- Generator (RAG) -----------------
class RAGGenerator:
    def __init__(self, gen_model_name: str = GEN_MODEL, hf_token: str = None):
        # If using HF hub model requiring token, set environment variable HUGGINGFACEHUB_API_TOKEN
        if hf_token:
            os.environ["HUGGINGFACEHUB_API_TOKEN"] = hf_token
        self.tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name)
        self.pipe = pipeline(
            "text2text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            device=-1,  # CPU by default; set to 0 for GPU if available and supported
        )

    def generate_answer(self, question: str, contexts: List[str], max_new_tokens: int = MAX_NEW_TOKENS) -> str:
        # Build prompt by concatenating retrieved contexts
        prompt = """You are a helpful study assistant. Use the following context pieces from documents to answer the question concisely and accurately. If the answer is not present in the context, say 'I don't know based on the provided materials.'\n\n"""
        for i, c in enumerate(contexts):
            prompt += f"Context {i+1}: {c}\n\n"
        prompt += f"Question: {question}\nAnswer:"
        out = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        return out[0]["generated_text"].strip()

    def summarize(self, text: str, max_new_tokens: int = 128) -> str:
        prompt = f"Summarize the following text in clear bullet points:\n\n{text}\n\nSummary:"
        out = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        return out[0]["generated_text"].strip()

    def generate_quiz(self, text: str, num_questions: int = 5, max_new_tokens: int = 256) -> str:
        prompt = (
            f"Create {num_questions} multiple-choice questions (4 choices each) from the following text. Include the correct answer letter.\n\n{text}\n\nQuestions:"
        )
        out = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        return out[0]["generated_text"].strip()

# ----------------- App Logic -----------------

# We'll keep a global index and generator instance for the Gradio app session
INDEX = None
GEN = None
DOCUMENTS = []


def init(hf_token: str = None):
    global INDEX, GEN
    INDEX = FaissIndex()
    GEN = RAGGenerator(gen_model_name=GEN_MODEL, hf_token=hf_token)
    return "Initialized models (embedder + generator)."


def add_documents_from_text(text: str, source_name: str = "user_doc"):
    global INDEX, DOCUMENTS
    chunks = chunk_text(text, chunk_size=250, overlap=50)
    metas = [{"source": source_name, "chunk_id": i} for i in range(len(chunks))]
    if INDEX.index is None:
        INDEX.build(chunks, metas)
    else:
        INDEX.add(chunks, metas)
    DOCUMENTS.extend([(source_name, c) for c in chunks])
    return f"Added {len(chunks)} chunks from {source_name}."


def upload_and_index(file):
    # Gradio file input gives a tempfile-like object with .name
    p = file.name
    suffix = p.split('.')[-1].lower()
    if suffix in ['txt', 'md']:
        text = read_text_file(p)
    elif suffix in ['pdf']:
        text = read_pdf_file(p)
    else:
        # attempt to read as text
        try:
            text = read_text_file(p)
        except Exception as e:
            return f"Unsupported file type: {e}"
    return add_documents_from_text(text, source_name=Path(p).name)


def answer_question(query: str) -> str:
    # Retrieve top contexts
    results = INDEX.search(query, k=TOP_K)
    contexts = [r[0] for r in results]
    # Generate answer
    answer = GEN.generate_answer(query, contexts)
    # return answer with sources
    srcs = [r[2].get('source', f'chunk_{i}') for i, r in enumerate(results)]
    meta_str = '\n'.join([f"Context {i+1} score={r[1]:.3f} source={r[2].get('source','-')}" for i, r in enumerate(results)])
    return f"Answer:\n{answer}\n\nRetrieved contexts:\n{meta_str}"


def summarize_document(doc_index: int = 0) -> str:
    # Summarize a particular document chunk (doc_index in DOCUMENTS)
    _, text = DOCUMENTS[doc_index]
    return GEN.summarize(text)


def generate_quiz_from_doc(doc_index: int = 0, num_questions: int = 5) -> str:
    _, text = DOCUMENTS[doc_index]
    return GEN.generate_quiz(text, num_questions=num_questions)

# ----------------- Gradio UI -----------------

def launch_gradio(hf_token: str = None):
    init(hf_token=hf_token)

    with gr.Blocks() as demo:
        gr.Markdown("# GenAI Study Assistant (Colab Demo)")
        with gr.Row():
            with gr.Column():
                upload = gr.File(label="Upload document (txt / pdf)")
                btn_upload = gr.Button("Upload & Index")
                txt_manual = gr.Textbox(lines=8, label="Or paste text here")
                btn_add_text = gr.Button("Add pasted text")
                btn_init = gr.Button("(Re)initialize models")
            with gr.Column():
                query = gr.Textbox(label="Ask a question")
                btn_ask = gr.Button("Ask")
                out_qa = gr.Textbox(label="Answer & Retrieved contexts", lines=8)

        with gr.Row():
            doc_index = gr.Number(value=0, label="Document chunk index (for summarization / quiz)")
            btn_summary = gr.Button("Summarize chunk")
            out_summary = gr.Textbox(label="Summary", lines=6)
            btn_quiz = gr.Button("Generate quiz from chunk")
            out_quiz = gr.Textbox(label="Quiz", lines=8)

        # Callbacks
        btn_init.click(lambda hf=hf_token: init(hf), inputs=[], outputs=[gr.Textbox(visible=False)])
        btn_upload.click(upload_and_index, inputs=[upload], outputs=[out_summary])
        btn_add_text.click(lambda t: add_documents_from_text(t, source_name='pasted_text'), inputs=[txt_manual], outputs=[out_summary])
        btn_ask.click(answer_question, inputs=[query], outputs=[out_qa])
        btn_summary.click(lambda idx: summarize_document(int(idx)), inputs=[doc_index], outputs=[out_summary])
        btn_quiz.click(lambda idx: generate_quiz_from_doc(int(idx), num_questions=5), inputs=[doc_index], outputs=[out_quiz])

    demo.launch(share=False)

# ----------------- If run as script -----------------
if __name__ == '__main__':
    print("This is a module. To run the Gradio demo, call launch_gradio(hf_token='<YOUR_HF_TOKEN>')")
    # Example: launch_gradio(hf_token=os.getenv('HF_TOKEN'))

launch_gradio(hf_token=HF_TOKEN)





This is a module. To run the Gradio demo, call launch_gradio(hf_token='<YOUR_HF_TOKEN>')


Device set to use cpu


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.
* To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>