# Expat Legal Aid Advisor ‚Äî Complete End-to-End Notebook (Enhanced)

This notebook sets up the full multi-agent system, runs tests with coverage, and launches a Gradio UI. It also includes **smoke tests**, **PDF/DOCX parsing checks**, and a **translation-fallback** path.

## 1) Install Dependencies

In [2]:
%%writefile requirements.txt
gradio
flask
pytest
coverage
cryptography
requests
google-generativeai
Flask-HTTPAuth
Flask-Limiter
gunicorn
PyPDF2
reportlab
python-docx
langdetect
jedi


Writing requirements.txt


In [3]:
!pip install -r requirements.txt

Collecting coverage (from -r requirements.txt (line 4))
  Downloading coverage-7.12.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl.metadata (9.1 kB)
Collecting Flask-HTTPAuth (from -r requirements.txt (line 8))
  Downloading Flask_HTTPAuth-4.8.0-py3-none-any.whl.metadata (2.9 kB)
Collecting Flask-Limiter (from -r requirements.txt (line 9))
  Downloading flask_limiter-4.0.0-py3-none-any.whl.metadata (6.2 kB)
Collecting gunicorn (from -r requirements.txt (line 10))
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting PyPDF2 (from -r requirements.txt (line 11))
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting reportlab (from -r requirements.txt (line 12))
  Downloading reportlab-4.4.5-py3-none-any.whl.metadata (1.7 kB)
Collecting python-docx (from -r requirements.txt (line 13))
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting langdetect (from -r requirements.txt (line 14))
  Downloa

## 2) Load Secrets (GOOGLE_API_KEY) from Colab

In [4]:
from google.colab import userdata
import os
from google.colab.userdata import SecretNotFoundError
from cryptography.fernet import Fernet # Import Fernet

# Helper function to get secrets with graceful fallback
def get_secret_or_fallback(key, fallback_value):
    try:
        return userdata.get(key)
    except SecretNotFoundError:
        print(f"‚ö†Ô∏è Secret '{key}' not found. Using fallback value.")
        return fallback_value

# Generate a default Fernet key for SESSION_SECRET if not provided
default_session_secret = Fernet.generate_key().decode()

os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY') or ''
assert os.environ['GOOGLE_API_KEY'], "GOOGLE_API_KEY is not set. Add it in Runtime -> Run time settings -> Secrets."
os.environ['SESSION_SECRET'] = get_secret_or_fallback('SESSION_SECRET', default_session_secret)
os.environ['FLASK_API_KEY'] = get_secret_or_fallback('FLASK_API_KEY', 'flask_api_key_fallback')
print("‚úÖ Secrets loaded into environment.")

‚ö†Ô∏è Secret 'SESSION_SECRET' not found. Using fallback value.
‚ö†Ô∏è Secret 'FLASK_API_KEY' not found. Using fallback value.
‚úÖ Secrets loaded into environment.


## 3) Create Folder Structure & __init__.py

In [5]:

import os
folders = ['project','project/core','project/memory','project/tools','project/agents','project/ui','project/tests']
for p in folders:
    os.makedirs(p, exist_ok=True)
    with open(os.path.join(p,'__init__.py'), 'w', encoding='utf-8') as f:
        f.write('# Package initializer')
print('‚úÖ Folders and __init__.py files created.')

‚úÖ Folders and __init__.py files created.


## 4) Write Core Modules

In [6]:

%%writefile project/core/context_engineering.py
import re
PRIVACY_DISCLAIMER = (
    "Privacy Notice: Your input may contain sensitive legal information. "
    "We do not persist document contents."
)

def sanitize_input(text: str) -> str:
    text = re.sub(r"<[^>]*>", "", str(text or ""))
    text = re.sub(r"\s+", " ", text).strip()
    return text[:10000]

class ContextEngine:
    def build_context(self, user_input, session_data=None, document_content=None):
        parts = []
        parts.append(f"Context(session={session_data or {}})")
        parts.append(f"Input={sanitize_input(user_input)}")
        if document_content:
            parts.append(f"Document={sanitize_input(document_content)}")
        return " ".join(parts)


Writing project/core/context_engineering.py


In [7]:

%%writefile project/core/observability.py
import logging, json
from datetime import datetime
logging.basicConfig(level=logging.INFO, format='%(message)s')
class Observability:
    @staticmethod
    def log(event, payload=None, contains_pii=False):
        safe_payload = {'detail': '[REDACTED]'} if contains_pii else (payload or {})
        record = {
            'ts': datetime.utcnow().isoformat(),
            'event': event,
            'payload': safe_payload,
        }
        logging.info(json.dumps(record))


Writing project/core/observability.py


In [8]:

%%writefile project/core/a2a_protocol.py
import uuid
from datetime import datetime

def create_message(sender, receiver, payload):
    return {
        'task_id': str(uuid.uuid4()),
        'sender': sender,
        'receiver': receiver,
        'payload': payload,
        'timestamp': datetime.utcnow().isoformat()
    }


Writing project/core/a2a_protocol.py


## 5) Write Memory & Tools (with DOCX support)

In [9]:

%%writefile project/memory/session_memory.py
import os
from cryptography.fernet import Fernet
class SessionMemory:
    def __init__(self):
        key = os.getenv('SESSION_SECRET')
        self._key = key.encode() if key else Fernet.generate_key()
        self._fernet = Fernet(self._key)
        self._store = {}
    def store(self, k, v):
        self._store[k] = self._fernet.encrypt(str(v).encode())
    def retrieve(self, k):
        return self._fernet.decrypt(self._store[k]).decode() if k in self._store else None


Writing project/memory/session_memory.py


In [10]:
%%writefile project/tools/tools.py
# project/tools/tools.py
import os
import ast
import operator
import requests
import time
import google.generativeai as genai
from PyPDF2 import PdfReader
from docx import Document as DocxDocument

# --- Utilities ---
def retry_generic(func, retries=3, delay=2, exceptions=(Exception,)):
    for attempt in range(retries):
        try:
            return func()
        except exceptions:
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                raise

def summarizer(text, max_len=200):
    text = str(text or "")
    return text[:max_len] + '...' if len(text) > max_len else text

# --- Calculator ---
class SafeCalculator:
    OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv}

    @classmethod
    def evaluate(cls, expr):
        try:
            node = ast.parse(expr, mode='eval').body
            return cls._eval(node)
        except Exception:
            return 'Invalid expression'

    @classmethod
    def _eval(cls, node):
        if isinstance(node, ast.Num):
            return node.n
        if isinstance(node, ast.BinOp):
            return cls.OPS[type(node.op)](cls._eval(node.left), cls._eval(node.right))
        if isinstance(node, ast.UnaryOp):
            if isinstance(node.op, ast.UAdd):
                return +cls._eval(node.operand)
            if isinstance(node.op, ast.USub):
                return -cls._eval(node.operand)
        raise ValueError('Unsupported')

