In [2]:
import os
import json
from typing import List, Dict
from pathlib import Path

import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langdetect import detect, DetectorFactory

In [3]:
# Fix langdetect's random seed so language detection gives the same result on every run.
DetectorFactory.seed = 0

In [4]:
# base paths
# BASE_DIR is the resolved current working directory, so the kernel must be started
# from the folder that contains data/docs and index.
BASE_DIR = Path().resolve()
DOCS_DIR = BASE_DIR / "data" / "docs"
INDEX_DIR = BASE_DIR / "index"

# display both paths as a quick sanity check
DOCS_DIR, INDEX_DIR

(PosixPath('/Users/arturschaefer/Documents/Freelancing/Repos/rag-llm-example/data/docs'),
 PosixPath('/Users/arturschaefer/Documents/Freelancing/Repos/rag-llm-example/index'))

In [5]:
# load markdown documents

def load_documents(docs_dir: Path) -> List[Dict]:
    """Load every ``*.md`` file directly inside ``docs_dir`` as a document record.

    Files are processed in sorted path order so the result is deterministic.

    Args:
        docs_dir: Directory that contains the Markdown files.

    Returns:
        One dict per file with the keys ``id`` (file name without extension),
        ``title`` and ``text`` (the full, unmodified file content).
        The title is taken from the first non-blank line when that line is a
        Markdown heading (starts with ``#``); otherwise, or when the heading
        is empty, the file stem is used.
    """
    documents = []
    for path in sorted(docs_dir.glob("*.md")):
        text = path.read_text(encoding="utf-8")

        # Look at the first non-blank line, stripped of surrounding whitespace, so an
        # indented heading or a leading blank line does not break title detection.
        first_line = next(
            (line.strip() for line in text.splitlines() if line.strip()),
            "",
        )
        if first_line.startswith("#"):
            title = first_line.lstrip("#").strip()
        else:
            title = ""

        documents.append({
            "id": path.stem,
            # fall back to the file stem when there is no (or an empty) heading
            "title": title or path.stem,
            "text": text,
        })
    return documents

# load all Markdown documents, then show how many were found and their ids
docs = load_documents(DOCS_DIR)
len(docs), [d["id"] for d in docs]


(5,
 ['employee_handbook',
  'it_security',
  'remote_work_policy',
  'travel_expenses',
  'vacation_policy'])

In [6]:
# chunk documents (simple double-newline split)

def chunk_document(doc: Dict, min_chars: int = 40) -> List[Dict]:
    """Split one document into paragraph chunks separated by blank lines.

    The text is split on double newlines; each piece is stripped and empty
    pieces are dropped. Pieces shorter than ``min_chars`` are then skipped.
    The number in ``chunk_id`` is the piece's position among the non-empty
    pieces *before* the length filter, so ids may have gaps.

    Args:
        doc: Document record with the keys ``id``, ``title`` and ``text``.
        min_chars: Minimum stripped length a piece needs to become a chunk.

    Returns:
        A list of dicts with the keys ``chunk_id``, ``doc_id``, ``title``
        and ``text``.
    """
    stripped_pieces = (piece.strip() for piece in doc["text"].split("\n\n"))
    paragraphs = [piece for piece in stripped_pieces if piece]

    return [
        {
            "chunk_id": f"{doc['id']}-{position}",
            "doc_id": doc["id"],
            "title": doc["title"],
            "text": paragraph,
        }
        for position, paragraph in enumerate(paragraphs)
        if len(paragraph) >= min_chars
    ]

# chunk every loaded document and collect the chunks in one flat list
all_chunks = []
for d in docs:
    all_chunks.extend(chunk_document(d))

# total number of chunks across all documents
len(all_chunks)


20

In [7]:
# create embeddings

EMBED_MODEL = "sentence-transformers/distiluse-base-multilingual-cased-v2"
embed_model = SentenceTransformer(EMBED_MODEL)

# encode the chunk texts in all_chunks order, so row i of `embeddings` belongs to all_chunks[i];
# normalize_embeddings=True is requested so a plain dot product can serve as cosine similarity
texts = [c["text"] for c in all_chunks]
embeddings = embed_model.encode(texts, convert_to_numpy=True, normalize_embeddings=True)


In [8]:
# save embeddings and chunk metadata

