In [None]:
# Imports
!pip -q install langchain faiss-cpu sentence-transformers transformers openai langchain_community google-search-results streamlit colabcode pyngrok

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m391.6 kB/s[0m eta [36m0:00:00[0m
Requested uvicorn==0.13.1 from https://files.pythonhosted.org/packages/ef/67/546c35e9fffb585ea0608ba3bdcafe17ae402e304367203d0b08d6c23051/uvicorn-0.13.1-py3-none-any.whl (from colabcode) has invalid metadata: .* suffix can only be used with `==` or `!=` operators
    python-dotenv (>=0.13.*) ; extra == 'standard'
                   ~~~~~~~^
Please use pip<24.1 if you need to use this version.[0m[33m
Requested uvicorn==0.13.1 from https://files.pythonhosted.org/packages/ef/67/546c35e9fffb585ea0608ba3bdcafe17ae402e304367203d0b08d6c23051/uvicorn-0.13.1-py3-none-any.whl (from colabcode) has invalid metadata: .* suffix can only be used with `==` or `!=` operators
    python-dotenv (>=0.13.*) ; extra == 'standard'
                   ~~~~~~~^
Please use pip<24.1 if you need to use this version.[0m[33m
Requested uvicor

In [None]:
import os
import getpass
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO
)

# Environment variable -> interactive prompt shown to the user.
API_KEY_PROMPTS = {
    "OPENAI_API_KEY": "Enter your OpenAI API key: ",
    "SERPAPI_API_KEY": "Enter your SERP API key: ",
    "NGROK_API_KEY": "Enter your NGROK API key: ",
}

missing_keys = []
for env_name, prompt in API_KEY_PROMPTS.items():
    # Pasted secrets often carry stray whitespace/newlines; strip them.
    secret = getpass.getpass(prompt).strip()
    if secret:
        os.environ[env_name] = secret
    elif not os.environ.get(env_name):
        # Blank input keeps any value that is already set; otherwise the key
        # is reported as missing instead of being stored as an empty string.
        missing_keys.append(env_name)

if missing_keys:
    logging.warning("No value provided for: %s", ", ".join(missing_keys))
else:
    logging.info("Successfully retrieved and set all API keys.")

Enter your OpenAI API key: ··········
Enter your SERP API key: ··········
Enter your NGROK API key: ··········


In [None]:
%%writefile tour_rag_service.py
from __future__ import annotations

import re, os, json, logging, datetime
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from langchain.chat_models import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage

logging.basicConfig(
    format="%(asctime)s %(levelname)s  %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger("TourRAG")

_DOMAIN_RE = re.compile(
    r"\b(concerts?|tours?|gigs?|venues?|setlists?|support acts?|festivals?|arenas?|stadiums?)\b",
    re.I,
)
_ARTIST_RE = re.compile(
    r"\b(tours?|concerts?|bands?|artists?|singers?|gigs?|shows?)\b", re.I
)

def _is_tour_query(txt: str) -> bool:
    """Return True when ``txt`` contains at least one ``_DOMAIN_RE`` keyword."""
    match = _DOMAIN_RE.search(txt)
    return match is not None

def _looks_like_artist_query(txt: str) -> bool:
    """Return True for texts of at most 15 words that contain an ``_ARTIST_RE`` keyword."""
    if len(txt.split()) > 15:
        return False
    return _ARTIST_RE.search(txt) is not None

def _extract_artist(q: str) -> str:
    cleaned = re.sub(r"[^\w\s]", " ", q)
    cleaned = re.sub(
        r"\b(upcoming|concerts?|tours?|shows?|dates?|gigs?)\b", " ", cleaned, flags=re.I
    )
    words = [w for w in cleaned.split() if len(w) >= 2]
    return " ".join(words).strip()

def _snip(acc: str, res) -> str:
    if isinstance(res, list):
        text = "\n".join(d.get("snippet", "") for d in res if isinstance(d, dict))
    else:
        text = str(res)
    return (acc + "\n" + text).strip()

class _VecDB:
    def __init__(self, dim: int):
        self.idx = faiss.IndexFlatIP(dim)
        self.txt: List[str]  = []
        self.meta: List[Dict] = []

    def add(self, emb: np.ndarray, text: str, meta: Dict):
        self.idx.add(emb.astype("float32")[None])
        self.txt.append(text)
        self.meta.append(meta)

    def search_all(self, emb: np.ndarray, min_sim: float = 0.25):
        if not self.idx.ntotal:
            return []
        D, I = self.idx.search(emb.astype("float32")[None], self.idx.ntotal)
        hits = [
            (float(sim), self.txt[i], self.meta[i])
            for sim, i in zip(D[0], I[0])
            if i != -1 and sim >= min_sim
        ]
        hits.sort(key=lambda t: t[0], reverse=True)
        return [(txt, meta) for _, txt, meta in hits]

class TourRAG:
    """Concert-tour question answering over ingested documents, with a web fallback.

    Ingested documents are summarised, embedded and kept in an in-memory
    ``_VecDB``.  ``answer`` first asks the chat model to answer from the stored
    summaries and, when that yields nothing, falls back to SerpAPI snippets for
    artist-style queries.  Answers that are not a refusal can be persisted as
    training samples under ``training_data/<ISO date>/``.
    """

    # Canonical refusal that both prompts instruct the chat model to emit.
    _UNKNOWN = "I don't know."
    # Largest number of whitespace-separated words sent to the summariser at once.
    _CHUNK_WORDS = 950
    # Stop issuing further web searches once this many snippet characters exist.
    _MIN_SNIPPET_CHARS = 600
    # Upper bound on retrieved summaries placed into the RAG prompt.
    _MAX_CONTEXT_HITS = 20

    def __init__(self, save_data=True):
        """Load the models and prepare the vector store.

        Args:
            save_data: when true, answers are written to disk as training
                samples and the ``training_data`` directory is created.
        """
        self.emb   = SentenceTransformer("multi-qa-MiniLM-L6-cos-v1")
        self.summ  = pipeline("summarization", model="facebook/bart-large-cnn")
        self.llm   = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
        self.vs    = _VecDB(self.emb.get_sentence_embedding_dimension())
        self.save_data = save_data

        # Web search is optional: it is enabled only when a SerpAPI key is
        # present and the wrapper can be imported and constructed.
        self.web = None
        if os.getenv("SERPAPI_API_KEY"):
            try:
                from langchain_community.utilities import SerpAPIWrapper
                self.web = SerpAPIWrapper()
            except Exception as e:
                log.warning(f"Web search disabled, SerpAPI setup failed: {e}")

        self.train_root = Path("training_data")
        if self.save_data:
            self.train_root.mkdir(exist_ok=True)

    def _embed(self, txt: str) -> np.ndarray:
        """Return the normalised sentence embedding of ``txt`` as a numpy array."""
        return self.emb.encode(txt, convert_to_numpy=True, normalize_embeddings=True)

    def _summarize_chunk(self, chunk: str, max_tokens: int) -> str:
        """Summarise one piece of text with the summarisation pipeline."""
        # truncation=True: a chunk is bounded by words, not tokens, so it may
        # still tokenize to more than the model accepts; truncate instead of failing.
        result = self.summ(
            chunk,
            max_length=max_tokens,
            min_length=20,
            do_sample=False,
            truncation=True,
        )
        return result[0]["summary_text"]

    def _summarize(self, txt: str, max_tokens: int = 130) -> str:
        """Summarise ``txt``; long texts are split into word chunks first.

        Texts of at most ``_CHUNK_WORDS`` words are summarised as-is.  Longer
        texts are cut into consecutive chunks of that size, each chunk is
        summarised separately and the partial summaries are joined with spaces.

        Args:
            txt: text to summarise.
            max_tokens: ``max_length`` passed to the pipeline for each call.

        Returns:
            The summary text.
        """
        words = txt.split()
        size = self._CHUNK_WORDS
        if len(words) <= size:
            chunks = [txt]
        else:
            chunks = [" ".join(words[i : i + size]) for i in range(0, len(words), size)]
        return " ".join(self._summarize_chunk(chunk, max_tokens) for chunk in chunks)

    def ingest(self, text: str) -> str:
        """Summarise a tour-related document and add the summary to the vector store.

        Returns:
            The generated summary.

        Raises:
            ValueError: if ``text`` contains no concert-tour vocabulary.
        """
        if not _is_tour_query(text):
            raise ValueError("Sorry, I cannot ingest documents with other themes.")
        summary = self._summarize(text)
        # ntotal is read before the vector is added, so doc_id is the 0-based position.
        self.vs.add(self._embed(summary), summary, {"doc_id": self.vs.idx.ntotal})
        return summary

    @classmethod
    def _is_unknown(cls, ans: str) -> bool:
        """Tell whether ``ans`` is the refusal, tolerating minor formatting drift.

        Case, surrounding whitespace/quotes, a typographic apostrophe and the
        trailing period are ignored, so e.g. ``I don’t know`` also counts.
        """
        def _norm(s: str) -> str:
            return s.replace("\u2019", "'").strip(" \t\r\n\"'.!").lower()

        return _norm(ans) == _norm(cls._UNKNOWN)

    def _ask_llm(self, system_prompt: str, user_prompt: str) -> str:
        """Send a system + user message pair to the chat model and return the stripped reply."""
        reply = self.llm.invoke(
            [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
        )
        return reply.content.strip()

    def answer(self, query: str) -> str:
        """Answer ``query`` from ingested documents, falling back to a web lookup.

        Returns:
            The model's answer, or ``"I don't know."`` when neither the stored
            summaries nor the web fallback produce one.
        """
        # 1) try RAG over the stored summaries
        rag_hits = self.vs.search_all(self._embed(query))
        if rag_hits:
            ctx = "\n\n".join(f"- {txt}" for txt, _ in rag_hits[: self._MAX_CONTEXT_HITS])
            guard = (
                "You are a QA assistant. Use ONLY the facts that appear in the context.\n"
                "If the context lacks the answer, respond with exactly:\n"
                "    I don't know.\n"
                "====  CONTEXT  ====\n"
                f"{ctx}\n"
                "==== END CONTEXT ===="
            )
            ans = self._ask_llm(guard, query)
            if not self._is_unknown(ans):
                self._save_training_sample(
                    query=query,
                    answer=ans,
                    meta={"mode": "rag", "context": ctx}
                )
                return ans

        # 2) neither a tour query nor an artist-style query => bail early
        if not _is_tour_query(query) and not _looks_like_artist_query(query):
            return self._UNKNOWN

        # 3) artist lookup on the Web
        if _looks_like_artist_query(query) and self.web is not None:
            ans, snippets = self._answer_via_web(query)
            if self._is_unknown(ans):
                # Hand callers the canonical refusal rather than a variant of it.
                return self._UNKNOWN
            self._save_training_sample(
                query=query,
                answer=ans,
                meta={"mode": "web", "snippets": snippets}
            )
            return ans

        return self._UNKNOWN

    def _answer_via_web(self, query: str) -> Tuple[str, str]:
        """Answer ``query`` from SerpAPI search snippets.

        Several search variants are tried in order until enough snippet text
        has been collected; search errors are logged and skipped.

        Returns:
            ``(answer, snippets)``; the answer is ``"I don't know."`` when no
            snippet text could be collected.
        """
        artist = _extract_artist(query)
        # Derive the years from today's date so the searches do not go stale.
        year = datetime.date.today().year
        variants = [
            f"{artist} tour dates {year} {year + 1}",
            f"{artist} upcoming concerts {year}",
            f"{artist} world tour {year - 1} {year}",
            query,
        ]

        snippets = ""
        for q in variants:
            try:
                res = self.web.run(q)
                snippets = _snip(snippets, res)
                if len(snippets) >= self._MIN_SNIPPET_CHARS:
                    break
            except Exception as e:
                log.error(f"SerpAPI error on '{q}': {e}")

        if not snippets:
            return self._UNKNOWN, snippets

        guard = (
            "Using ONLY the information inside WEB SNIPPETS, answer the user.\n"
            "If the snippets do not contain the answer, respond with exactly:\n"
            "    I don't know.\n"
            "====  WEB SNIPPETS  ====\n"
            f"{snippets}\n"
            "==== END SNIPPETS ===="
        )
        return self._ask_llm(guard, query), snippets

    @staticmethod
    def _next_sample_index(doc_dir: Path) -> int:
        """Return one more than the highest ``sample_<n>.json`` number in ``doc_dir``.

        Using the highest number rather than the file count keeps existing
        samples from being overwritten when the numbering has gaps.
        """
        prefix = "sample_"
        highest = -1
        for path in doc_dir.glob("sample_*.json"):
            number = path.stem[len(prefix):]
            if number.isdecimal():
                highest = max(highest, int(number))
        return highest + 1

    def _save_training_sample(self, *, query: str, answer: str, meta: Dict):
        """Persist a single (<system> + <user> + assistant) triple.

        Output is wrapped in <output> tags to ease post‑processing.  The file
        is written to ``<train_root>/<ISO date>/sample_<NNN>.json``.  Nothing
        happens when ``save_data`` is false; I/O problems are logged, never raised.
        """
        if not self.save_data:
            return

        iso_date = datetime.date.today().isoformat()
        doc_dir  = self.train_root / iso_date

        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": query}
                ]
            },
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": f"<output>{answer}</output>"}
                ]
            },
        ]

        record = {
            "messages": messages,
            "meta": meta,
        }

        try:
            # Directory creation lives inside the try so that a filesystem
            # problem cannot break answering; parents=True also covers a
            # training_data directory that was removed after start-up.
            doc_dir.mkdir(parents=True, exist_ok=True)
            idx = self._next_sample_index(doc_dir)
            fname = doc_dir / f"sample_{idx:03d}.json"
            with fname.open("w", encoding="utf-8") as fh:
                json.dump(record, fh, indent=2, ensure_ascii=False)
            log.info(f"Training sample saved → {fname}")
        except Exception as e:
            log.error(f"Could not write training sample: {e}")


Writing tour_rag_service.py


In [None]:
%%writefile app.py
import hashlib

import streamlit as st
from tour_rag_service import TourRAG

st.set_page_config(page_title="🎤 Concert‑Tour RAG", page_icon="🎸", layout="wide")

# Build the TourRAG service once per browser session and reuse it on every rerun.
if "rag" not in st.session_state:
    st.session_state["rag"] = TourRAG()
rag = st.session_state["rag"]

# SHA-256 digests of the documents already ingested in this session.  The
# uploader keeps its file between submits, so without this guard every further
# question would summarise and store the same document again.
if "ingested_digests" not in st.session_state:
    st.session_state["ingested_digests"] = set()
ingested_digests = st.session_state["ingested_digests"]

st.title("🎤 Concert‑Tour RAG Assistant")

with st.form(key="io_form"):
    uploaded_file = st.file_uploader(
        "📄 Upload document (optional)",
        type=["txt", "md"],  # keep in sync with the help text below
        help="Select a .txt or .md file to ingest"
    )
    doc_text = ""
    if uploaded_file is not None:
        # getvalue() returns the whole buffer regardless of the read position.
        raw = uploaded_file.getvalue()
        doc_text = raw.decode("utf-8", errors="replace")
        st.text_area(
            "📄 Document preview",
            value=doc_text,
            height=220,
            disabled=True
        )

    query_text = st.text_input(
        "💬 Question (optional)",
        placeholder="Ask about tour dates, venues, set‑lists …"
    )

    submitted = st.form_submit_button("Submit")

if submitted:
    has_doc = bool(doc_text.strip())
    has_query = bool(query_text.strip())

    if not has_doc and not has_query:
        st.warning("Please upload a document, enter a question, or both.")
        st.stop()

    # ---------- 1) ingest (if a new document was supplied) ----------
    if has_doc:
        doc_digest = hashlib.sha256(doc_text.encode("utf-8")).hexdigest()
        if doc_digest in ingested_digests:
            st.info("ℹ️ This document was already ingested in this session; skipping re-ingestion.")
        else:
            try:
                summary = rag.ingest(doc_text)
            except ValueError as e:
                # Raised by the service for documents outside the tour domain.
                st.error(str(e))
                st.stop()
            ingested_digests.add(doc_digest)
            st.success("✅ Document ingested successfully!")
            with st.expander("📑 Generated summary"):
                st.write(summary)

    # ---------- 2) question‑answering (if a question was supplied) ----------
    if has_query:
        try:
            with st.spinner("Searching for an answer …"):
                answer = rag.answer(query_text)
        except Exception as e:
            # Backend failures (e.g. model API errors) become a readable message
            # instead of a raw traceback in the UI.
            st.error(f"Could not answer the question: {e}")
        else:
            st.markdown("**🎯 Answer:**")
            st.write(answer)

st.caption(
    "ℹ️ Answers are sourced from your ingested documents; "
    "if the information is absent *and* the prompt looks like an artist name, "
    "the assistant does a live web lookup."
)


Writing app.py


In [None]:
import atexit
import os
import shlex
import subprocess

from pyngrok import conf, ngrok

STREAMLIT_PORT = 8501

# authenticate (fail early with a clear message when the key cell was skipped)
ngrok_token = os.getenv("NGROK_API_KEY")
if not ngrok_token:
    raise RuntimeError("NGROK_API_KEY is not set; run the API-key cell first.")
conf.get_default().auth_token = ngrok_token

proc = None


def _cleanup():
    """Stop the Streamlit subprocess (if still running) and shut down ngrok."""
    if proc is not None and proc.poll() is None:
        proc.terminate()
    ngrok.kill()


# fallback in case the kernel exits without passing through the `finally` below
atexit.register(_cleanup)

try:
    public_url = ngrok.connect(STREAMLIT_PORT, "http")
    print(f"🌐 Streamlit UI available at: {public_url}")

    # start the Streamlit server
    proc = subprocess.Popen(
        shlex.split(
            f"streamlit run app.py --server.port {STREAMLIT_PORT} --server.headless true"
        )
    )

    # keep the cell alive for as long as Streamlit runs; unlike a sleep loop
    # this returns as soon as the server process exits on its own
    proc.wait()
except KeyboardInterrupt:
    print("Interrupted, shutting down Streamlit and the ngrok tunnel.")
finally:
    # interrupting the cell must not leave the server or the tunnel behind,
    # otherwise re-running the cell fails because the port is still taken
    _cleanup()


🌐 Streamlit UI available at: NgrokTunnel: "https://e690-34-82-81-90.ngrok-free.app" -> "http://localhost:8501"


KeyboardInterrupt: 