# --- Simple local search ---
class SimpleSearch:
    def __init__(self, corpus=None):
        self.corpus = corpus or []

    def add(self, doc_id, text):
        self.corpus.append({"id": doc_id, "text": str(text or "")})

    def query(self, q, top_k=3):
        ql = [w for w in str(q or "").lower().split() if w]
        scored = []
        for item in self.corpus:
            textl = item["text"].lower()
            score = sum(textl.count(w) for w in ql)
            if score:
                scored.append((score, item["id"], item["text"]))
        return sorted(scored, key=lambda x: -x[0])[:top_k]

# --- Domain tools ---
class DomainTools:
    VISA_KEYWORDS = ['visa', 'residence', 'permit', 'work', 'study', 'family', 'application', 'document']

    @classmethod
    def extract_visa_requirements(cls, text):
        t = str(text or "").lower()
        found = [k for k in cls.VISA_KEYWORDS if k in t]
        return {"has_visa_context": bool(found), "matched_keywords": found}

# --- Translator ---
class GoogleTranslator:
    ENDPOINT = 'https://translation.googleapis.com/language/translate/v2'

    def __init__(self):
        self.api_key = os.getenv('GOOGLE_API_KEY')
        if not self.api_key:
            raise RuntimeError('GOOGLE_API_KEY missing')

    def translate(self, text, target='en', source='auto'):
        def do():
            resp = requests.post(
                self.ENDPOINT,
                params={'key': self.api_key},
                json={'q': text, 'target': target, 'source': source, 'format': 'text'},
                timeout=10,
            )
            resp.raise_for_status()
            return resp.json()['data']['translations'][0]['translatedText']
        return retry_generic(do)

# --- File extractors ---
def extract_pdf_text(pdf_path):
    # Temporarily remove broad exception handling for debugging
    reader = PdfReader(pdf_path)
    pages = []
    for page in reader.pages:
        pages.append(page.extract_text() or "")
    return "".join(pages)

def extract_docx_text(docx_path):
    try:
        doc = DocxDocument(docx_path)
        return "".join(p.text for p in doc.paragraphs)
    except Exception:
        return ""

# --- Gemini LLM ---
class GeminiLLM:
    MAX_RESPONSE_LENGTH = 2000 # Define max response length

    def __init__(self):
        api_key = os.getenv('GOOGLE_API_KEY')
        if not api_key:
            raise RuntimeError('GOOGLE_API_KEY missing for GeminiLLM')
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-2.5-flash-lite')

    def generate_response(self, user_question_original, user_question_en=None, document_content_en=None, citations=None, document_content_original=None, reply_language='en'):
        def _call():
            from project.ui.i18n import get_language_name
            prompt_parts = [
                "You are an expert legal aid advisor for expats. ",
                "If document translations are provided, use them for reasoning; ",
                "otherwise, reason directly over the original language content. ",
            ]
            prompt_parts.append(f"Original user question (language hint): {summarizer(user_question_original, 200)}")
            if user_question_en:
                prompt_parts.append(f"English-translated user question: {summarizer(user_question_en, 500)}")
            if document_content_en:
                prompt_parts.append(f"Translated legal document (for user's preferred language): {summarizer(document_content_en, 2000)}")
            elif document_content_original:
                prompt_parts.append(f"Original-language legal document (context): {summarizer(document_content_original, 2000)}")
            if citations:
                prompt_parts.append(f"Relevant excerpts: {summarizer(' | '.join(citations), 500)}")
            prompt_parts.append("Provide a precise, structured, and legally sound answer. Cite relevant parts when possible.")
            # FIX #3: Removed contradictory "Always respond in the language of the original user question"
            # Now only enforce the preferred reply language
            lang_name = get_language_name(reply_language)
            prompt_parts.append(f"Reply ONLY in {lang_name}. Do not use any other language.")
            response = self.model.generate_content("".join(prompt_parts))

            # Summarize the final response to ensure it's within limits
            return summarizer(response.text, self.MAX_RESPONSE_LENGTH)
        try:
            return retry_generic(_call)
        except Exception:
            return "Unable to generate response at this time."


Writing project/tools/tools.py


## 6) Write Agents & Main Orchestrator (translation fallback)

In [11]:
%%writefile project/agents/planner.py
from project.core.context_engineering import sanitize_input
class Planner:
    def plan(self, user_input, document_content=None, document_language='auto', preferred_language='en'):
        return {
            "tasks": [{
                "action": "process",
                "details": {
                    "user_input": sanitize_input(user_input),
                    "document": sanitize_input(document_content) if document_content else None,
                    "document_language": document_language,
                    "preferred_language": preferred_language
                }
            }]
        }


Writing project/agents/planner.py


In [12]:
%%writefile project/agents/evaluator.py
# project/agents/evaluator.py
import re

class Evaluator:
    def _estimate_confidence(self, text):
        txt = str(text or '')
        if not txt.strip() or len(txt.strip()) < 20:
            return 0.55
        if len(txt) > 500:
            return 0.92
        if 'keyword' in txt.lower() or 'found' in txt.lower():
            return 0.88
        return 0.80

    def _polish_text(self, raw):
        if not raw:
            return 'I could not generate a meaningful answer based on the provided information.'
        cleaned = re.sub(r'(?i)validated response: ?', '', str(raw)).strip()
        cleaned = cleaned.replace('Document processed.', 'After reviewing your document,')
        if not cleaned.lower().startswith(('here', 'after', 'based', 'i')):
            cleaned = "Here is my assessment: " + cleaned
        return cleaned

    def evaluate(self, result):
        return {'response': self._polish_text(result), 'confidence': round(self._estimate_confidence(result), 2)}

Writing project/agents/evaluator.py


In [13]:
%%writefile project/agents/worker.py
# project/agents/worker.py
from project.tools.tools import GoogleTranslator, GeminiLLM, SimpleSearch, DomainTools
from project.core.context_engineering import ContextEngine
from langdetect import detect, LangDetectException