# Create the index directory on first run: neither np.save nor open() creates
# missing parent directories, so a fresh checkout without index/ would fail here.
INDEX_DIR.mkdir(parents=True, exist_ok=True)

np.save(INDEX_DIR / "embeddings.npy", embeddings)
# ensure_ascii=False keeps non-ASCII characters readable in the JSON file
with open(INDEX_DIR / "chunks.json", "w", encoding="utf-8") as f:
    json.dump(all_chunks, f, ensure_ascii=False, indent=2)


In [9]:
# retriever (semantic search)

def retrieve(query: str, k: int = 5) -> List[Dict]:
    """Return the ``k`` chunks most similar to ``query``, best match first.

    The query is embedded with the same model as the chunks and compared
    against the precomputed ``embeddings`` matrix by dot product.

    Args:
        query: Free-text search query.
        k: Maximum number of chunks to return. Non-positive values yield an
            empty list; values larger than the number of chunks return all chunks.

    Returns:
        A list of dicts with the keys ``chunk_id``, ``doc_id``, ``title``,
        ``text`` and ``score`` (similarity as a Python float), sorted by
        descending score.
    """
    # A negative k would otherwise slice from the end (`[:k]`) and silently
    # return almost every chunk; treat any non-positive k as "no results".
    if k <= 0:
        return []

    q_emb = embed_model.encode([query], normalize_embeddings=True)[0]
    sims = embeddings @ q_emb  # cosine similarity since normalized
    top_idx = np.argsort(-sims)[:k]  # negate to sort by descending similarity

    results = []
    for idx in top_idx:
        c = all_chunks[idx]
        results.append({
            "chunk_id": c["chunk_id"],
            "doc_id": c["doc_id"],
            "title": c["title"],
            "text": c["text"],
            "score": float(sims[idx]),
        })
    return results

# smoke test: German-language query, top 3 matches
retrieve("Wie viele Urlaubstage habe ich?", k=3)


[{'chunk_id': 'vacation_policy-1',
  'doc_id': 'vacation_policy',
  'title': 'Vacation Policy',
  'text': '**1. Annual Leave Entitlement**  \nEmployees receive 25 days of paid vacation per year. Leave is accrued monthly during the first year.',
  'score': 0.31855595111846924},
 {'chunk_id': 'vacation_policy-2',
  'doc_id': 'vacation_policy',
  'title': 'Vacation Policy',
  'text': '**2. Request Procedure**  \nVacation requests must be submitted via the HR portal at least two weeks in advance. Approval is subject to team workload and staffing requirements.',
  'score': 0.259296715259552},
 {'chunk_id': 'remote_work_policy-1',
  'doc_id': 'remote_work_policy',
  'title': 'Remote Work Policy',
  'text': '**1. Eligibility**  \nEmployees may work remotely up to three days per week, subject to manager approval.',
  'score': 0.2439272403717041}]

In [10]:
# language detection

def detect_language(text: str) -> str:
    """Classify ``text`` as German (``"de"``) or English (``"en"``).

    Args:
        text: Text to classify, typically the user's query.

    Returns:
        ``"de"`` when langdetect reports German, otherwise ``"en"``. English is
        the fallback for empty or whitespace-only text, for any other detected
        language, and for text langdetect cannot analyse.
    """
    cleaned = text.strip()
    if not cleaned:
        return "en"

    # Local import keeps this cell self-contained; the package is already a
    # dependency of the notebook (see the langdetect import in the first cell).
    from langdetect.lang_detect_exception import LangDetectException

    try:
        lang = detect(cleaned)
    except LangDetectException:
        # Raised when the text has no usable features, e.g. only digits or punctuation.
        return "en"

    return lang if lang in ("de", "en") else "en"

In [20]:
# build the rag prompt - optimized for flan-t5

