
# 🔎 RAG on Egypt Labour Law 14/2025 — Colab Notebook

This notebook builds a **Retrieval-Augmented Generation (RAG)** pipeline that:
- Scrapes & cleans the two provided webpages (English + Arabic)
- Splits by headers and chunks with overlap
- Embeds with a **multilingual** Sentence-Transformers model
- Stores vectors in **FAISS**
- Retrieves the top-*k* chunks and generates an answer using **OpenAI** (if an API key is set) or a **local Transformers** fallback

**Sources:**
- https://eg.andersen.com/egypts-labour-law-14-2025/
- https://manshurat.org/content/qnwn-lml-ljdyd-2025

> If you have an OpenAI API key, set it in the cell below for stronger answers. Otherwise, the notebook will use a local `Flan-T5` fallback.


In [None]:

#@title ⬇️ Install dependencies
# Scraping / HTML-parsing stack used by the scraping helpers.
!pip -q install requests beautifulsoup4 readability-lxml lxml html5lib
# Embedding model runtime and the CPU build of FAISS for the vector index.
!pip -q install sentence-transformers faiss-cpu
# Generation backends (OpenAI client and local Transformers); --upgrade pulls the newest releases.
!pip -q install openai transformers accelerate torch --upgrade


In [None]:

#@title 🔐 (Optional) Set your OpenAI API key
#@markdown If you have an OpenAI key, paste it here. If not, leave blank and the notebook will use a local Transformers fallback (Flan-T5).
# Colab form field; leave the string empty to run without OpenAI.
OPENAI_API_KEY = ""  #@param {type:"string"}
import os
# Publish the key through the environment, which is where AnswerGenerator looks it up.
# NOTE(review): a blank field does not remove a key that is already present in
# os.environ, so the "local fallback" message below may not match what is used.
if OPENAI_API_KEY:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
    print("✅ OpenAI API key set")
else:
    print("ℹ️ No OpenAI key provided — using local fallback model (Flan-T5).")


In [None]:

#@title 🧩 Imports & configuration
import os, re, time, pickle, pathlib, logging
from typing import List, Dict, Tuple

import requests
from bs4 import BeautifulSoup
from readability import Document
import numpy as np
import faiss

from sentence_transformers import SentenceTransformer

# Optional LLMs
try:
    import openai
except Exception:
    openai = None

try:
    from transformers import pipeline
except Exception:
    pipeline = None

# Config
# Sentence-Transformers model used for both document chunks and queries.
EMBEDDING_MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"  # Arabic + English
# Vector dimension handed to the FAISS index; it has to equal the embedding
# model's output size (presumably 384 for this MiniLM model — re-check if the model changes).
EMB_DIM = 384
# Pages that get scraped and indexed.
URLS = [
    "https://eg.andersen.com/egypts-labour-law-14-2025/",
    "https://manshurat.org/content/qnwn-lml-ljdyd-2025",
]
# On-disk location of the persisted index; the directory is created as soon as this cell runs.
INDEX_DIR = pathlib.Path("rag_index")
INDEX_DIR.mkdir(exist_ok=True)
FAISS_PATH = INDEX_DIR / "faiss.index"
META_PATH = INDEX_DIR / "metadata.pkl"

# Chunking / retrieval
MAX_CHARS = 350           # maximum characters per chunk
OVERLAP = 60              # characters shared by consecutive chunks
TOP_K = 6                 # number of chunks retrieved per query
MAX_CONTEXT_CHARS = 8000  # cap on the retrieved context placed in the prompt

# Generation
OPENAI_MODEL = "gpt-4o-mini"        # chat model used when an OpenAI key is available
LOCAL_LLM = "google/flan-t5-base"   # Transformers model used otherwise
MAX_GEN_TOKENS = 512                # generation length limit passed to either backend

# Root-logger setup: INFO level with timestamped messages.
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")


In [None]:

#@title 🧼 Scraping & text cleaning helpers
def fetch_html(url: str, retries: int = 3, timeout: int = 30) -> str:
    """Download *url* and return the response body decoded as text.

    The request is attempted up to ``retries`` times, pausing 1.5 seconds
    between attempts (but not after the last one).

    Args:
        url: Page to download.
        retries: Maximum number of attempts.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        The decoded HTML of the page.

    Raises:
        RuntimeError: If every attempt fails; the last request error is
            attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            r = requests.get(url, timeout=timeout, headers={"User-Agent": "RAG/1.0"})
            r.raise_for_status()
        except requests.RequestException as e:
            last_error = e
            logging.warning(f"Fetch failed ({attempt}/{retries}) {url}: {e}")
            if attempt < retries:
                time.sleep(1.5)
            continue

        # When the server declares no charset, requests falls back to
        # ISO-8859-1 for text content, which garbles non-Latin pages such as
        # the Arabic source. Use the encoding detected from the body instead.
        if "charset" not in r.headers.get("Content-Type", "").lower():
            r.encoding = r.apparent_encoding
        return r.text

    raise RuntimeError(f"Could not fetch {url}") from last_error

def readability_clean(html: str) -> str:
    """Return the main-content HTML fragment of a page, best effort.

    Runs the readability extractor over *html*. If extraction fails for any
    reason the original HTML is returned unchanged, so callers always get
    something to parse; the failure is logged instead of being hidden.

    Args:
        html: Raw HTML of the page.

    Returns:
        The extracted HTML fragment, or *html* itself when extraction fails.
    """
    try:
        return Document(html).summary(html_partial=True)
    except Exception as e:
        # Deliberately broad: this is a best-effort cleanup step and the
        # unfiltered page is still usable downstream.
        logging.warning(f"Readability extraction failed; using raw HTML: {e}")
        return html

def html_to_text_keep_headers(html: str) -> str:
    """Flatten HTML into plain text, keeping headings as markdown markers.

    ``<script>``, ``<style>`` and ``<noscript>`` elements are removed first.
    Every ``<h1>``-``<h6>`` becomes a ``#``-prefixed line (one ``#`` per
    level) and every ``<p>`` contributes its text; other elements are not
    emitted on their own. Runs of spaces/tabs are collapsed and three or more
    consecutive newlines are reduced to two.

    Args:
        html: HTML document or fragment.

    Returns:
        The resulting text, stripped of leading/trailing whitespace.
    """
    soup = BeautifulSoup(html, "html5lib")
    for junk in soup(["script", "style", "noscript"]):
        junk.decompose()

    heading_re = re.compile(r"h[1-6]", flags=re.I)
    root = soup.body or soup
    pieces = []
    for node in root.descendants:
        tag_name = getattr(node, "name", None)
        if not tag_name:
            # Text nodes and the like carry no tag name.
            continue
        is_heading = heading_re.fullmatch(tag_name) is not None
        if not is_heading and tag_name != "p":
            continue
        content = node.get_text(" ", strip=True)
        if not content:
            continue
        if is_heading:
            depth = int(tag_name[1])
            pieces.append(f"\n{'#'*depth} {content}\n")
        else:
            pieces.append(content)

    joined = "\n".join(pieces)
    joined = re.sub(r"[ \t]+", " ", joined)
    return re.sub(r"\n{3,}", "\n\n", joined).strip()

def split_by_headers(text: str) -> List[Tuple[str, str]]:
    """Split markdown-style text into ``(title, body)`` sections.

    A section starts at every line of the form ``#... title`` (one to six
    ``#`` characters). Text that precedes the first heading is filed under
    the title ``"Document"``. Headings with no body text produce no section.

    Args:
        text: Text containing ``#``-prefixed heading lines.

    Returns:
        The sections in document order. When nothing qualifies, a single
        ``("Document", text)`` entry is returned.
    """
    header_re = re.compile(r"^(#{1,6})\s+(.*)$", flags=re.MULTILINE)

    sections = []
    current_title = "Document"
    cursor = 0  # end offset of the most recently seen heading line
    for match in header_re.finditer(text):
        preceding = text[cursor:match.start()].strip()
        if preceding:
            sections.append((current_title, preceding))
        current_title = match.group(2).strip()
        cursor = match.end()

    remainder = text[cursor:].strip()
    if remainder:
        sections.append((current_title, remainder))

    return sections or [("Document", text)]

def chunk_text(text: str, max_len: int = MAX_CHARS, overlap: int = OVERLAP) -> List[str]:
    """Cut *text* into overlapping character windows.

    The text is stripped first. If it fits in ``max_len`` characters it is
    returned as a single chunk; otherwise consecutive windows of ``max_len``
    characters are taken, each starting ``overlap`` characters before the
    end of the previous one. Every chunk is stripped individually.

    Args:
        text: Text to split.
        max_len: Maximum number of characters per chunk; must be positive.
        overlap: Characters shared between neighbouring chunks; must satisfy
            ``0 <= overlap < max_len``.

    Returns:
        The chunks in order (a one-element list for short input).

    Raises:
        ValueError: If ``max_len`` or ``overlap`` is out of range. An overlap
            of ``max_len`` or more would never advance the window, and a
            negative overlap would skip text between chunks.
    """
    if max_len <= 0:
        raise ValueError("max_len must be positive")
    if not 0 <= overlap < max_len:
        raise ValueError("overlap must satisfy 0 <= overlap < max_len")

    text = text.strip()
    if len(text) <= max_len:
        return [text]

    step = max_len - overlap  # distance between consecutive window starts
    chunks = []
    for start in range(0, len(text), step):
        end = min(start + max_len, len(text))
        chunks.append(text[start:end].strip())
        if end == len(text):
            # The window reached the end; later starts would only repeat the tail.
            break
    return chunks

def make_docs_from_url(url: str) -> List[Dict]:
    """Scrape *url* and turn it into chunk records ready for embedding.

    The page is fetched, reduced to its main content, flattened to text,
    split into heading-delimited sections and then chunked. Chunks shorter
    than 30 characters are discarded.

    Args:
        url: Page to ingest.

    Returns:
        One dict per kept chunk with the keys ``id``, ``url``, ``section``,
        ``chunk_id`` and ``text``.
    """
    page_text = html_to_text_keep_headers(readability_clean(fetch_html(url)))

    records = []
    # section_no counts every section, including ones that yield no chunk.
    for section_no, (heading, content) in enumerate(split_by_headers(page_text)):
        content = re.sub(r"\n{3,}", "\n\n", content).strip()
        for chunk_no, piece in enumerate(chunk_text(content)):
            if len(piece) >= 30:
                records.append({
                    "id": f"{url}::sec{section_no}::chunk{chunk_no}",
                    "url": url,
                    "section": heading,
                    "chunk_id": chunk_no,
                    "text": piece
                })
    return records


In [None]:

#@title 🔢 Embeddings & FAISS index
class Embedder:
    """Thin wrapper around a Sentence-Transformers model."""

    def __init__(self, model_name: str = EMBEDDING_MODEL_NAME):
        """Load the model called *model_name* and keep it on ``self.model``."""
        logging.info(f"Loading embeddings model: {model_name}")
        self.model = SentenceTransformer(model_name)

    def encode(self, texts: List[str]) -> np.ndarray:
        """Embed *texts* and return the vectors as a ``float32`` array.

        The model is asked for normalized NumPy embeddings, processed in
        batches of 32 with a progress bar.
        """
        encode_options = {
            "batch_size": 32,
            "show_progress_bar": True,
            "convert_to_numpy": True,
            "normalize_embeddings": True,
        }
        vectors = self.model.encode(texts, **encode_options)
        return vectors.astype(np.float32)

class FaissIndex:
    """Flat inner-product FAISS index paired with one metadata dict per vector.

    The index and the metadata list are kept in step: row ``i`` of the index
    corresponds to ``metadata[i]``. Both can be written to and restored from
    the paths given at construction time.
    """

    def __init__(self, dim: int, index_path: pathlib.Path, meta_path: pathlib.Path):
        """Remember the vector dimension and the two persistence paths.

        Args:
            dim: Dimension of the vectors the index will hold.
            index_path: File used for the serialized FAISS index.
            meta_path: File used for the pickled metadata list.
        """
        self.dim = dim
        self.index_path = index_path
        self.meta_path = meta_path
        self.index = None  # set by build() or load()
        self.metadata: List[Dict] = []

    def build(self, embeddings: np.ndarray, metadata: List[Dict]):
        """Create a fresh index from *embeddings* and attach *metadata*.

        Args:
            embeddings: Array of shape ``(n, dim)``.
            metadata: ``n`` records, one per embedding row, in the same order.

        Raises:
            ValueError: If the embeddings do not have shape ``(n, dim)`` or
                the number of rows differs from ``len(metadata)``.
        """
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        if embeddings.ndim != 2 or embeddings.shape[1] != self.dim:
            raise ValueError(
                f"Expected embeddings of shape (n, {self.dim}), got {embeddings.shape}"
            )
        if embeddings.shape[0] != len(metadata):
            raise ValueError(
                f"Got {embeddings.shape[0]} embeddings for {len(metadata)} metadata records"
            )
        index = faiss.IndexFlatIP(self.dim)  # cosine if normalized
        index.add(embeddings)
        self.index = index
        self.metadata = metadata

    def save(self):
        """Write the index and the metadata to their configured paths.

        Raises:
            RuntimeError: If no index has been built or loaded yet.
        """
        if self.index is None:
            raise RuntimeError("No index to save")
        faiss.write_index(self.index, str(self.index_path))
        with open(self.meta_path, "wb") as f:
            pickle.dump(self.metadata, f)
        logging.info(f"Saved index to {self.index_path} and metadata to {self.meta_path}")

    def load(self):
        """Restore the index and the metadata from their configured paths."""
        self.index = faiss.read_index(str(self.index_path))
        # NOTE: unpickling can execute arbitrary code. Only load a metadata
        # file that was produced by save() and comes from a trusted location.
        with open(self.meta_path, "rb") as f:
            self.metadata = pickle.load(f)
        logging.info("Loaded FAISS index & metadata")

    def search(self, query_emb: np.ndarray, top_k: int = TOP_K) -> List[Tuple[float, Dict]]:
        """Return the best matches for a query embedding.

        Args:
            query_emb: Query vector, either 1-D ``(dim,)`` or 2-D
                ``(1, dim)``. Only the first row is used when several rows
                are supplied.
            top_k: Maximum number of hits to return.

        Returns:
            ``(score, metadata)`` pairs in the order FAISS reports them.
            Slots FAISS fills with ``-1`` (fewer than ``top_k`` vectors
            available) are skipped.

        Raises:
            RuntimeError: If no index has been built or loaded yet.
        """
        if self.index is None:
            raise RuntimeError("Index not loaded")
        query = np.atleast_2d(np.asarray(query_emb, dtype=np.float32))
        D, I = self.index.search(query, top_k)
        return [
            (float(score), self.metadata[idx])
            for score, idx in zip(D[0], I[0])
            if idx != -1
        ]


In [None]:

#@title 🧠 Answer generation (OpenAI or local Transformers)
class AnswerGenerator:
    """Generate answers with OpenAI when available, else a local model.

    OpenAI is used when the ``openai`` package imported successfully and the
    ``OPENAI_API_KEY`` environment variable is set. Otherwise a local
    Transformers ``text2text-generation`` pipeline is created.
    """

    def __init__(self):
        """Pick the backend and prepare it.

        Raises:
            RuntimeError: If neither backend is usable (no OpenAI key and
                Transformers could not be imported).
        """
        self.use_openai = bool(openai) and bool(os.getenv("OPENAI_API_KEY"))
        # Always defined so the attribute exists whichever backend is chosen.
        self.pipe = None
        if self.use_openai:
            openai.api_key = os.getenv("OPENAI_API_KEY")
            logging.info("Using OpenAI for generation.")
            return

        if not pipeline:
            raise RuntimeError("Transformers not installed and no OPENAI_API_KEY set.")
        logging.info("Using local transformers (Flan-T5) for generation.")
        self.pipe = pipeline("text2text-generation", model=LOCAL_LLM)

    def generate(self, prompt: str) -> str:
        """Return the model's answer to *prompt*, stripped of outer whitespace.

        Args:
            prompt: Full prompt text, sent as a single user message (OpenAI)
                or as the pipeline input (local model).

        Returns:
            The generated text; an empty string if OpenAI returns no content.
        """
        if self.use_openai:
            resp = openai.chat.completions.create(
                model=OPENAI_MODEL,
                temperature=0.2,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=MAX_GEN_TOKENS,
            )
            # The message content is nullable in the API response; guard
            # against None before calling str methods on it.
            content = resp.choices[0].message.content
            return (content or "").strip()

        out = self.pipe(prompt, max_new_tokens=MAX_GEN_TOKENS, do_sample=False)[0]["generated_text"]
        return out.strip()


In [None]:

#@title 🧱 RAG pipeline
class RAGPipeline:
    """End-to-end retrieval-augmented generation over a set of web pages.

    Ties together scraping, embedding, vector search and answer generation
    using the collaborators passed to the constructor.
    """

    def __init__(self, urls: List[str], embedder: Embedder, index: FaissIndex, generator: AnswerGenerator):
        """Store the source URLs and the embedder, index and generator to use."""
        self.urls = urls
        self.embedder = embedder
        self.index = index
        self.generator = generator

    def ingest(self, force_rebuild: bool = False):
        """Load the persisted index, or scrape, embed and build a new one.

        Args:
            force_rebuild: When true, ignore any index on disk and rebuild
                it from the URLs.

        Raises:
            RuntimeError: If scraping produced no chunks to index.
        """
        # Check the paths owned by the injected index rather than the module
        # globals, so the existence test and index.load() always agree.
        index_on_disk = self.index.index_path.exists() and self.index.meta_path.exists()
        if index_on_disk and not force_rebuild:
            logging.info("Index exists—loading from disk.")
            self.index.load()
            return

        all_docs: List[Dict] = []
        for url in self.urls:
            docs = make_docs_from_url(url)
            logging.info(f"{url} → {len(docs)} chunks")
            all_docs.extend(docs)

        if not all_docs:
            # Fail with a clear message instead of handing FAISS an empty batch.
            raise RuntimeError("No text chunks were extracted from the configured URLs; nothing to index.")

        embeddings = self.embedder.encode([d["text"] for d in all_docs])
        self.index.build(embeddings, all_docs)
        self.index.save()

    def _build_prompt(self, query: str, retrieved: List[Tuple[float, Dict]]) -> str:
        """Assemble the LLM prompt from the question and the retrieved chunks.

        Chunks are added in the given order until the next one would push
        the context past ``MAX_CONTEXT_CHARS`` characters.
        """
        parts, total = [], 0
        for score, m in retrieved:
            block = f"\n[Source: {m['url']} | Section: {m.get('section','')}] Score={score:.3f}\n{m['text']}\n"
            if total + len(block) > MAX_CONTEXT_CHARS:
                break
            parts.append(block)
            total += len(block)

        context = "\n".join(parts).strip()
        instructions = (
            "You are a legal assistant answering about Egypt’s Labour Law 14/2025.\n"
            "Ground answers ONLY in the context below (Arabic or English). If unsure, say so.\n"
            "Where relevant, cite the URL in-line as (Source: <url>). Use bullets when helpful."
        )

        return f"""{instructions}