class Worker:
    def __init__(self):
        self.llm = GeminiLLM()
        self.context_engine = ContextEngine()
        self.search = SimpleSearch()

    def _detect_language(self, text):
        """Auto-detect language using langdetect. Returns language code or 'en' on failure."""
        if not text or len(text) < 3:
            return 'en'
        try:
            detected = detect(text)
            # Map common codes: pt->es, zh-cn->auto, etc.
            lang_map = {'pt': 'es', 'zh-cn': 'en', 'zh-tw': 'en'}
            return lang_map.get(detected, detected)
        except LangDetectException:
            return 'en'

    def _translate_safe(self, text, target='en'):
        try:
            if not text:
                return None
            translator = GoogleTranslator()
            return translator.translate(text, target=target)
        except Exception as e:
            # Log but don't raise; fallback to original
            print(f"‚ö†Ô∏è Translation failed: {str(e)}")
            return None

    def execute(self, task):
        a = task.get("action")
        d = task.get("details", {})
        if a != "process":
            return "Unknown action"

        user_input_original = d.get("user_input", "")
        document = d.get("document")
        document_language = d.get("document_language", "auto")
        preferred_language = d.get("preferred_language", 'en')

        # Auto-detect document language if 'auto' is specified
        if document_language == "auto" and document:
            document_language = self._detect_language(document)
            print(f"üìç Auto-detected document language: {document_language}")

        # Translate question to English for internal reasoning
        user_input_en = self._translate_safe(user_input_original, target='en')

        # FIX #1 & #4: Translate document to preferred language (for user communication)
        # Also use document_language as source hint for better translation
        document_translated = self._translate_safe(document, target=preferred_language) if document else None

        # Also translate to English for search/reasoning if preferred language is not English
        document_en = self._translate_safe(document, target='en') if document and preferred_language != 'en' else document_translated

        # Build local search corpus using English content for better search accuracy
        doc_for_search = document_en or document or ""
        if doc_for_search:
            self.search.add("doc", doc_for_search)
        citations = [t for _, _, t in self.search.query(user_input_en or user_input_original, top_k=2)]
        domain_info = DomainTools.extract_visa_requirements(doc_for_search)

        # LLM generation: Pass translated document and enforced reply language
        # Use document_translated (in preferred language) for reasoning
        base = self.llm.generate_response(
            user_question_original=user_input_original,
            user_question_en=user_input_en,
            document_content_en=document_translated,
            citations=citations,
            document_content_original=document if (document and not document_translated) else None,
            reply_language=preferred_language
        )

        if domain_info.get("has_visa_context"):
            matched = ', '.join(domain_info.get('matched_keywords', []))
            base = f"{base}\n\n(Detected visa-related context: {matched})"

        return base


Writing project/agents/worker.py


In [14]:
%%writefile project/main_agent.py
# project/main_agent.py
from project.agents.planner import Planner
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator
from project.core.context_engineering import PRIVACY_DISCLAIMER
from project.core.observability import Observability
from project.core.a2a_protocol import create_message
from project.memory.session_memory import SessionMemory

class MainAgent:
    def __init__(self):
        self.planner = Planner()
        self.worker = Worker()
        self.evaluator = Evaluator()
        self.memory = SessionMemory()

    def handle_message(self, user_input, document_content=None, document_language='auto', preferred_language='en'):
        Observability.log('start', {'input_len': len(str(user_input))}, contains_pii=True)
        plan = self.planner.plan(user_input, document_content, document_language, preferred_language)
        task = plan["tasks"][0]
        msg = create_message('planner', 'worker', task)
        Observability.log('a2a_msg', {'id': msg['task_id'], 'from': msg['sender'], 'to': msg['receiver']})
        result = self.worker.execute(task)
        msg2 = create_message('worker', 'evaluator', {'result_preview': str(result)[:60]})
        Observability.log('a2a_msg', {'id': msg2['task_id'], 'from': msg2['sender'], 'to': msg2['receiver']})
        eval_result = self.evaluator.evaluate(result)
        eval_result['response'] = f"{eval_result.get('response','')}\n\n{PRIVACY_DISCLAIMER}"
        self.memory.store('last_question', user_input)
        self.memory.store('last_response', eval_result['response'])
        Observability.log('end', {'confidence': eval_result['confidence']})
        return eval_result

def run_agent(user_input, document_content=None, document_language='auto', preferred_language='en'):
    return MainAgent().handle_message(user_input, document_content, document_language, preferred_language)


Writing project/main_agent.py


## 7) UI Strings, API & Demo

In [15]:
%%writefile project/ui/i18n.py
TRANSLATIONS = {
    'en': {'title': 'Expat Legal Aid Advisor', 'welcome': 'Welcome', 'disclaimer': 'Privacy Notice'},
    'es': {'title': 'Asesor Legal', 'welcome': 'Bienvenido', 'disclaimer': 'Aviso de privacidad'},
    'fr': {'title': 'Conseiller Juridique', 'welcome': 'Bienvenue', 'disclaimer': 'Avis de confidentialit√©'},
    'nl': {'title': 'Expat Juridisch Advies', 'welcome': 'Welkom', 'disclaimer': 'Privacyverklaring'},
    'de': {'title': 'Expat-Rechtsberatung', 'welcome': 'Willkommen', 'disclaimer': 'Datenschutzerkl√§rung'}
}

LANGUAGE_MAP = {
    'en': 'English',
    'es': 'Spanish',
    'fr': 'French',
    'nl': 'Dutch',
    'de': 'German'
}

def t(key, lang='en'):
    return TRANSLATIONS.get(lang, TRANSLATIONS['en']).get(key, key)

def get_language_name(lang_code):
    """Return human-readable language name for LLM prompt."""
    return LANGUAGE_MAP.get(lang_code, lang_code)


Writing project/ui/i18n.py


In [16]:
%%writefile project/app.py
from flask import Flask, request, jsonify
from flask_httpauth import HTTPBasicAuth
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import os
from project.main_agent import run_agent
from project.core.context_engineering import PRIVACY_DISCLAIMER


def create_app():
    app = Flask(__name__)
    auth = HTTPBasicAuth()
    limiter = Limiter(app=app, key_func=get_remote_address, storage_uri='memory:///')

    @auth.verify_password
    def verify_password(username, password):
        api_key = request.headers.get('X-API-Key') or password
        if api_key and api_key == os.getenv('FLASK_API_KEY'):
            return username
        return None

    @app.route('/query', methods=['POST'])
    @auth.login_required
    @limiter.limit('10 per minute')
    def query():
        data = request.json or {}
        agent_response = run_agent(data.get('input', ''), data.get('document_content'))
        return jsonify({'response': agent_response, 'privacy': PRIVACY_DISCLAIMER})

    @app.errorhandler(429)
    def ratelimit_handler(e):
        return jsonify({'code': 429, 'name': 'Rate Limit Exceeded', 'description': 'You have exceeded your rate limit.'}), 429

    return app

if __name__ == '__main__':
    app = create_app()
    app.run(host='0.0.0.0', port=5000)

Writing project/app.py


In [17]:
%%writefile project/run_demo.py
import os
import sys
from unittest.mock import patch

print("DEBUG: Starting run_demo.py script", flush=True)

if __name__ == '__main__':
    print("DEBUG: Inside __main__ block.", flush=True)
    if os.getenv('E2E_TEST_MODE') == 'true':
        print("DEBUG: E2E_TEST_MODE is ON. Applying mock.", flush=True)
        try:
            # The patch needs to wrap the execution of run_agent
            with patch('project.tools.tools.GeminiLLM') as MockGeminiLLM: # Patch the actual class used by Worker
                mock = MockGeminiLLM.return_value
                mock.generate_response.return_value = 'Mocked LLM response for Hello! This is a demo.'
                print("DEBUG: GeminiLLM mocked.", flush=True)
                from project.main_agent import run_agent # Import here to ensure it uses the patched GeminiLLM when instantiated
                print("Running agent...", flush=True)
                result = run_agent('Hello! This is a demo.')
                print("DEBUG: Agent run completed.", flush=True)
                print(result.get('response', 'Error: No response key in agent output.'), flush=True)
                print("Agent run finished.", flush=True)
        except Exception as e:
            print(f"ERROR DURING MOCKED AGENT EXECUTION: {e}", file=sys.stderr, flush=True)
            sys.exit(1)
    else:
        print("DEBUG: E2E_TEST_MODE is OFF. Running real agent.", flush=True)
        from project.main_agent import run_agent
        print("Running agent...", flush=True)
        try:
            result = run_agent('Hello! This is a demo.')
            print("DEBUG: Agent run completed.", flush=True)
            print(result.get('response', 'Error: No response key in agent output.'), flush=True)
            print("Agent run finished.", flush=True)
        except Exception as e:
            print(f"ERROR DURING REAL AGENT EXECUTION: {e}", file=sys.stderr, flush=True)
            sys.exit(1)

