diff --git a/backend/app/main.py b/backend/app/main.py index 5074d1c..7ff74a0 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,3 +1,9 @@ +from pathlib import Path + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles + from backend.app.api import ( routes_analysis, routes_evaluation, @@ -8,8 +14,6 @@ routes_taxonomy_workbench, ) from backend.app.core.logging import configure_logging -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware configure_logging() @@ -38,5 +42,11 @@ app.include_router(routes_reports.router, prefix="/api") @app.get("/health") +@app.get("/api/health") def health() -> dict[str, str]: return {"status": "ok"} + + +FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist" +if FRONTEND_DIST.exists(): + app.mount("/", StaticFiles(directory=FRONTEND_DIST, html=True), name="frontend") diff --git a/build_backend.py b/build_backend.py deleted file mode 100644 index 4018aae..0000000 --- a/build_backend.py +++ /dev/null @@ -1,100 +0,0 @@ -from __future__ import annotations - -import hashlib -import zipfile -from pathlib import Path - -NAME = "argument_risk_engine" -VERSION = "0.1.0" -DIST = f"{NAME}-{VERSION}.dist-info" -ROOT = Path(__file__).parent.resolve() - - -def _metadata() -> str: - return "\n".join([ - "Metadata-Version: 2.1", - "Name: argument-risk-engine", - f"Version: {VERSION}", - "Summary: Local taxonomy-grounded argument risk analysis dashboard.", - "Requires-Python: >=3.10", - "Provides-Extra: dev", - "", - ]) - - -def _entry_points() -> str: - return "\n".join([ - "[console_scripts]", - "uvicorn=uvicorn.main:main", - "", - ]) - - -def _wheel() -> str: - return "\n".join([ - "Wheel-Version: 1.0", - "Generator: argument-risk-engine-local-backend", - "Root-Is-Purelib: true", - "Tag: py3-none-any", - "", - ]) - - -def _record_line(path: str, data: bytes) -> str: - digest = hashlib.sha256(data).digest() - import base64 - encoded = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() - return f"{path},sha256={encoded},{len(data)}" - - -def build_wheel(wheel_directory, config_settings=None, metadata_directory=None): - return _write_wheel(Path(wheel_directory), editable=False) - - -def build_editable(wheel_directory, config_settings=None, metadata_directory=None): - return _write_wheel(Path(wheel_directory), editable=True) - - -def get_requires_for_build_wheel(config_settings=None): - return [] - - -def get_requires_for_build_editable(config_settings=None): - return [] - - -def prepare_metadata_for_build_wheel(metadata_directory, config_settings=None): - dist = Path(metadata_directory) / DIST - dist.mkdir(parents=True, exist_ok=True) - (dist / "METADATA").write_text(_metadata()) - (dist / "WHEEL").write_text(_wheel()) - return DIST - - -def prepare_metadata_for_build_editable(metadata_directory, config_settings=None): - return prepare_metadata_for_build_wheel(metadata_directory, config_settings) - - -def _write_wheel(out_dir: Path, editable: bool) -> str: - out_dir.mkdir(parents=True, exist_ok=True) - filename = f"{NAME}-{VERSION}-py3-none-any.whl" - wheel_path = out_dir / filename - records: list[str] = [] - with zipfile.ZipFile(wheel_path, "w", zipfile.ZIP_DEFLATED) as zf: - files: dict[str, bytes] = { - f"{DIST}/METADATA": _metadata().encode(), - f"{DIST}/WHEEL": _wheel().encode(), - f"{DIST}/entry_points.txt": _entry_points().encode(), - } - if editable: - files["argument_risk_engine_editable.pth"] = f"{ROOT}\n{ROOT / 'engine'}\n".encode() - else: - for base in [ROOT / "engine" / "argument_risk_engine", ROOT / "backend"]: - for path in base.rglob("*.py"): - files[str(path.relative_to(base.parent))] = path.read_bytes() - for path, data in files.items(): - zf.writestr(path, data) - records.append(_record_line(path, data)) - record_path = f"{DIST}/RECORD" - zf.writestr(record_path, "\n".join(records + [f"{record_path},,"]) + "\n") - return filename diff --git a/data/taxonomy/packs/starter-pack.yaml b/data/taxonomy/packs/starter-pack.yaml index 90247a8..fe5278b 100644 --- a/data/taxonomy/packs/starter-pack.yaml +++ b/data/taxonomy/packs/starter-pack.yaml @@ -19,7 +19,14 @@ ], "mitigation": "Ask for scope, counterexamples, and supporting evidence.", "active": true, - "metadata": {} + "metadata": {}, + "negative_examples": [ + "Every backup completed successfully according to the job log." + ], + "minimum_evidence_requirement": "Evidence must show a broad quantifier applied to an unsupported group, behavior, or conclusion, not a bounded or sourced observation.", + "common_false_positives": [ + "Bounded inventory, log, policy, or technical statements using absolute words literally." + ] }, { "id": "unsupported_causal_claim", @@ -37,7 +44,14 @@ ], "mitigation": "Request causal evidence and consider alternative explanations.", "active": true, - "metadata": {} + "metadata": {}, + "negative_examples": [ + "The incident report names three causes and asks for more evidence." + ], + "minimum_evidence_requirement": "Evidence must include causal wording presented as a conclusion without cited support or alternative explanations.", + "common_false_positives": [ + "Cautious causal hypotheses, cited incident reports, or requests for additional causal evidence." + ] }, { "id": "dehumanizing_language", @@ -55,7 +69,14 @@ ], "mitigation": "Escalate for careful human review and contextual assessment.", "active": true, - "metadata": {} + "metadata": {}, + "negative_examples": [ + "The novel describes animals in a literal zoo." + ], + "minimum_evidence_requirement": "Evidence must show dehumanizing terms applied to a person or group of people.", + "common_false_positives": [ + "Literal references to non-human animals, pests, software parasites, or fictional creatures." + ] } ] -} \ No newline at end of file +} diff --git a/engine/argument_risk_engine/analyzer.py b/engine/argument_risk_engine/analyzer.py index 6feb830..bb85e50 100644 --- a/engine/argument_risk_engine/analyzer.py +++ b/engine/argument_risk_engine/analyzer.py @@ -33,10 +33,18 @@ def analyze_text( ) -> dict[str, Any]: taxonomy_pack = pack or default_taxonomy_pack() normalized_text = text or "" + requested_mode = mode or DEFAULT_MODE + requested_provider_id = model_provider_id or DEFAULT_MODEL_PROVIDER_ID + warnings: list[str] = [] + if requested_mode != DEFAULT_MODE or requested_provider_id != DEFAULT_MODEL_PROVIDER_ID: + warnings.append( + "LLM-backed analysis is not enabled in this release; /analyze uses deterministic_baseline only." + ) + mode = DEFAULT_MODE + model_provider_id = DEFAULT_MODEL_PROVIDER_ID claims = extract_claims(normalized_text) claims_out: list[dict[str, Any]] = [] all_scores: list[float] = [] - warnings: list[str] = [] any_review = False for index, claim in enumerate(claims, start=1): @@ -122,7 +130,7 @@ def analyze_text( "model_provider_id": model_provider_id, "model_name": DEFAULT_MODEL_NAME, "llm_used": False, - "deterministic_fallback_used": False if mode == DEFAULT_MODE else allow_deterministic_fallback, + "deterministic_fallback_used": False, "claims": claims_out, "overall_risk_score": overall, "risk_level": risk_level(overall), diff --git a/engine/argument_risk_engine/classification/deterministic.py b/engine/argument_risk_engine/classification/deterministic.py index b80e3cb..5e5b6e0 100644 --- a/engine/argument_risk_engine/classification/deterministic.py +++ b/engine/argument_risk_engine/classification/deterministic.py @@ -1,5 +1,7 @@ from __future__ import annotations +import re + from argument_risk_engine.retrieval.candidate_filter import is_healthy_suppressor from argument_risk_engine.taxonomy.models import ActivationStatus, TaxonomyEntry @@ -38,6 +40,8 @@ def classify_deterministic( continue if _exclusion_triggered(haystack, entry.exclusion_criteria): continue + if entry.id == "overgeneralization" and not _has_unsupported_universal_claim(claim): + continue evidence = _best_evidence_span(haystack, entry, candidate) if evidence is None: @@ -76,6 +80,69 @@ def classify_deterministic( return results[:limit] +def _has_unsupported_universal_claim(claim: str) -> bool: + """Return true only for broad, unsupported universal claims. + + This keeps trigger words such as "always", "never", "all", and + "everyone" from classifying bounded observations, quoted words, + documented rules, or operational statements as overgeneralization. + """ + + lower = claim.strip().lower() + quoted_or_literal_pattern = ( + r"[\"']?(always|never|all|every|everyone|nobody|none|no)[\"']?" + r"\s+(is|are|means|appears|used|reserved)\b" + ) + if re.search(quoted_or_literal_pattern, lower): + return False + bounded_or_supported = [ + "according to", + "based on", + "system log", + "job log", + "manifest", + "packing list", + "style guide", + "handbook", + "launch notes", + "archive", + "shelf", + "bin ", + "exact search", + "should ", + "must ", + ] + if any(marker in lower for marker in bounded_or_supported): + return False + universal_patterns = [ + r"\beveryone\b.+\b(always|never|all|caused|will|fail|ignored?|understood|received|does|is|are)\b", + r"\beveryone\s+(in|on|who)\b.+\b(always|never|will|fail|ignored?|understood|does|is|are)\b", + ( + r"\b(all|every|no|none of|nobody)\b.+" + r"\b(is|are|was|were|will|proves?|shows?|hated|matters|benefits?|work|failed|useless|broken|unusable)\b" + ), + r"\balways\b.+\b(because|even though|caused?|proves?|shows?)\b", + r"\bnever\b.+\b(because|proves?|shows?|benefits?|works?)\b", + ] + if not any(re.search(pattern, lower) for pattern in universal_patterns): + return False + weak_evidence_markers = [ + "because one", + "after a single", + "from one", + "only two", + "first ", + "single ", + "whole ", + "entire ", + "always", + "never", + "everyone", + "nobody", + "none", + ] + return any(marker in lower for marker in weak_evidence_markers) + def _entry(candidate: object) -> TaxonomyEntry: return getattr(candidate, "entry", candidate) diff --git a/engine/argument_risk_engine/taxonomy/models.py b/engine/argument_risk_engine/taxonomy/models.py index a4b2c74..b9b24e3 100644 --- a/engine/argument_risk_engine/taxonomy/models.py +++ b/engine/argument_risk_engine/taxonomy/models.py @@ -239,6 +239,7 @@ def default_taxonomy_pack() -> TaxonomyPack: detection_level="structural", signals=["always", "never", "everyone", "all", "none"], positive_examples=["Everyone in that group is dishonest."], + negative_examples=["Every backup completed successfully according to the job log."], minimum_evidence_requirement="Evidence span showing an overbroad quantifier applied as support.", common_false_positives=["Legitimate quantified claims with adequate evidence."], enabled_for_mvp=True, diff --git a/fastapi/__init__.py b/fastapi/__init__.py deleted file mode 100644 index 4c4ff3f..0000000 --- a/fastapi/__init__.py +++ /dev/null @@ -1,79 +0,0 @@ -from __future__ import annotations - - -class Response: - def __init__(self, content='', media_type='text/plain', status_code=200, headers=None): - self.content = content - self.media_type = media_type - self.status_code = status_code - self.headers = headers or {} - - -class APIRouter: - def __init__(self, prefix='', tags=None): - self.prefix = prefix - self.routes = {} - - def _add(self, method, path, fn): - self.routes[(method, self.prefix + path)] = fn - return fn - - def get(self, path='', **kwargs): - def deco(fn): - return self._add('GET', path, fn) - return deco - - def post(self, path='', **kwargs): - def deco(fn): - return self._add('POST', path, fn) - return deco - - def put(self, path='', **kwargs): - def deco(fn): - return self._add('PUT', path, fn) - return deco - - def patch(self, path='', **kwargs): - def deco(fn): - return self._add('PATCH', path, fn) - return deco - - -class FastAPI: - def __init__(self, **kwargs): - self.routes = {} - - def add_middleware(self, *args, **kwargs): - return None - - def include_router(self, router, prefix=''): - for (method, path), fn in router.routes.items(): - self.routes[(method, prefix + path)] = fn - - def get(self, path='', **kwargs): - def deco(fn): - self.routes[('GET', path)] = fn - return fn - return deco - - def post(self, path='', **kwargs): - def deco(fn): - self.routes[('POST', path)] = fn - return fn - return deco - - def patch(self, path='', **kwargs): - def deco(fn): - self.routes[('PATCH', path)] = fn - return fn - return deco - - -class UploadFile: - def __init__(self, filename='', file=None): - self.filename = filename - self.file = file - - -def File(default=None, **kwargs): - return default diff --git a/fastapi/middleware/__init__.py b/fastapi/middleware/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/fastapi/middleware/cors.py b/fastapi/middleware/cors.py deleted file mode 100644 index 9543402..0000000 --- a/fastapi/middleware/cors.py +++ /dev/null @@ -1,2 +0,0 @@ -class CORSMiddleware: - pass diff --git a/fastapi/testclient/__init__.py b/fastapi/testclient/__init__.py deleted file mode 100644 index 4fc5ae5..0000000 --- a/fastapi/testclient/__init__.py +++ /dev/null @@ -1,155 +0,0 @@ -from __future__ import annotations - -import inspect -from io import BytesIO -from urllib.parse import parse_qs, urlparse - -from fastapi import Response, UploadFile - - -class _Resp: - def __init__(self, payload, status_code=200): - self._payload = payload - self.status_code = getattr(payload, 'status_code', status_code) - self.content = getattr(payload, 'content', b'') - self.headers = getattr(payload, 'headers', {}) - - def json(self): - def conv(v): - if isinstance(v, Response): - return v.content - if hasattr(v, 'model_dump'): - return v.model_dump() - if isinstance(v, dict): - return {k: conv(i) for k, i in v.items()} - if isinstance(v, list): - return [conv(i) for i in v] - if hasattr(v, 'value'): - return v.value - return v - return conv(self._payload) - - -class TestClient: - def __init__(self, app): - self.app = app - - def get(self, path): - fn, kwargs, query = _resolve(self.app, 'GET', path) - if not fn: - return _Resp({'detail': 'not found'}, 404) - kwargs.update(query) - return _Resp(_call(fn, kwargs)) - - def post(self, path, json=None, files=None): - fn, kwargs, query = _resolve(self.app, 'POST', path) - if not fn: - return _Resp({'detail': 'not found'}, 404) - kwargs.update(query) - kwargs.update(_json_args(fn, json or {})) - kwargs.update(_file_args(files)) - return _Resp(_call(fn, kwargs)) - - def put(self, path, json=None): - fn, kwargs, query = _resolve(self.app, 'PUT', path) - if not fn: - return _Resp({'detail': 'not found'}, 404) - kwargs.update(query) - kwargs.update(_json_args(fn, json or {})) - return _Resp(_call(fn, kwargs)) - - def patch(self, path, json=None): - fn, kwargs, query = _resolve(self.app, 'PATCH', path) - if not fn: - return _Resp({'detail': 'not found'}, 404) - kwargs.update(query) - kwargs.update(_json_args(fn, json or {})) - return _Resp(_call(fn, kwargs)) - - -def _resolve(app, method, raw_path): - parsed = urlparse(raw_path) - path = parsed.path - query = {key: values[-1] for key, values in parse_qs(parsed.query).items()} - exact = app.routes.get((method, path)) - if exact: - return exact, {}, query - path_parts = [part for part in path.split('/') if part] - for (route_method, route_path), fn in app.routes.items(): - if route_method != method: - continue - route_parts = [part for part in route_path.split('/') if part] - if len(route_parts) != len(path_parts): - continue - kwargs = {} - matched = True - for route_part, path_part in zip(route_parts, path_parts, strict=True): - if route_part.startswith('{') and route_part.endswith('}'): - kwargs[route_part[1:-1]] = path_part - elif route_part != path_part: - matched = False - break - if matched: - return fn, kwargs, query - return None, {}, query - - -def _call(fn, kwargs): - sig = inspect.signature(fn) - accepted = {} - for name, param in sig.parameters.items(): - if name in kwargs: - accepted[name] = kwargs[name] - elif param.default is inspect._empty: - ann = _resolve_annotation(fn, param.annotation) - if isinstance(kwargs, dict) and ann is not inspect._empty: - try: - accepted[name] = ann(**kwargs) - except Exception: - pass - return fn(**accepted) - - -def _json_args(fn, data): - if not data: - return {} - sig = inspect.signature(fn) - required_model_params = [] - for name, param in sig.parameters.items(): - if name in {'file'}: - continue - ann = _resolve_annotation(fn, param.annotation) - if ann is inspect._empty or ann in {str, int, bool, float}: - continue - if param.default is inspect._empty: - required_model_params.append((name, ann)) - if len(required_model_params) == 1: - name, cls = required_model_params[0] - try: - return {name: cls(**data)} - except Exception: - return {name: data} - return data - - -def _resolve_annotation(fn, ann): - if isinstance(ann, str): - builtin = {'str': str, 'int': int, 'bool': bool, 'float': float}.get(ann) - if builtin is not None: - return builtin - return getattr(inspect.getmodule(fn), ann, fn.__globals__.get(ann, ann)) - return ann - - -def _file_args(files): - if not files: - return {} - file_info = files.get('file') if isinstance(files, dict) else None - if file_info is None: - return {} - filename, content, *_ = file_info - if hasattr(content, 'read'): - file_obj = content - else: - file_obj = BytesIO(content if isinstance(content, bytes) else str(content).encode()) - return {'file': UploadFile(filename=filename, file=file_obj)} diff --git a/frontend/index.html b/frontend/index.html index 5572dd9..74c4f11 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -4,10 +4,9 @@ Argument-Risk-Engine -
- + diff --git a/frontend/package-lock.json b/frontend/package-lock.json deleted file mode 100644 index eeb8603..0000000 --- a/frontend/package-lock.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "name": "argument-risk-engine-frontend", - "version": "0.1.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "argument-risk-engine-frontend", - "version": "0.1.0", - "devDependencies": {} - } - } -} diff --git a/frontend/package.json b/frontend/package.json index 8c86959..79e9567 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -4,10 +4,19 @@ "private": true, "type": "module", "scripts": { - "dev": "node scripts/dev_server.mjs", - "build": "node scripts/build_frontend.mjs", - "preview": "node scripts/dev_server.mjs" + "dev": "vite", + "build": "vite build", + "preview": "vite preview" }, - "dependencies": {}, - "devDependencies": {} + "dependencies": { + "react": "^18.2.0", + "react-dom": "^18.2.0" + }, + "devDependencies": { + "@types/react": "^18.2.66", + "@types/react-dom": "^18.2.22", + "@vitejs/plugin-react": "^4.3.1", + "typescript": "^5.5.4", + "vite": "^5.4.0" + } } diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index aa13e82..cc5a0bc 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -18,7 +18,7 @@ import type { TaxonomyValidationResult, } from './types' -export const API_BASE = (import.meta.env.VITE_API_BASE ?? 'http://localhost:8000/api').replace(/\/$/, '') +export const API_BASE = (import.meta.env.VITE_API_BASE ?? '/api').replace(/\/$/, '') const REVIEW_KEY = 'are.review.records' const REPORT_KEY = 'are.generated.reports' diff --git a/pydantic/__init__.py b/pydantic/__init__.py deleted file mode 100644 index 753ff96..0000000 --- a/pydantic/__init__.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations -from typing import Any - -class FieldInfo: - def __init__(self, default: Any = None, default_factory=None, **kwargs): - self.default = default - self.default_factory = default_factory - self.kwargs = kwargs - -def Field(default: Any = None, default_factory=None, **kwargs): - return FieldInfo(default, default_factory, **kwargs) - -def field_validator(*fields, **kwargs): - def deco(fn): - return fn - return deco - -class BaseModel: - def __init__(self, **data): - annotations = {} - for cls in reversed(self.__class__.mro()): - annotations.update(getattr(cls, '__annotations__', {})) - for name in annotations: - if name in data: - value = data[name] - else: - default = getattr(self.__class__, name, None) - if isinstance(default, FieldInfo): - value = default.default_factory() if default.default_factory else default.default - else: - value = default - setattr(self, name, value) - for name, value in data.items(): - if not hasattr(self, name): - setattr(self, name, value) - - @classmethod - def model_validate(cls, data): - return cls(**data) - - def model_dump(self, mode: str | None = None): - def conv(v): - if isinstance(v, BaseModel): - return v.model_dump(mode=mode) - if isinstance(v, list): - return [conv(i) for i in v] - if isinstance(v, dict): - return {k: conv(i) for k, i in v.items()} - if hasattr(v, 'value'): - return v.value - return v - return {k: conv(v) for k, v in self.__dict__.items()} diff --git a/pyproject.toml b/pyproject.toml index 26144b0..1d783fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,6 @@ [build-system] -requires = [] -build-backend = "build_backend" -backend-path = ["."] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" [project] name = "argument-risk-engine" diff --git a/uvicorn/__init__.py b/uvicorn/__init__.py deleted file mode 100644 index a5e68f6..0000000 --- a/uvicorn/__init__.py +++ /dev/null @@ -1,208 +0,0 @@ -from __future__ import annotations - -import http.server -import importlib -import inspect -import json -import socketserver -from email.parser import BytesParser -from email.policy import default as email_policy -from io import BytesIO -from types import SimpleNamespace -from urllib.parse import parse_qs, urlparse - -from fastapi import Response, UploadFile - - -def run(app_path: str, host: str = "127.0.0.1", port: int = 8000, reload: bool = False): - """Run a tiny local HTTP server for the repository's FastAPI-compatible app. - - This project ships lightweight FastAPI/Uvicorn shims so the demo can run in - restricted/offline environments. The server intentionally supports only the - routing and request features used by the app: JSON bodies, query strings, - dynamic path parameters, and single-file multipart uploads. - """ - app = _load_app(app_path) - - class Handler(http.server.BaseHTTPRequestHandler): - def do_GET(self): - self._dispatch("GET") - - def do_POST(self): - self._dispatch("POST") - - def do_PUT(self): - self._dispatch("PUT") - - def do_PATCH(self): - self._dispatch("PATCH") - - def _dispatch(self, method: str) -> None: - fn, kwargs, query = _resolve(app, method, self.path) - if not fn: - self._send_json({"detail": "not found"}, status=404) - return - kwargs.update(query) - try: - body_kwargs = _body_args(fn, self) - kwargs.update(body_kwargs) - payload = _call(fn, kwargs) - self._send_payload(payload) - except Exception as error: # pragma: no cover - defensive server boundary - self._send_json({"detail": str(error)}, status=500) - - def _send_payload(self, payload) -> None: - if isinstance(payload, Response): - content = payload.content - if isinstance(content, str): - content = content.encode("utf-8") - self.send_response(payload.status_code) - self.send_header("Content-Type", payload.media_type) - for key, value in payload.headers.items(): - self.send_header(key, value) - self.end_headers() - self.wfile.write(content or b"") - return - self._send_json(_to_jsonable(payload)) - - def _send_json(self, payload, status: int = 200) -> None: - body = json.dumps(_to_jsonable(payload), ensure_ascii=False).encode("utf-8") - self.send_response(status) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(body) - - def log_message(self, *args): - return - - class ReusableTCPServer(socketserver.TCPServer): - allow_reuse_address = True - allow_reuse_port = True - - with ReusableTCPServer((host, port), Handler) as httpd: - print(f"Backend: http://{host}:{port}", flush=True) - httpd.serve_forever() - - -def _load_app(app_path: str): - module_name, _, attr = app_path.partition(":") - module = importlib.import_module(module_name) - return getattr(module, attr or "app") - - -def _resolve(app, method: str, raw_path: str): - parsed = urlparse(raw_path) - path = parsed.path.rstrip("/") or "/" - query = {key: values[-1] for key, values in parse_qs(parsed.query).items()} - exact = app.routes.get((method, path)) or app.routes.get((method, parsed.path)) - if exact: - return exact, {}, query - path_parts = [part for part in path.split("/") if part] - for (route_method, route_path), fn in app.routes.items(): - if route_method != method: - continue - route_parts = [part for part in route_path.split("/") if part] - if len(route_parts) != len(path_parts): - continue - kwargs = {} - matched = True - for route_part, path_part in zip(route_parts, path_parts, strict=True): - if route_part.startswith("{") and route_part.endswith("}"): - kwargs[route_part[1:-1]] = path_part - elif route_part != path_part: - matched = False - break - if matched: - return fn, kwargs, query - return None, {}, query - - -def _body_args(fn, handler) -> dict: - length = int(handler.headers.get("Content-Length", "0") or 0) - if length <= 0: - return {} - content_type = handler.headers.get("Content-Type", "") - body = handler.rfile.read(length) - if content_type.startswith("application/json"): - data = json.loads(body.decode("utf-8") or "{}") - return _json_args(fn, data) - if content_type.startswith("multipart/form-data"): - return _multipart_args(content_type, body) - return {} - - -def _json_args(fn, data: dict) -> dict: - if not data: - return {} - sig = inspect.signature(fn) - required_model_params = [] - for name, param in sig.parameters.items(): - if name == "file": - continue - ann = _resolve_annotation(fn, param.annotation) - if ann is inspect._empty or ann in {str, int, bool, float}: - continue - if param.default is inspect._empty: - required_model_params.append((name, ann)) - if len(required_model_params) == 1: - name, cls = required_model_params[0] - try: - return {name: cls(**data)} - except Exception: - return {name: data} - return data - - -def _multipart_args(content_type: str, body: bytes) -> dict: - message = BytesParser(policy=email_policy).parsebytes( - b"Content-Type: " + content_type.encode("utf-8") + b"\r\n\r\n" + body - ) - result = {} - for part in message.iter_parts(): - name = part.get_param("name", header="content-disposition") - filename = part.get_filename() - payload = part.get_payload(decode=True) or b"" - if filename: - result[name or "file"] = UploadFile(filename=filename, file=BytesIO(payload)) - elif name: - result[name] = payload.decode(part.get_content_charset() or "utf-8") - return result - - -def _call(fn, kwargs): - sig = inspect.signature(fn) - accepted = {} - for name, param in sig.parameters.items(): - if name in kwargs: - accepted[name] = kwargs[name] - elif param.default is inspect._empty: - ann = _resolve_annotation(fn, param.annotation) - if ann is not inspect._empty: - try: - accepted[name] = ann(**kwargs) - except Exception: - pass - return fn(**accepted) - - -def _resolve_annotation(fn, ann): - if isinstance(ann, str): - builtin = {"str": str, "int": int, "bool": bool, "float": float}.get(ann) - if builtin is not None: - return builtin - return getattr(inspect.getmodule(fn), ann, fn.__globals__.get(ann, ann)) - return ann - - -def _to_jsonable(value): - if hasattr(value, "model_dump"): - return value.model_dump() - if isinstance(value, dict): - return {key: _to_jsonable(item) for key, item in value.items()} - if isinstance(value, list): - return [_to_jsonable(item) for item in value] - if hasattr(value, "value"): - return value.value - if isinstance(value, SimpleNamespace): - return vars(value) - return value diff --git a/uvicorn/__main__.py b/uvicorn/__main__.py deleted file mode 100644 index 77b6ffa..0000000 --- a/uvicorn/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -from uvicorn.main import main - -if __name__ == "__main__": - main() diff --git a/uvicorn/main.py b/uvicorn/main.py deleted file mode 100644 index 3aa5726..0000000 --- a/uvicorn/main.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import annotations - -import argparse - -from uvicorn import run - - -def main() -> None: - parser = argparse.ArgumentParser(prog="uvicorn") - parser.add_argument("app_path") - parser.add_argument("--host", default="127.0.0.1") - parser.add_argument("--port", default=8000, type=int) - parser.add_argument("--reload", action="store_true") - args = parser.parse_args() - run(args.app_path, host=args.host, port=args.port, reload=args.reload) - - -if __name__ == "__main__": - main()