Question:
{query}

Context:
{context}

Answer (concise and specific):
"""

    def retrieve(self, query: str) -> List[Tuple[float, Dict]]:
        """Embed *query* and return the ``TOP_K`` best ``(score, metadata)`` hits."""
        q_emb = self.embedder.encode([query])
        return self.index.search(q_emb, top_k=TOP_K)

    def answer(self, query: str) -> Dict:
        """Answer *query* from the indexed pages.

        Returns:
            A dict with the keys ``query``, ``answer`` (generated text) and
            ``retrieved`` (the hits the answer was grounded on).
        """
        hits = self.retrieve(query)
        prompt = self._build_prompt(query, hits)
        text = self.generator.generate(prompt)
        return {"query": query, "answer": text, "retrieved": hits}


In [None]:

#@title 🚀 Build the index and run demo questions
# Wire the pipeline together: embedding model, vector index, answer backend.
embedder = Embedder()
index = FaissIndex(EMB_DIM, FAISS_PATH, META_PATH)
generator = AnswerGenerator()
rag = RAGPipeline(URLS, embedder, index, generator)

# Set force_rebuild=True to re-scrape/re-embed
rag.ingest(force_rebuild=False)

# Sample questions answered by the demo loop further down in this cell.
questions = [
    "What are the key changes in Egypt’s Labour Law 14/2025?",
    "What are the rules for maternity leave in Egypt?"
]

def pretty(result: Dict):
    """Print a question, its answer and the distinct sources behind it.

    Args:
        result: Dict with the keys ``query``, ``answer`` and ``retrieved``
            (a sequence of ``(score, metadata)`` pairs). Each source is
            listed once, keyed by the metadata ``id``.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print("Q:", result["query"])
    print(light_rule)
    print(result["answer"])
    print(light_rule)
    print("Top sources:")

    printed_ids = set()
    for score, m in result["retrieved"]:
        chunk_key = m["id"]
        if chunk_key not in printed_ids:
            printed_ids.add(chunk_key)
            print(f"* {m['url']} | Section: {m.get('section','')} | Score: {score:.3f}")

    print(heavy_rule)

# Answer each sample question and print the formatted result.
for q in questions:
    pretty(rag.answer(q))


In [None]:

#@title 💬 Ask your own questions
# Colab form field holding a free-form question for the pipeline built above.
your_question = "Summarize penalties related to working hours"  #@param {type:"string"}
# Run retrieval + generation and show only the answer text.
res = rag.answer(your_question)
print(res["answer"])