print("DEBUG: End of run_demo.py script.", flush=True)

Writing project/run_demo.py


## 8) Coverage & README

In [18]:

%%writefile .coveragerc
[run]
source = project


Writing .coveragerc


In [19]:

%%writefile README.md
# Expat Legal Aid Advisor (Colab-ready)
Full multi-agent project with tests, coverage, Flask API, and Gradio UI (mandatory).


Writing README.md


## 9) Tests

In [20]:
%%writefile project/tests/test_unit.py
import os, requests, pytest
from unittest.mock import patch
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator
from project.main_agent import run_agent
from project.core.context_engineering import sanitize_input, ContextEngine
from project.core.observability import Observability
from project.core.a2a_protocol import create_message
from project.memory.session_memory import SessionMemory
from project.tools.tools import SafeCalculator, summarizer, GoogleTranslator, GeminiLLM, SimpleSearch, DomainTools, extract_pdf_text
import logging

def test_evaluator_confidence_and_polish():
    e = Evaluator()
    assert e.evaluate('short')['confidence'] >= 0.55
    long = 'a' * 600
    assert e._estimate_confidence(long) == 0.92
    assert e._polish_text('') == 'I could not generate a meaningful answer based on the provided information.'
    assert e._polish_text('Document processed. Some text.').startswith('After reviewing your document,')

def test_planner_action_and_sanitize():
    from project.agents.planner import Planner
    plan = Planner().plan('<b>Hi</b>')
    assert plan['tasks'][0]['action'] == 'process'
    assert sanitize_input('<i>x</i>') == 'x'

def test_planner_with_language_parameters():
    """Test that Planner correctly passes language parameters."""
    from project.agents.planner import Planner
    p = Planner()
    plan = p.plan('¬øHola?', None, 'es', 'es')
    assert plan['tasks'][0]['details']['document_language'] == 'es'
    assert plan['tasks'][0]['details']['preferred_language'] == 'es'

def test_context_engine_builds_context():
    ctx = ContextEngine().build_context('Q?', {'user': 'alice'}, 'doc text')
    assert "Context(session={'user': 'alice'})" in ctx
    assert 'Input=Q?' in ctx
    assert 'Document=doc text' in ctx

def test_observability_logs(caplog):
    caplog.set_level(logging.INFO) # Set logging level to INFO
    Observability.log('start', {'a': 1})
    assert any('"event": "start"' in rec.message for rec in caplog.records)

def test_a2a_message_has_fields():
    m = create_message('planner', 'worker', {'x': 1})
    assert {'task_id','sender','receiver','payload','timestamp'}.issubset(m.keys())

def test_session_memory_with_secret(monkeypatch):
    from cryptography.fernet import Fernet
    key = Fernet.generate_key().decode()
    monkeypatch.setenv('SESSION_SECRET', key)
    mem = SessionMemory()
    mem.store('k','v')
    assert mem.retrieve('k') == 'v'

def test_session_memory_without_secret(monkeypatch):
    monkeypatch.delenv('SESSION_SECRET', raising=False)
    mem = SessionMemory()
    mem.store('k','v')
    assert mem.retrieve('k') == 'v'

def test_tools_calculator_and_summarizer():
    assert SafeCalculator.evaluate('1+2') == 3
    assert SafeCalculator.evaluate('+10') == 10
    assert 'Invalid' in SafeCalculator.evaluate('foo(1)')
    assert summarizer('x'*300).endswith('...')

def test_simple_search_and_domain_tools():
    s = SimpleSearch()
    s.add('d1', 'visa application requires documents')
    s.add('d2', 'residence permit and study visa')
    hits = s.query('visa application', top_k=2)
    assert len(hits) >= 1
    info = DomainTools.extract_visa_requirements('You need a work permit and visa.')
    assert info['has_visa_context'] and 'visa' in info['matched_keywords']

def test_extract_pdf_text(tmp_path):
    from reportlab.pdfgen import canvas
    pdf_path = tmp_path / 'test.pdf'
    c = canvas.Canvas(str(pdf_path)) # Convert PosixPath to string
    c.drawString(100, 750, 'This is a PDF test for extraction.')
    c.save()
    text = extract_pdf_text(str(pdf_path))
    assert 'PDF test for extraction' in text

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_worker_translates_q_and_doc_and_passes_citations(MockTrans, MockLLM):
    mock_llm = MockLLM.return_value
    mock_llm.generate_response.return_value = 'LLM generated response'
    mock_trans = MockTrans.return_value
    mock_trans.translate.side_effect = ['Translated Question', 'Translated Document']
    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    task = {'action':'process','details':{'user_input':'¬øRequisitos?', 'document':'documento en espa√±ol', 'document_language': 'es', 'preferred_language': 'es'}}
    out = w.execute(task)
    assert 'LLM generated response' in out # Updated assertion
    assert mock_trans.translate.call_count >= 2
    args, kwargs = mock_llm.generate_response.call_args
    assert 'citations' in kwargs
    del os.environ['GOOGLE_API_KEY']

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_worker_domain_hint_appended(MockTrans, MockLLM):
    mock_llm = MockLLM.return_value
    mock_llm.generate_response.return_value = 'LLM generated response'
    mock_trans = MockTrans.return_value
    mock_trans.translate.side_effect = ['Translated Question', 'Translated Document mentioning visa']
    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    task = {'action':'process','details':{'user_input':'¬øRequisitos?', 'document':'visa doc', 'document_language': 'auto', 'preferred_language': 'es'}}
    out = w.execute(task)
    assert 'Detected visa-related context' in out
    del os.environ['GOOGLE_API_KEY']

def test_worker_unknown_action():
    with patch('project.agents.worker.GeminiLLM'):
        w = Worker()
    assert w.execute({'action': 'translate', 'details': {'user_input':'hi'}}) == 'Unknown action'

def test_worker_language_auto_detection():
    """Test that Worker detects document language when 'auto' is specified."""
    with patch('project.agents.worker.GeminiLLM'):
        w = Worker()
        # Spanish text
        spanish_text = 'Este es un documento en espa√±ol sobre visa de trabajo.'
        detected = w._detect_language(spanish_text)
        assert detected in ['es', 'en']  # Might detect as Spanish or fallback to English