def build_prompt(query: str, contexts: List[Dict]) -> str:
    """Assemble the flan-t5 prompt from the retrieved chunks and the question.

    The instruction text is always English. For a German query an extra
    "Answer in German." sentence is added and the question/answer labels are
    German; otherwise the labels are English.

    Args:
        query: The user's question.
        contexts: Retrieved chunks; only each chunk's ``text`` is used.

    Returns:
        The prompt: instruction, the chunk texts joined by blank lines, the
        question, and a trailing answer label for the model to complete.
    """
    is_german = detect_language(query) == "de"

    # every retrieved chunk is included as-is, separated by blank lines
    joined_context = "\n\n".join(chunk["text"] for chunk in contexts)

    if is_german:
        # simplified instruction that works better with flan-t5
        header = (
            "Answer the question based on the context below. Answer in full sentences. "
            "Answer in German.\n\n"
        )
        question_label, answer_label = "Frage: ", "Antwort:"
    else:
        header = (
            "Answer the question based on the context below. Answer in full sentences.\n\n"
        )
        question_label, answer_label = "Question: ", "Answer:"

    pieces = [
        header,
        "Context:\n", joined_context, "\n\n",
        question_label, query, "\n",
        answer_label,
    ]
    return "".join(pieces)


In [21]:
# local llm using transformers

LLM_NAME = "google/flan-t5-base"

# load the tokenizer and the seq2seq model for LLM_NAME
tokenizer = AutoTokenizer.from_pretrained(LLM_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(LLM_NAME)

# wrap model and tokenizer in a text2text-generation pipeline
# NOTE(review): the model is already instantiated when device_map is passed here —
# confirm that device_map actually controls device placement in this setup.
llm_pipeline = pipeline(
    "text2text-generation",
    model=model,
    tokenizer=tokenizer,
    device_map="auto",  # or "cpu"
)


Device set to use mps:0


In [22]:
# llm call helper - optimized parameters

def call_local_llm(prompt: str, max_new_tokens: int = 128, temperature: float = 0.3) -> str:
    """Generate a completion for ``prompt`` with the local pipeline.

    Args:
        prompt: Full prompt text passed to the model.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature. Positive values enable sampling
            (with ``top_p=0.95``); zero or negative values switch to
            deterministic greedy decoding instead of raising an error.

    Returns:
        The generated text with surrounding whitespace removed.
    """
    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "repetition_penalty": 1.2,
    }
    if temperature > 0:
        # Enable sampling for better multilingual output
        generation_kwargs.update(do_sample=True, temperature=temperature, top_p=0.95)
    else:
        # Sampling requires a strictly positive temperature; a caller passing 0
        # wants deterministic output, which is greedy decoding.
        generation_kwargs["do_sample"] = False

    result = llm_pipeline(prompt, **generation_kwargs)
    raw = result[0]["generated_text"]
    return raw.strip()


In [23]:
# NOTE: For better German performance, consider these alternatives:
# - "google/mt5-base" (better multilingual support)
# - "google/flan-t5-large" (better overall quality)
# - Use API: OpenAI GPT-3.5/4, Anthropic Claude, etc.
# flan-t5-base has limited multilingual capabilities


In [24]:
# full rag pipeline - optimized retrieval

def rag_answer(query: str, k: int = 3, max_new_tokens: int = 100) -> Dict:
    """Answer ``query`` with retrieval-augmented generation.

    Retrieves the top ``k`` chunks, builds a prompt from them and the query,
    and generates an answer with the local model.

    Args:
        query: The user's question.
        k: Number of chunks to retrieve (3 by default, for a focused prompt).
        max_new_tokens: Upper bound on the length of the generated answer.

    Returns:
        A dict with ``answer`` (the generated text) and ``sources`` (the
        retrieved chunks, including their similarity scores).
    """
    contexts = retrieve(query, k=k)
    prompt = build_prompt(query, contexts)
    answer = call_local_llm(prompt, max_new_tokens=max_new_tokens)

    return {
        "answer": answer,
        "sources": contexts,
    }


In [25]:
# end-to-end demo: ask a question, then print the answer and its sources
res = rag_answer("how many vacation days do I get?", k=3)

print("Antwort:")
print(res["answer"])

# one line per source: title, first 100 characters of the chunk, similarity score
print("\n\nQuellen:")
for s in res["sources"]:
    print(f"- {s['title']}: {s['text'][:100]}... (score: {s['score']:.3f})")


Antwort:
25


Quellen:
- Vacation Policy: **1. Annual Leave Entitlement**  
Employees receive 25 days of paid vacation per year. Leave is accr... (score: 0.346)
- Travel & Expense Guidelines: **3. Meals**  
Per-diem allowances follow local regulations. Receipts must be uploaded within ten da... (score: 0.274)
- Remote Work Policy: **1. Eligibility**  
Employees may work remotely up to three days per week, subject to manager appro... (score: 0.265)