@patch('project.tools.tools.requests')
def test_google_translator_success(mock_requests):
    mock_requests.post.return_value.raise_for_status.return_value = None
    mock_requests.post.return_value.json.return_value = {'data': {'translations':[{'translatedText':'Hola'}]}}
    os.environ['GOOGLE_API_KEY'] = 'key'
    tr = GoogleTranslator()
    assert tr.translate('Hello', 'es') == 'Hola'
    del os.environ['GOOGLE_API_KEY']

@patch('project.tools.tools.requests')
def test_google_translator_api_error(mock_requests):
    mock_requests.post.return_value.raise_for_status.side_effect = requests.exceptions.RequestException('API Error')
    os.environ['GOOGLE_API_KEY'] = 'key'
    tr = GoogleTranslator()
    with pytest.raises(requests.exceptions.RequestException):
        tr.translate('Hello','es')
    del os.environ['GOOGLE_API_KEY']

def test_gemini_llm_init_no_key(monkeypatch):
    from project.tools.tools import GeminiLLM
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)
    with pytest.raises(RuntimeError, match='missing for GeminiLLM'):
        GeminiLLM()

@patch('google.generativeai.GenerativeModel')
def test_gemini_llm_generate_response(MockModel, monkeypatch):
    from project.tools.tools import GeminiLLM
    mock = MockModel.return_value
    mock.generate_content.return_value.text = 'Mocked'
    monkeypatch.setenv('GOOGLE_API_KEY','k')
    llm = GeminiLLM()
    assert llm.generate_response('Hola?', 'Hello?') == 'Mocked'
    mock.generate_content.side_effect = Exception('LLM down')
    assert llm.generate_response('Hola?', 'Hello?') == 'Unable to generate response at this time.'
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)

@patch('google.generativeai.GenerativeModel')
def test_gemini_llm_with_language_enforcement(MockModel, monkeypatch):
    """Test that GeminiLLM appends language enforcement to prompt."""
    from project.tools.tools import GeminiLLM
    mock = MockModel.return_value
    mock.generate_content.return_value.text = 'Spanish response'
    monkeypatch.setenv('GOOGLE_API_KEY','k')
    llm = GeminiLLM()
    result = llm.generate_response('¬øHola?', reply_language='es')
    # Verify that the prompt included language enforcement
    call_args = mock.generate_content.call_args
    prompt = call_args[0][0] if call_args[0] else ''
    assert 'Spanish' in prompt or 'reply ONLY' in prompt
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_run_agent_returns_polished_dict(MockTrans, MockLLM, monkeypatch):
    MockLLM.return_value.generate_response.return_value = 'Mocked LLM response for Hello!'
    MockTrans.return_value.translate.side_effect = ['Translated Question']
    monkeypatch.setenv('GOOGLE_API_KEY','k')
    out = run_agent('Hello!')
    assert isinstance(out, dict) and 'response' in out and 'confidence' in out
    assert ('Here is my assessment' in out['response']) or ('After reviewing your document' in out['response'])
    assert 'Privacy Notice' in out['response']
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_run_agent_with_language_parameters(MockTrans, MockLLM, monkeypatch):
    """Test that run_agent accepts and passes language parameters."""
    MockLLM.return_value.generate_response.return_value = 'Spanish response'
    MockTrans.return_value.translate.return_value = 'Translated'
    monkeypatch.setenv('GOOGLE_API_KEY','k')
    out = run_agent('¬øPregunta?', None, 'es', 'es')
    assert isinstance(out, dict)
    assert 'response' in out and 'confidence' in out
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_worker_translates_document_to_preferred_language(MockTrans, MockLLM):
    """FIX #5: Test that document is translated to preferred language, not always English."""
    mock_llm = MockLLM.return_value # Corrected typo from MockLLm to MockLLM
    mock_llm.generate_response.return_value = 'Spanish response about visa'
    mock_trans = MockTrans.return_value
    # First call: question to English, Second call: document to Spanish
    mock_trans.translate.side_effect = ['Pregunta en ingl√©s', 'Documento traducido al espa√±ol']

    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    task = {
        'action': 'process',
        'details': {
            'user_input': '¬øCuales son los requisitos?',
            'document': 'Este es un documento en espa√±ol sobre visa',
            'document_language': 'es',
            'preferred_language': 'es'
        }
    }
    out = w.execute(task)

    # Verify translator was called with preferred language as target
    assert mock_trans.translate.call_count >= 2
    # Check that one of the calls targeted 'es' (preferred language)
    calls = [call[1].get('target') for call in mock_trans.translate.call_args_list if call[1]]
    assert 'es' in calls, f"Expected 'es' in translation targets, got {calls}"

    assert 'Spanish response about visa' in out # Updated assertion
    del os.environ['GOOGLE_API_KEY']

def test_google_translator_no_api_key(monkeypatch):
    """GoogleTranslator should raise when GOOGLE_API_KEY is missing."""
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)
    with pytest.raises(RuntimeError):
        GoogleTranslator()

@patch('project.tools.tools.requests')
def test_google_translator_retries_on_timeout(mock_requests, monkeypatch):
    """Simulate a transient timeout on first POST, success on retry."""
    from requests.exceptions import Timeout
    class FakeResp:
        def raise_for_status(self):
            return None
        def json(self):
            return {'data': {'translations':[{'translatedText':'Hola'}]}}

    # Set API key before instantiating GoogleTranslator
    monkeypatch.setenv('GOOGLE_API_KEY', 'key')

    call_count = {'n': 0}
    def post_side_effect(*args, **kwargs):
        if call_count['n'] == 0:
            call_count['n'] += 1
            raise Timeout('simulated timeout')
        return FakeResp()
    with patch('project.tools.tools.requests.post', side_effect=post_side_effect):
        tr = GoogleTranslator()
        result = tr.translate('Hello', 'es')
        assert result == 'Hola'
    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_worker_translate_exception_uses_original_doc(MockTrans, MockLLM):
    """If translator fails, Worker should pass the original document to the LLM (fallback path)."""
    mock_llm = MockLLM.return_value
    def capture_generate(*args, **kwargs):
        # When translation fails document_translated is None, so document_content_original should be passed
        assert kwargs.get('document_content_original') is not None, 'Expected original document in LLM args on translation failure'
        return 'LLM fallback response'
    mock_llm.generate_response.side_effect = capture_generate
    mock_trans = MockTrans.return_value
    # Simulate translator failing for any call
    mock_trans.translate.side_effect = Exception('forced translator failure')
    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    task = {'action':'process','details':{'user_input':'Pregunta','document':'documento original','document_language':'es','preferred_language':'es'}}
    out = w.execute(task)
    assert 'LLM fallback response' in out # Updated assertion
    del os.environ['GOOGLE_API_KEY']

@patch('project.agents.worker.GeminiLLM')
def test_worker_translate_safe_none(MockLLM):
    """_translate_safe should return None when given None and should not raise."""
    with patch('project.agents.worker.GeminiLLM'):
        w = Worker()
        assert w._translate_safe(None) is None

def test_worker_empty_text_detection():
    """Test _detect_language with empty/short text returns 'en' as fallback."""
    with patch('project.agents.worker.GeminiLLM'):
        w = Worker()
        assert w._detect_language('') == 'en'
        assert w._detect_language('a') == 'en'
        assert w._detect_language(None) == 'en'

@patch('project.agents.worker.GeminiLLM')
def test_worker_execute_without_document(MockLLM):
    """Test Worker.execute when no document is provided."""
    mock_llm = MockLLM.return_value
    mock_llm.generate_response.return_value = 'LLM response'
    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    task = {
        'action': 'process',
        'details': {
            'user_input': 'What are visa requirements?',
            'document': None,
            'document_language': 'auto',
            'preferred_language': 'en'
        }
    }
    out = w.execute(task)
    assert out == 'LLM response'
    del os.environ['GOOGLE_API_KEY']

@patch('project.agents.worker.GeminiLLM')
@patch('project.agents.worker.GoogleTranslator')
def test_worker_document_language_auto_detection_flow(MockTrans, MockLLM):
    """Test Worker auto-detects document language and translates accordingly."""
    mock_llm = MockLLM.return_value
    mock_llm.generate_response.return_value = 'Analyzed response'
    mock_trans = MockTrans.return_value
    mock_trans.translate.side_effect = ['Question EN', 'Document EN']
    os.environ['GOOGLE_API_KEY'] = 'fake'
    w = Worker()
    spanish_doc = 'Este es un documento sobre visa y permiso de residencia.'
    task = {
        'action': 'process',
        'details': {
            'user_input': '¬øRequisitos?',
            'document': spanish_doc,
            'document_language': 'auto',
            'preferred_language': 'en'
        }
    }
    out = w.execute(task)
    assert 'Analyzed response' in out # Updated assertion
    # Verify translator was called
    assert mock_trans.translate.call_count >= 1
    del os.environ['GOOGLE_API_KEY']

def test_summarizer_edge_cases():
    """Test summarizer with various edge cases."""
    assert summarizer('') == ''
    assert summarizer(None) == ''
    assert summarizer('short text') == 'short text'
    long_text = 'x' * 250
    result = summarizer(long_text)
    assert result.endswith('...')
    assert len(result) == 203  # 200 + '...'

@patch('project.tools.tools.requests')
def test_google_translator_with_different_languages(mock_requests):
    """Test GoogleTranslator with various language pairs."""
    mock_requests.post.return_value.raise_for_status.return_value = None
    mock_requests.post.return_value.json.return_value = {'data': {'translations':[{'translatedText':'Bonjour'}]}}
    os.environ['GOOGLE_API_KEY'] = 'key'
    tr = GoogleTranslator()
    result = tr.translate('Hello', target='fr')
    assert result == 'Bonjour'
    # Verify the call was made with correct parameters
    call_args = mock_requests.post.call_args
    assert call_args[1]['json']['target'] == 'fr'
    del os.environ['GOOGLE_API_KEY']

def test_safe_calculator_division():
    """Test SafeCalculator with division operations."""
    assert SafeCalculator.evaluate('10/2') == 5.0
    assert SafeCalculator.evaluate('5/2') == 2.5

def test_safe_calculator_complex_expression():
    """Test SafeCalculator with complex expressions."""
    assert SafeCalculator.evaluate('2 + 3 * 4') == 14
    assert SafeCalculator.evaluate('(2 + 3) * 4') == 20


Writing project/tests/test_unit.py


In [21]:
%%writefile project/tests/test_integration.py
import os
from unittest.mock import patch
from project.main_agent import MainAgent
import logging

@patch('project.main_agent.Worker')
def test_integration_with_observability_and_a2a(MockWorker, monkeypatch, caplog):
    # Ensure caplog captures INFO level messages
    caplog.set_level(logging.INFO)

    mock_w = MockWorker.return_value
    mock_w.execute.return_value = 'Mocked LLM response for Check visa'
    monkeypatch.setenv('GOOGLE_API_KEY','k')
    res = MainAgent().handle_message('Check visa')
    assert isinstance(res, dict) and 'response' in res and 'confidence' in res

    # Check for log messages. The messages are JSON strings, so we need to check if the string contains the JSON fragment.
    assert any('"event": "start"' in r.message for r in caplog.records), "'start' event not found in logs"
    assert any('"event": "end"' in r.message for r in caplog.records), "'end' event not found in logs"

    monkeypatch.delenv('GOOGLE_API_KEY', raising=False)


Writing project/tests/test_integration.py


In [22]:
%%writefile project/tests/test_app.py
import os, pytest
from project.app import create_app # Import create_app factory
from flask_limiter import Limiter # Still needed for type hinting or if you manually manipulate Limiter
from flask_limiter.util import get_remote_address
from unittest.mock import patch

@pytest.fixture
def client(monkeypatch):
    # Create a fresh app for each test
    test_app = create_app() # Get app
    test_app.config['TESTING'] = True
    monkeypatch.setenv('FLASK_API_KEY', 'test_api_key')

    # Push an application context to ensure everything is set up correctly
    with test_app.test_client() as c:
        with test_app.app_context(): # Ensure app context for extensions like limiter
            yield c
    monkeypatch.delenv('FLASK_API_KEY', raising=False)

def test_auth_required(client):
    r = client.post('/query', json={'input': 'hello'})
    assert r.status_code in (401, 403)

@patch('project.app.run_agent')
def test_query_success(mock_run_agent, client):
    mock_run_agent.return_value = {'response': 'ok', 'confidence': 0.9}
    headers = {'X-API-Key': 'test_api_key'}
    r = client.post('/query', headers=headers, json={'input': 'hello', 'document_content': 'text'})
    assert r.status_code == 200
    data = r.get_json()
    assert 'response' in data and 'privacy' in data
    assert data['response']['response'] == 'ok'

@patch('project.app.run_agent')
def test_rate_limiting(mock_run_agent, client):
    mock_run_agent.return_value = {'response': 'ok', 'confidence': 0.9}
    headers = {'X-API-Key': 'test_api_key'}
    for i in range(10):
        r = client.post('/query', headers=headers, json={'input': f'hello {i}'})
        assert r.status_code == 200
    r = client.post('/query', headers=headers, json={'input': 'exceed'})
    assert r.status_code == 429
    assert r.get_json()['description'] == 'You have exceeded your rate limit.'


Writing project/tests/test_app.py


In [23]:
%%writefile project/tests/test_e2e.py
import subprocess, sys, os

def test_e2e(monkeypatch):
    env = os.environ.copy()
    env['GOOGLE_API_KEY'] = 'fake_key_for_e2e'
    env['E2E_TEST_MODE'] = 'true'
    p = subprocess.run([sys.executable, 'project/run_demo.py'], capture_output=True, text=True, env=env)
    stdout = p.stdout # Use raw stdout for assertion
    # Check for specific debug messages and the mocked response
    assert "E2E_TEST_MODE is ON. Initializing mocked LLM." in stdout
    assert "Running agent..." in stdout
    assert "Mocked LLM response for Hello! This is a demo." in stdout
    assert "Agent run finished." in stdout
    assert p.returncode == 0, f"Subprocess failed with error: {p.stderr}"


Writing project/tests/test_e2e.py


## 10) Run Tests & Coverage (‚â• 90%)

In [24]:
import subprocess

print("Re-running tests and generating coverage report...")
# Run pytest with coverage
pytest_command = ['coverage', 'run', '--rcfile=.coveragerc', '-m', 'pytest', 'project/tests']
pytest_process = subprocess.run(pytest_command, capture_output=True, text=True, check=False)
print(pytest_process.stdout)
print(pytest_process.stderr)

# Generate and print the coverage report
coverage_report_command = ['coverage', 'report', '--rcfile=.coveragerc', '--show-missing']
coverage_report_process = subprocess.run(coverage_report_command, capture_output=True, text=True, check=False)
print(coverage_report_process.stdout)
print('\u2705 Test execution and coverage report generated.')

Re-running tests and generating coverage report...
platform linux -- Python 3.12.12, pytest-8.4.2, pluggy-1.6.0
rootdir: /content
plugins: langsmith-0.4.47, typeguard-4.4.4, anyio-4.11.0
collected 39 items

project/tests/test_app.py ...                                            [  7%]
project/tests/test_e2e.py F                                              [ 10%]
project/tests/test_integration.py .                                      [ 12%]
project/tests/test_unit.py ..................................            [100%]

___________________________________ test_e2e ___________________________________

monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7dd53d9460c0>

    def test_e2e(monkeypatch):
        env = os.environ.copy()
        env['GOOGLE_API_KEY'] = 'fake_key_for_e2e'
        env['E2E_TEST_MODE'] = 'true'
        p = subprocess.run([sys.executable, 'project/run_demo.py'], capture_output=True, text=True, env=env)
        stdout = p.stdout # Use raw stdout for assertio

In [None]:
import gradio as gr
from project.main_agent import run_agent
from project.tools.tools import extract_pdf_text, extract_docx_text
from project.ui.i18n import t
import subprocess, os

MAX_Q = 15
MAX_Q_LEN = 5000
MAX_DOC_LEN = 1000000
VALID_LANGS = {'auto', 'en', 'es', 'fr', 'nl', 'de'}

def read_doc_file(doc_path):
    """Convert .doc to .txt using LibreOffice headless conversion."""
    try:
        txt_path = doc_path.replace('.doc', '.txt')
        subprocess.run(['soffice', '--headless', '--convert-to', 'txt', '--outdir', os.path.dirname(doc_path), doc_path], check=True, timeout=30, capture_output=True)
        with open(txt_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        raise RuntimeError("‚ùå LibreOffice not installed. Please install it or upload a .docx/.pdf file instead.")
    except subprocess.TimeoutExpired:
        raise RuntimeError("‚ùå .doc conversion timed out. Document may be too large.")
    except Exception as e:
        raise RuntimeError(f"‚ùå Failed to convert .doc file: {str(e)}")

def validate_inputs(user_input, doc_content, doc_lang, pref_lang):
    """Validate user inputs before processing."""
    errors = []

    # Validate question
    if not user_input or len(user_input.strip()) == 0:
        errors.append("‚ùå Question cannot be empty.")
    elif len(user_input) > MAX_Q_LEN:
        errors.append(f"‚ùå Question too long (max {MAX_Q_LEN} chars).")

    # Validate document
    if doc_content and len(doc_content) > MAX_DOC_LEN:
        errors.append(f"‚ùå Document too large (max {MAX_DOC_LEN} chars).")

    # Validate language codes
    if doc_lang not in VALID_LANGS:
        errors.append(f"‚ùå Invalid document language: {doc_lang}")
    if pref_lang not in {'en', 'es', 'fr', 'nl', 'de'}:
        errors.append(f"‚ùå Invalid reply language: {pref_lang}")

    return errors

def process_input(user_input, legal_document, ui_lang, pref_lang, doc_lang, consent_given, counter_state):
    print("--- Starting process_input function ---")
    if not consent_given:
        print("--- Consent not given ---")
        return (gr.update(value=f"### {t('title', ui_lang)}\n\n{t('welcome', ui_lang)}\n\n**{t('disclaimer', ui_lang)}**\n\nPlease agree to the privacy notice to proceed."), counter_state)

    current = counter_state or 0
    if current >= MAX_Q:
        print("--- Question limit reached ---")
        return (gr.update(value=f"### {t('title', ui_lang)}\n\nLimit reached: You can ask a maximum of {MAX_Q} questions per session."), current)

    doc_content = None
    try:
        if legal_document is not None:
            name = legal_document.name.lower()
            print(f"--- Processing document: {name} ---")
            if name.endswith('.pdf'):
                doc_content = extract_pdf_text(legal_document.name)
            elif name.endswith('.docx'):
                doc_content = extract_docx_text(legal_document.name)
            elif name.endswith('.doc'):
                doc_content = read_doc_file(legal_document.name)
            else:
                with open(legal_document.name, 'r', encoding='utf-8') as f:
                    doc_content = f.read()
            print(f"--- Document processed, content length: {len(doc_content) if doc_content else 0} ---")
    except Exception as e:
        print(f"--- Error reading document: {type(e).__name__} - {str(e)} ---")
        return (gr.update(value=f"### {t('title', ui_lang)}\n\n**Error reading document:**\n\n**Error Type:** {type(e).__name__}\n**Message:** {str(e)}\n\nPlease try another file."), current)

    # Validate all inputs
    validation_errors = validate_inputs(user_input, doc_content, doc_lang, pref_lang)
    if validation_errors:
        error_msg = "\n".join(validation_errors)
        print(f"--- Input validation error: {error_msg} ---")
        return (gr.update(value=f"### {t('title', ui_lang)}\n\n**Input Error:**\n\n{error_msg}"), current)

    try:
        print("--- Running main agent ---")
        result = run_agent(user_input, doc_content, doc_lang, pref_lang)
        print("--- Main agent finished ---")
    except Exception as e:
        print(f"--- Processing Error from main agent: {str(e)} ---")
        return (gr.update(value=f"### {t('title', ui_lang)}\n\n**Processing Error:**\n\n{str(e)}\n\nPlease try again."), current)

    current += 1
    print("--- Returning result from process_input ---")
    return (gr.update(value=f"### {t('title', ui_lang)}\n\n{result['response']}\n\n**Confidence:** {result['confidence']}"), current)

with gr.Blocks() as demo:
    gr.Markdown('# Expat Legal Aid Advisor')
    consent_group = gr.Group(visible=True)
    with consent_group:
        gr.Markdown('**Privacy Notice**')
        gr.Markdown("Your input may contain sensitive legal information. We do not persist document contents.")
        consent = gr.Checkbox(label='I agree to the privacy notice', value=False)
    main_group = gr.Group(visible=False)
    with main_group:
        # Input section
        gr.Markdown('### üìù Your Input')
        user_in = gr.Textbox(label='ü§î Your Legal Question', placeholder='Ask in any language (e.g., English, Spanish, French, etc.)', lines=3)
        file_in = gr.File(label='üìÑ Legal Document (Optional)', file_count='single') # Removed file_types constraint

        # Language configuration section
        gr.Markdown("### üåê Language Configuration")
        gr.Markdown(
            '**How it works:** The system will translate your document to your chosen communication language, \n'  # Corrected line
            'analyze it, and respond in that language. Auto-detection identifies the document language automatically.'
        )

        # FIX #2: Define all language dropdowns at same scope level (outside Row) to avoid scope issues
        ui_lang = gr.Dropdown(
            choices=['en', 'es', 'fr', 'nl', 'de'],
            value='en',
            label='üé® UI Display Language',
            info='Language for interface labels and messages'
        )

        # Create a row for communication and document language dropdowns
        with gr.Row():
            pref_lang = gr.Dropdown(
                choices=['en', 'es', 'fr', 'nl', 'de'],
                value='en',
                label='üí¨ Communication Language',
                info='Select the language you want to communicate in and receive responses'
            )
            doc_lang = gr.Dropdown(
                choices=['auto', 'en', 'es', 'fr', 'nl', 'de'],
                value='auto',
                label='üìã Document Language',
                info='Choose language or select "auto" to auto-detect'
            )

        # Translation flow info
        gr.Markdown(
            '**üîÑ Translation Flow:**\n'
            '1. Document language is detected or you specify it\n'
            '2. Document is translated to your communication language\n'
            '3. System analyzes and reasons over the translated content\n'
            '4. Response is generated in your chosen language'
        )

        gr.Markdown('**‚úÖ Supported Languages:** English, Spanish, French, Dutch, German')

        # Submit section
        gr.Markdown('### ‚ö° Process')
        submit = gr.Button('Submit', variant='primary')
        out = gr.Markdown()
        counter_state = gr.State(0)

    def toggle(consent_val):
        return gr.update(visible=not consent_val), gr.update(visible=consent_val)
    consent.change(toggle, inputs=[consent], outputs=[consent_group, main_group])
    submit.click(fn=process_input, inputs=[user_in, file_in, ui_lang, pref_lang, doc_lang, consent, counter_state], outputs=[out, counter_state])

demo.launch(debug=True)


Overwriting project/gradio_ui.py


## 12) Smoke Tests

### 12.1) Agent Smoke (mock LLM to verify Planner‚ÜíWorker‚ÜíEvaluator pipeline)

In [None]:
%%writefile project/tests/smoke_test.py
import os
from unittest.mock import patch
from project.main_agent import run_agent

# Ensure the directory exists before writing the file
os.makedirs('project/tests', exist_ok=True)

file_content = """\
import os
from unittest.mock import patch
from project.main_agent import run_agent
os.environ['E2E_TEST_MODE'] = 'true'
USE_MOCK_FOR_FALLBACK_SMOKE = True
if USE_MOCK_FOR_FALLBACK_SMOKE:
    with patch('project.tools.tools.GeminiLLM') as MockGeminiLLM:
        mock = MockGeminiLLM.return_value
        mock.generate_response.return_value = 'Mocked response: Pipeline OK.'
        res = run_agent('¬øCu√°les son los requisitos de visa?', 'Este documento menciona visa y permiso.', 'es', 'es')
        print('Response:', res['response'][:200])
        print('Confidence:', res['confidence'])
else:
    res = run_agent('What are visa requirements?', 'This document mentions visa and permit.', 'en', 'en')
    print('Response:', res['response'][:200])
    print('Confidence:', res['confidence'])
print('‚úÖ Agent smoke test executed.')
"""

with open('project/tests/smoke_test.py', 'w', encoding='utf-8') as f:
    f.write(file_content)

print('Writing project/tests/smoke_test.py')

### 12.2) PDF Parsing Check (generate a PDF and extract its text)

In [None]:
%%writefile -a project/tests/smoke_test.py
import os
from reportlab.pdfgen import canvas
from project.tools.tools import extract_pdf_text

# Ensure the directory exists
os.makedirs('project/tests', exist_ok=True)

file_content = """\
from reportlab.pdfgen import canvas
from project.tools.tools import extract_pdf_text
pdf_path = 'smoke_test.pdf'
c = canvas.Canvas(pdf_path)
c.drawString(100, 750, 'This is a smoke test PDF with visa and permit text.')
c.save()
parsed_text = extract_pdf_text(pdf_path)
print('Parsed PDF contains:', 'visa' in parsed_text and 'permit' in parsed_text)
print('Parsed snippet:', parsed_text[:120])
"""

with open('project/tests/smoke_test.py', 'a', encoding='utf-8') as f:
    f.write(file_content)

print('Appending to project/tests/smoke_test.py for PDF parsing check.')

### 12.3) DOCX Parsing Check (generate a DOCX and extract its text)

In [None]:
import os
from docx import Document
from project.tools.tools import extract_docx_text

# Ensure the directory exists
os.makedirs('project/tests', exist_ok=True)

file_content = """\
from docx import Document
from project.tools.tools import extract_docx_text

path = 'smoke_test.docx'
doc = Document()
doc.add_paragraph('This is a smoke test DOCX with residence permit text.')
doc.save(path)
parsed_docx = extract_docx_text(path)
print('Parsed DOCX contains:', 'residence permit' in parsed_docx)
print('Parsed snippet:', parsed_docx[:120])
"""

with open('project/tests/smoke_test.py', 'a', encoding='utf-8') as f:
    f.write(file_content)

print('Appending to project/tests/smoke_test.py for DOCX parsing check.')

### 12.4) Translation Failure Fallback (force translator to fail; LLM uses original doc)

In [None]:
%%writefile -a project/tests/smoke_test.py
import os
from unittest.mock import patch
from project.main_agent import run_agent

# Ensure the directory exists
os.makedirs('project/tests', exist_ok=True)

file_content = """\
from unittest.mock import patch
from project.main_agent import run_agent

# Force translator failure
with patch('project.tools.tools.GoogleTranslator.translate', side_effect=Exception('Forced failure')):
    res = run_agent('Pregunta en espa√±ol sobre residencia', 'Documento original en espa√±ol con detalles de visa y permiso de trabajo.')
    print('Response (fallback path):', res['response'][:200])
    print('Confidence:', res['confidence'])
print('‚úÖ Fallback smoke test executed.')
"""

with open('project/tests/smoke_test.py', 'a', encoding='utf-8') as f:
    f.write(file_content)

print('Appending to project/tests/smoke_test.py for Translation Fallback check.')

In [None]:
%%writefile -a project/tests/smoke_test.py
from reportlab.pdfgen import canvas
from project.tools.tools import extract_pdf_text
pdf_path = 'smoke_test.pdf'
c = canvas.Canvas(str(pdf_path)) # Convert PosixPath to string
c.drawString(100, 750, 'This is a smoke test PDF with visa and permit text.')
c.save()
parsed_text = extract_pdf_text(pdf_path)
print('Parsed PDF contains:', 'visa' in parsed_text and 'permit' in parsed_text)
print('Parsed snippet:', parsed_text[:120])

