diff --git a/.env.example b/.env.example index 5284fe9a..8f041aaa 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,6 @@ # Shared runtime (optional defaults) LOG_LEVEL=INFO +RUNTIME_ENV=local # Queue transport (optional defaults) REDIS_URL=redis://redis:6379/0 @@ -12,7 +13,9 @@ REDIS_SOCKET_TIMEOUT=5.0 POSTGRES_URL=postgresql://postgres:postgres@postgres:5432/workflows POSTGRES_DB=workflows POSTGRES_USER=postgres -POSTGRES_PASSWORD=postgres # change in production +# Change in production +POSTGRES_PASSWORD=postgres +POSTGRES_HOST_BIND=127.0.0.1 # Optional: expose postgres on host for local debugging POSTGRES_PORT=5432 @@ -26,6 +29,7 @@ JOB_RESULT_TTL_SECONDS=3600 # Internal transfer storage (optional defaults for local compose) MINIO_ENDPOINT=http://minio:9000 MINIO_ROOT_USER=internal +# Change in production MINIO_ROOT_PASSWORD=change-me MINIO_INTERNAL_BUCKET=internal-transfers MINIO_HOST_BIND=127.0.0.1 @@ -49,12 +53,17 @@ RESUME_KEYWORDS=resume,cv,curriculum OPENAI_API_KEY= OPENAI_BASE_URL= OPENAI_MODEL=gpt-4o-mini +CRM_SYNC_ENABLED=true +CRM_SYNC_INTERVAL_SECONDS=900 +CRM_SYNC_PAGE_SIZE=200 # Discord bot (required for bot runtime) DISCORD_BOT_TOKEN=your_bot_token_here HEALTHCHECK_PORT=3000 DISCORD_SENDMSG_CHARACTER_LIMIT=2000 CHECK_EMAIL_WAIT=2 +AUDIT_API_BASE_URL= +AUDIT_API_TIMEOUT_SECONDS=2.0 # Required for Discord bot commands that post to channels CHANNEL_ID=1391742724666822798 diff --git a/AGENTS.md b/AGENTS.md index dcc01d16..a0172dd8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,10 +25,16 @@ This repo contains multiple services: - `apps/worker`: webhook ingest API and queue consumer - `docker-compose.yml`: stack orchestration with Redis, Postgres, and MinIO +4. Human audit logging +- Human-triggered CRM actions from Discord should write to the worker audit ingest endpoint. +- Audit logging must be best effort: command flows should never fail if audit writes fail. +- Keep reusable audit-write logic outside individual cogs. 
+ ## Common Paths - Bot core: `apps/discord_bot/src/five08/discord_bot/bot.py` - Bot config: `apps/discord_bot/src/five08/discord_bot/config.py` +- Bot audit helper: `apps/discord_bot/src/five08/discord_bot/utils/audit.py` - Worker API: `apps/worker/src/five08/worker/api.py` - Worker consumer: `apps/worker/src/five08/worker/consumer.py` - Shared settings: `packages/shared/src/five08/settings.py` @@ -87,6 +93,7 @@ async def setup(bot: commands.Bot) -> None: - Add shared env/config in `packages/shared/src/five08/settings.py`. - Add service-specific settings in local service `config.py` by subclassing shared settings. - Keep secrets in env vars, not code. +- For Discord CRM audit writes, use `AUDIT_API_BASE_URL` and shared `WEBHOOK_SHARED_SECRET`. ## Agent Guidelines diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index c005b454..4519d644 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -109,6 +109,13 @@ async def setup(bot: commands.Bot) -> None: - extract skills (LLM when configured, heuristic fallback otherwise) - update contact skills field in EspoCRM - Manual queueing is available via `POST /process-contact/{contact_id}`. +- Human action audit ingest is available at `POST /audit/events`. + +## Discord CRM Audit Flow + +- CRM slash commands in `apps/discord_bot/src/five08/discord_bot/cogs/crm.py` emit best-effort audit events for human actions. +- Audit writing is centralized in `apps/discord_bot/src/five08/discord_bot/utils/audit.py`. +- Audit writes must never break command execution; failures are logged as warnings only. ## Environment Variables @@ -116,6 +123,7 @@ Use `.env.example` as source of truth. 
Key categories: - Shared queue/runtime: `REDIS_URL`, `REDIS_QUEUE_NAME`, `POSTGRES_URL`, `JOB_MAX_ATTEMPTS`, `JOB_RETRY_BASE_SECONDS`, `JOB_RETRY_MAX_SECONDS`, `LOG_LEVEL`, webhook settings - Bot credentials/integrations: Discord, email, Espo, Kimai +- Discord CRM audit writer: `AUDIT_API_BASE_URL`, `AUDIT_API_TIMEOUT_SECONDS` (plus shared `WEBHOOK_SHARED_SECRET`) - Worker controls: `WORKER_NAME`, `WORKER_QUEUE_NAMES`, `WORKER_BURST` - Worker CRM processing: `MAX_ATTACHMENTS_PER_CONTACT`, `MAX_FILE_SIZE_MB`, `ALLOWED_FILE_TYPES`, `RESUME_KEYWORDS`, `OPENAI_API_KEY`, `OPENAI_BASE_URL`, `OPENAI_MODEL` diff --git a/README.md b/README.md index e7c137c3..605b0de7 100644 --- a/README.md +++ b/README.md @@ -41,13 +41,18 @@ Migrations: - Job states: `queued`, `running`, `succeeded`, `failed`, `dead`, `canceled`. - Idempotency key is unique and optional. - Attempts are stored with `run_after`/retry state so delivery failures are never lost. +- Human audit events are persisted in `audit_events`. +- CRM identity cache is persisted in `people`. ### Worker API Endpoints - `GET /health`: Redis/Postgres/worker health check. - `POST /webhooks/{source}`: Generic webhook enqueue endpoint. - `POST /webhooks/espocrm`: EspoCRM webhook endpoint (expects array payload). +- `POST /webhooks/espocrm/people-sync`: EspoCRM contact-change webhook for people cache sync. - `POST /process-contact/{contact_id}`: Manually enqueue one contact skills job. +- `POST /sync/people`: Manually enqueue a full CRM->people cache sync. +- `POST /audit/events`: Persist one human audit event (`discord` or `admin_dashboard`). 
## Local Development @@ -87,59 +92,94 @@ docker compose up --build ## Environment Variables -### Shared (bot + worker) - -- `REDIS_URL` (default: `redis://redis:6379/0`) -- `REDIS_QUEUE_NAME` (default: `jobs.default`) -- `REDIS_KEY_PREFIX` (default: `jobs`) -- `ESPO_BASE_URL` (required by both bot and worker) -- `ESPO_API_KEY` (required by both bot and worker) -- `JOB_TIMEOUT_SECONDS` (default: `600`) -- `JOB_RESULT_TTL_SECONDS` (default: `3600`) -- `WEBHOOK_SHARED_SECRET` (required; requests are rejected when unset) -- `POSTGRES_URL` (default: `postgresql://postgres:postgres@postgres:5432/workflows`) -- `POSTGRES_DB` (default: `workflows`) -- `POSTGRES_USER` (default: `postgres`) -- `POSTGRES_PASSWORD` (default: `postgres`) -- `JOB_MAX_ATTEMPTS` (default: `8`) -- `JOB_RETRY_BASE_SECONDS` (default: `5`) -- `JOB_RETRY_MAX_SECONDS` (default: `300`) -- `WEBHOOK_INGEST_HOST` (default: `0.0.0.0`) -- `WEBHOOK_INGEST_PORT` (default: `8090`) -- `LOG_LEVEL` (default: `INFO`) -- `MINIO_ENDPOINT` (default: `http://minio:9000`) -- `MINIO_INTERNAL_BUCKET` (default: `internal-transfers`) -- `MINIO_ROOT_USER` (default: `internal`) -- `MINIO_ROOT_PASSWORD` -- `MINIO_HOST_BIND` (default: `127.0.0.1`; set to `0.0.0.0` to expose MinIO) -- `MINIO_API_PORT` (default: `9000`) -- `MINIO_CONSOLE_PORT` (default: `9001`) -- `MINIO_ACCESS_KEY` / `MINIO_SECRET_KEY` (compatibility aliases; use `MINIO_ROOT_USER` and `MINIO_ROOT_PASSWORD` by default for internal transfers) - -### Discord Bot - -- `DISCORD_BOT_TOKEN` -- `CHANNEL_ID` -- `EMAIL_USERNAME` -- `EMAIL_PASSWORD` -- `IMAP_SERVER` -- `SMTP_SERVER` -- `KIMAI_BASE_URL` -- `KIMAI_API_TOKEN` -- Optional: `CHECK_EMAIL_WAIT`, `DISCORD_SENDMSG_CHARACTER_LIMIT`, `HEALTHCHECK_PORT` +Use `.env.example` as the source of truth for defaults. 
+ +### Core Runtime (Bot + Worker) + +- `Required`: `ESPO_BASE_URL`, `ESPO_API_KEY` +- `Optional`: `LOG_LEVEL` (default: `INFO`) +- `Optional`: `RUNTIME_ENV` (default: `local`; non-local values require explicit `POSTGRES_URL` and `MINIO_ROOT_PASSWORD`) + +### Queue + Job Runtime + +- `Optional`: `REDIS_URL` (default: `redis://redis:6379/0`) +- `Optional`: `REDIS_QUEUE_NAME` (default: `jobs.default`) +- `Optional`: `REDIS_KEY_PREFIX` (default: `jobs`) +- `Optional`: `JOB_TIMEOUT_SECONDS` (default: `600`) +- `Optional`: `JOB_RESULT_TTL_SECONDS` (default: `3600`) +- `Optional`: `JOB_MAX_ATTEMPTS` (default: `8`) +- `Optional`: `JOB_RETRY_BASE_SECONDS` (default: `5`) +- `Optional`: `JOB_RETRY_MAX_SECONDS` (default: `300`) + +### Postgres + Compose Exposure + +- `Optional`: `POSTGRES_URL` (default: `postgresql://postgres:postgres@postgres:5432/workflows`) +- `Optional` (Compose DB container): `POSTGRES_DB` (default: `workflows`) +- `Optional` (Compose DB container): `POSTGRES_USER` (default: `postgres`) +- `Optional` (Compose DB container): `POSTGRES_PASSWORD` (default: `postgres`) +- `Optional` (Compose host bind): `POSTGRES_HOST_BIND` (default: `127.0.0.1`) +- `Optional` (Compose host port): `POSTGRES_PORT` (default: `5432`) + +### MinIO + Internal Transfers + +- `Required` in non-local environments: `MINIO_ROOT_PASSWORD` +- `Optional`: `MINIO_ENDPOINT` (default: `http://minio:9000`) +- `Optional`: `MINIO_INTERNAL_BUCKET` (default: `internal-transfers`) +- `Optional`: `MINIO_ROOT_USER` (default: `internal`) +- `Optional`: `MINIO_HOST_BIND` (default: `127.0.0.1`; set `0.0.0.0` to expose externally) +- `Optional`: `MINIO_API_PORT` (default: `9000`) +- `Optional`: `MINIO_CONSOLE_PORT` (default: `9001`) +- Note: `MINIO_ACCESS_KEY` / `MINIO_SECRET_KEY` are `SharedSettings` alias properties (`minio_access_key`, `minio_secret_key`) and are not env-loaded fields. +- Note: use `MINIO_ROOT_USER` and `MINIO_ROOT_PASSWORD` as the actual env vars. 
+ +### Worker API Ingest + +- `Required` for protected endpoints: `WEBHOOK_SHARED_SECRET` (ingest requests are rejected when unset) +- `Optional`: `WEBHOOK_INGEST_HOST` (default: `0.0.0.0`) +- `Optional`: `WEBHOOK_INGEST_PORT` (default: `8090`) ### Worker Consumer -- `WORKER_NAME` (default: `integrations-worker`) -- `WORKER_QUEUE_NAMES` (default: `jobs.default`, comma-separated) -- `WORKER_BURST` (default: `false`) -- `MAX_ATTACHMENTS_PER_CONTACT` (default: `3`) -- `MAX_FILE_SIZE_MB` (default: `10`) -- `ALLOWED_FILE_TYPES` (default: `pdf,doc,docx,txt`) -- `RESUME_KEYWORDS` (default: `resume,cv,curriculum`) -- `OPENAI_API_KEY` (optional; if unset, heuristic extraction is used) -- `OPENAI_BASE_URL` (optional) -- `OPENAI_MODEL` (default: `gpt-4o-mini`) +- `Optional`: `WORKER_NAME` (default: `integrations-worker`) +- `Optional`: `WORKER_QUEUE_NAMES` (default: `jobs.default`, comma-separated) +- `Optional`: `WORKER_BURST` (default: `false`) + +### Worker CRM Sync + Skills Extraction + +- `Optional`: `CRM_SYNC_ENABLED` (default: `true`) +- `Optional`: `CRM_SYNC_INTERVAL_SECONDS` (default: `900`) +- `Optional`: `CRM_SYNC_PAGE_SIZE` (default: `200`) +- `Optional`: `MAX_ATTACHMENTS_PER_CONTACT` (default: `3`) +- `Optional`: `MAX_FILE_SIZE_MB` (default: `10`) +- `Optional`: `ALLOWED_FILE_TYPES` (default: `pdf,doc,docx,txt`) +- `Optional`: `RESUME_KEYWORDS` (default: `resume,cv,curriculum`) +- `Optional`: `OPENAI_API_KEY` (if unset, heuristic extraction is used) +- `Optional`: `OPENAI_BASE_URL` +- `Optional`: `OPENAI_MODEL` (default: `gpt-4o-mini`) + +### Discord Bot Core + +- `Required`: `DISCORD_BOT_TOKEN` +- `Required`: `CHANNEL_ID` +- `Optional`: `HEALTHCHECK_PORT` (default: `3000`) +- `Optional`: `DISCORD_SENDMSG_CHARACTER_LIMIT` (default: `2000`) +- `Optional`: `CHECK_EMAIL_WAIT` (default: `2`) + +### Discord Email Monitoring + +- `Required`: `EMAIL_USERNAME` +- `Required`: `EMAIL_PASSWORD` +- `Required`: `IMAP_SERVER` +- `Required`: `SMTP_SERVER` + +### Discord CRM 
Audit Logging (Best Effort) + +- `Optional`: `AUDIT_API_BASE_URL` (when set with `WEBHOOK_SHARED_SECRET`, CRM commands emit best-effort audit events) +- `Optional`: `AUDIT_API_TIMEOUT_SECONDS` (default: `2.0`) + +### Kimai (Legacy/Deprecating) + +- `Currently required by config model`: `KIMAI_BASE_URL`, `KIMAI_API_TOKEN` ## Commands diff --git a/apps/discord_bot/Dockerfile b/apps/discord_bot/Dockerfile index 3890c703..f70a7d2b 100644 --- a/apps/discord_bot/Dockerfile +++ b/apps/discord_bot/Dockerfile @@ -1,3 +1,4 @@ +# Pinned from ghcr.io/astral-sh/uv:python3.12-bookworm-slim (verified 2026-02-21). FROM ghcr.io/astral-sh/uv@sha256:e5b65587bce7de595f299855d7385fe7fca39b8a74baa261ba1b7147afa78e58 WORKDIR /app diff --git a/apps/discord_bot/src/five08/__init__.py b/apps/discord_bot/src/five08/__init__.py deleted file mode 100644 index 10749279..00000000 --- a/apps/discord_bot/src/five08/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Namespace package for five08 discord bot distribution.""" - -from pkgutil import extend_path - -__path__ = extend_path(__path__, __name__) diff --git a/apps/discord_bot/src/five08/discord_bot/cogs/crm.py b/apps/discord_bot/src/five08/discord_bot/cogs/crm.py index b321f9a4..545c42f0 100644 --- a/apps/discord_bot/src/five08/discord_bot/cogs/crm.py +++ b/apps/discord_bot/src/five08/discord_bot/cogs/crm.py @@ -14,6 +14,7 @@ from five08.discord_bot.config import settings from five08.clients import espo +from five08.discord_bot.utils.audit import DiscordAuditLogger from five08.discord_bot.utils.role_decorators import ( require_role, check_user_roles_with_hierarchy, @@ -73,6 +74,14 @@ async def callback(self, interaction: discord.Interaction) -> None: # Check if user has Member role if not cog._check_member_role(interaction): + cog._audit_command( + interaction=interaction, + action="crm.resume_download_button", + result="denied", + metadata={"reason": "missing_member_role"}, + resource_type="discord_ui_action", + resource_id=self.resume_id, + ) 
await interaction.response.send_message( "❌ You must have the Member role to download resumes.", ephemeral=True, @@ -82,12 +91,29 @@ async def callback(self, interaction: discord.Interaction) -> None: await interaction.response.defer(ephemeral=True) # Use shared download method - await cog._download_and_send_resume( + download_ok = await cog._download_and_send_resume( interaction, self.contact_name, self.resume_id ) + cog._audit_command( + interaction=interaction, + action="crm.resume_download_button", + result="success" if download_ok else "error", + metadata={"contact_name": self.contact_name}, + resource_type="crm_contact", + resource_id=self.resume_id, + ) except Exception as e: logger.error(f"Unexpected error in resume button callback: {e}") + if "cog" in locals() and cog: + cog._audit_command( + interaction=interaction, + action="crm.resume_download_button", + result="error", + metadata={"error": str(e)}, + resource_type="discord_ui_action", + resource_id=self.resume_id, + ) await interaction.followup.send( "❌ An unexpected error occurred while downloading the resume." 
) @@ -331,10 +357,35 @@ def __init__(self, bot: commands.Bot) -> None: self.espo_api = EspoAPI(api_url, settings.espo_api_key) # Store base URL for profile links self.base_url = settings.espo_base_url.rstrip("/") + self.audit_logger = DiscordAuditLogger( + base_url=settings.audit_api_base_url, + shared_secret=settings.webhook_shared_secret, + timeout_seconds=settings.audit_api_timeout_seconds, + ) + + def _audit_command( + self, + *, + interaction: discord.Interaction, + action: str, + result: str, + metadata: dict[str, Any] | None = None, + resource_type: str | None = "discord_command", + resource_id: str | None = None, + ) -> None: + """Queue a best-effort audit write for CRM command activity.""" + self.audit_logger.log_command( + interaction=interaction, + action=action, + result=result, + metadata=metadata, + resource_type=resource_type, + resource_id=resource_id, + ) async def _download_and_send_resume( self, interaction: discord.Interaction, contact_name: str, resume_id: str - ) -> None: + ) -> bool: """Download and send a resume file as a Discord attachment.""" try: # Download the resume file @@ -351,10 +402,12 @@ async def _download_and_send_resume( await interaction.followup.send( f"📄 Resume for **{contact_name}**:", file=discord_file ) + return True except EspoAPIError as e: logger.error(f"Failed to download resume {resume_id}: {e}") await interaction.followup.send(f"❌ Failed to download resume: {str(e)}") + return False def _check_member_role(self, interaction: discord.Interaction) -> bool: """Check if user has Member role or higher for resume access.""" @@ -388,6 +441,12 @@ async def search_members( ) if not query_value and not skills_list: + self._audit_command( + interaction=interaction, + action="crm.search_members", + result="denied", + metadata={"reason": "missing_query_and_skills"}, + ) await interaction.followup.send( "❌ Please provide a search term or skills to search by." 
) @@ -450,6 +509,16 @@ async def search_members( contacts = response.get("list", []) if not contacts: + self._audit_command( + interaction=interaction, + action="crm.search_members", + result="success", + metadata={ + "query": query_value or None, + "skills": skills_list, + "contacts_found": 0, + }, + ) await interaction.followup.send( f"🔍 No contacts found for: {search_summary}" ) @@ -535,11 +604,35 @@ async def search_members( else: await interaction.followup.send(embed=embed) + self._audit_command( + interaction=interaction, + action="crm.search_members", + result="success", + metadata={ + "query": query_value or None, + "skills": skills_list, + "contacts_found": len(contacts), + "resume_button_count": len(view.children), + }, + ) + except EspoAPIError as e: logger.error(f"EspoCRM API error: {e}") + self._audit_command( + interaction=interaction, + action="crm.search_members", + result="error", + metadata={"error": str(e)}, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in CRM search: {e}") + self._audit_command( + interaction=interaction, + action="crm.search_members", + result="error", + metadata={"error": str(e)}, + ) await interaction.followup.send( "❌ An unexpected error occurred while searching the CRM." 
) @@ -565,9 +658,21 @@ async def crm_status(self, interaction: discord.Interaction) -> None: embed.add_field(name="Base URL", value=settings.espo_base_url, inline=True) await interaction.followup.send(embed=embed) + self._audit_command( + interaction=interaction, + action="crm.status", + result="success", + metadata={"connected_as": user_name}, + ) except EspoAPIError as e: logger.error(f"EspoCRM API error: {e}") + self._audit_command( + interaction=interaction, + action="crm.status", + result="error", + metadata={"error": str(e)}, + ) embed = discord.Embed( title="❌ CRM Status", description=f"Failed to connect to EspoCRM: {str(e)}", @@ -576,6 +681,12 @@ async def crm_status(self, interaction: discord.Interaction) -> None: await interaction.followup.send(embed=embed) except Exception as e: logger.error(f"Unexpected error in CRM status: {e}") + self._audit_command( + interaction=interaction, + action="crm.status", + result="error", + metadata={"error": str(e)}, + ) embed = discord.Embed( title="❌ CRM Status", description="An unexpected error occurred while checking CRM status.", @@ -636,6 +747,12 @@ async def get_resume(self, interaction: discord.Interaction, query: str) -> None contacts = response.get("list", []) if not contacts: + self._audit_command( + interaction=interaction, + action="crm.get_resume", + result="success", + metadata={"query": query, "contact_found": False}, + ) await interaction.followup.send(f"❌ No contact found for: `{query}`") return @@ -647,6 +764,16 @@ async def get_resume(self, interaction: discord.Interaction, query: str) -> None resume_names = contact.get("resumeNames", {}) if not resume_ids or len(resume_ids) == 0: + self._audit_command( + interaction=interaction, + action="crm.get_resume", + result="success", + metadata={ + "query": query, + "contact_found": True, + "has_resume": False, + }, + ) await interaction.followup.send( f"❌ No resume found for {contact_name}" ) @@ -661,13 +788,40 @@ async def get_resume(self, interaction: 
discord.Interaction, query: str) -> None ) # Use shared download method - await self._download_and_send_resume(interaction, contact_name, resume_id) + download_ok = await self._download_and_send_resume( + interaction, contact_name, resume_id + ) + self._audit_command( + interaction=interaction, + action="crm.get_resume", + result="success" if download_ok else "error", + metadata={ + "query": query, + "contact_found": True, + "has_resume": True, + "download_ok": download_ok, + }, + resource_type="crm_contact", + resource_id=str(contact.get("id", "")), + ) except EspoAPIError as e: logger.error(f"EspoCRM API error in get_resume: {e}") + self._audit_command( + interaction=interaction, + action="crm.get_resume", + result="error", + metadata={"query": query, "error": str(e)}, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in get_resume: {e}") + self._audit_command( + interaction=interaction, + action="crm.get_resume", + result="error", + metadata={"query": query, "error": str(e)}, + ) await interaction.followup.send( "❌ An unexpected error occurred while fetching the resume." 
) @@ -818,8 +972,32 @@ async def _perform_discord_linking( f"Discord user {user.name} (ID: {user.id}) linked to CRM contact " f"{contact_name} (ID: {contact_id}) by {interaction.user.name}" ) + self._audit_command( + interaction=interaction, + action="crm.link_discord_user.execute", + result="success", + metadata={ + "linked_user_id": str(user.id), + "linked_username": user.name, + "contact_name": contact_name, + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) return True else: + self._audit_command( + interaction=interaction, + action="crm.link_discord_user.execute", + result="error", + metadata={ + "linked_user_id": str(user.id), + "contact_name": contact_name, + "error": "crm_update_failed", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) await interaction.followup.send( "❌ Failed to update contact in CRM. Please try again." ) @@ -827,10 +1005,22 @@ async def _perform_discord_linking( except EspoAPIError as e: logger.error(f"EspoCRM API error in _perform_discord_linking: {e}") + self._audit_command( + interaction=interaction, + action="crm.link_discord_user.execute", + result="error", + metadata={"linked_user_id": str(user.id), "error": str(e)}, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") return False except Exception as e: logger.error(f"Unexpected error in _perform_discord_linking: {e}") + self._audit_command( + interaction=interaction, + action="crm.link_discord_user.execute", + result="error", + metadata={"linked_user_id": str(user.id), "error": str(e)}, + ) await interaction.followup.send( "❌ An unexpected error occurred while linking the user." 
) @@ -895,6 +1085,16 @@ async def link_discord_user( contacts = await self._search_contact_for_linking(search_term) if not contacts: + self._audit_command( + interaction=interaction, + action="crm.link_discord_user", + result="success", + metadata={ + "search_term": search_term, + "linked_user_id": str(user.id), + "contacts_found": 0, + }, + ) await interaction.followup.send( f"❌ No contact found for: `{search_term}`" ) @@ -902,6 +1102,17 @@ async def link_discord_user( # Handle multiple results - show choices if len(contacts) > 1: + self._audit_command( + interaction=interaction, + action="crm.link_discord_user", + result="success", + metadata={ + "search_term": search_term, + "linked_user_id": str(user.id), + "contacts_found": len(contacts), + "requires_selection": True, + }, + ) await self._show_contact_choices( interaction, user, search_term, contacts ) @@ -909,13 +1120,46 @@ async def link_discord_user( # Single result - proceed with linking contact = contacts[0] + self._audit_command( + interaction=interaction, + action="crm.link_discord_user", + result="success", + metadata={ + "search_term": search_term, + "linked_user_id": str(user.id), + "contacts_found": 1, + "requires_selection": False, + }, + resource_type="crm_contact", + resource_id=str(contact.get("id", "")), + ) await self._perform_discord_linking(interaction, user, contact) except EspoAPIError as e: logger.error(f"EspoCRM API error in link_discord_user: {e}") + self._audit_command( + interaction=interaction, + action="crm.link_discord_user", + result="error", + metadata={ + "search_term": search_term, + "linked_user_id": str(user.id), + "error": str(e), + }, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in link_discord_user: {e}") + self._audit_command( + interaction=interaction, + action="crm.link_discord_user", + result="error", + metadata={ + "search_term": search_term, + "linked_user_id": str(user.id), + "error": 
str(e), + }, + ) await interaction.followup.send( "❌ An unexpected error occurred while linking the Discord user." ) @@ -931,6 +1175,12 @@ async def unlinked_discord_users(self, interaction: discord.Interaction) -> None await interaction.response.defer(ephemeral=True) if not interaction.guild: + self._audit_command( + interaction=interaction, + action="crm.unlinked_discord_users", + result="denied", + metadata={"reason": "not_in_guild"}, + ) await interaction.followup.send( "❌ This command can only be used in a server." ) @@ -953,6 +1203,12 @@ async def unlinked_discord_users(self, interaction: discord.Interaction) -> None unlinked_users.append(member) if not unlinked_users: + self._audit_command( + interaction=interaction, + action="crm.unlinked_discord_users", + result="success", + metadata={"unlinked_count": 0}, + ) await interaction.followup.send( "✅ **All Members Linked**\nAll Discord users with Member role are linked in the CRM!" ) @@ -960,12 +1216,30 @@ async def unlinked_discord_users(self, interaction: discord.Interaction) -> None # Send list of unlinked users await self._send_unlinked_users_list(interaction, unlinked_users) + self._audit_command( + interaction=interaction, + action="crm.unlinked_discord_users", + result="success", + metadata={"unlinked_count": len(unlinked_users)}, + ) except EspoAPIError as e: logger.error(f"EspoCRM API error in unlinked_discord_users: {e}") + self._audit_command( + interaction=interaction, + action="crm.unlinked_discord_users", + result="error", + metadata={"error": str(e)}, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in unlinked_discord_users: {e}") + self._audit_command( + interaction=interaction, + action="crm.unlinked_discord_users", + result="error", + metadata={"error": str(e)}, + ) await interaction.followup.send( "❌ An unexpected error occurred while checking unlinked users." 
) @@ -1086,6 +1360,15 @@ async def set_github_username( ) or not check_user_roles_with_hierarchy( interaction.user.roles, ["Steering Committee"] ): + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="denied", + metadata={ + "search_term": search_term, + "reason": "missing_required_role", + }, + ) await interaction.followup.send( "❌ You must have Steering Committee role or higher to set GitHub usernames for other people." ) @@ -1094,11 +1377,32 @@ async def set_github_username( # Search for target contact contacts = await self._search_contact_for_linking(search_term) if not contacts: + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="success", + metadata={ + "search_term": search_term, + "contact_found": False, + "target_scope": "other", + }, + ) await interaction.followup.send( f"❌ No contact found for: `{search_term}`" ) return elif len(contacts) > 1: + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="success", + metadata={ + "search_term": search_term, + "contact_found": False, + "target_scope": "other", + "reason": "multiple_contacts", + }, + ) await interaction.followup.send( f"❌ Multiple contacts found for `{search_term}`. Please be more specific or use the contact ID." ) @@ -1111,6 +1415,15 @@ async def set_github_username( str(interaction.user.id) ) if not target_contact: + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="denied", + metadata={ + "target_scope": "self", + "reason": "discord_not_linked", + }, + ) await interaction.followup.send( "❌ Your Discord account is not linked to a CRM contact. " "Please ask a Steering Committee member to link your account first." 
@@ -1121,6 +1434,15 @@ async def set_github_username( contact_name = target_contact.get("name", "Unknown") if not contact_id: + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="error", + metadata={ + "search_term": search_term, + "error": "contact_id_missing", + }, + ) await interaction.followup.send("❌ Contact ID not found.") return @@ -1166,16 +1488,52 @@ async def set_github_username( f"GitHub username set for {contact_name} (ID: {contact_id}) " f"to @{clean_github_username} by {interaction.user.name}" ) + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="success", + metadata={ + "search_term": search_term, + "github_username": clean_github_username, + "target_scope": "other" if search_term else "self", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) else: + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="error", + metadata={ + "search_term": search_term, + "github_username": clean_github_username, + "error": "crm_update_failed", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) await interaction.followup.send( "❌ Failed to update contact in CRM. Please try again." ) except EspoAPIError as e: logger.error(f"EspoCRM API error in set_github_username: {e}") + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="error", + metadata={"search_term": search_term, "error": str(e)}, + ) await interaction.followup.send(f"❌ CRM API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in set_github_username: {e}") + self._audit_command( + interaction=interaction, + action="crm.set_github_username", + result="error", + metadata={"search_term": search_term, "error": str(e)}, + ) await interaction.followup.send( "❌ An unexpected error occurred while setting the GitHub username." 
) @@ -1275,6 +1633,15 @@ async def upload_resume( ) if file_extension not in valid_extensions: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="denied", + metadata={ + "filename": file.filename, + "reason": "invalid_file_type", + }, + ) await interaction.followup.send( f"❌ Invalid file type. Please upload a PDF, DOC, or DOCX file.\nYou uploaded: `{file.filename}`" ) @@ -1283,6 +1650,16 @@ async def upload_resume( # Validate file size (10MB limit) max_size = 10 * 1024 * 1024 # 10MB in bytes if file.size > max_size: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="denied", + metadata={ + "filename": file.filename, + "size_bytes": file.size, + "reason": "file_too_large", + }, + ) await interaction.followup.send( f"❌ File too large. Maximum size is 10MB.\nYour file: {file.size / (1024 * 1024):.1f}MB" ) @@ -1298,6 +1675,16 @@ async def upload_resume( ) or not check_user_roles_with_hierarchy( interaction.user.roles, ["Steering Committee"] ): + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="denied", + metadata={ + "search_term": search_term, + "filename": file.filename, + "reason": "missing_required_role", + }, + ) await interaction.followup.send( "❌ You must have Steering Committee role or higher to upload resumes for other people." 
) @@ -1306,11 +1693,34 @@ async def upload_resume( # Search for target contact contacts = await self._search_contact_for_linking(search_term) if not contacts: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="success", + metadata={ + "search_term": search_term, + "filename": file.filename, + "contact_found": False, + "target_scope": "other", + }, + ) await interaction.followup.send( f"❌ No contact found for: `{search_term}`" ) return elif len(contacts) > 1: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="success", + metadata={ + "search_term": search_term, + "filename": file.filename, + "contact_found": False, + "target_scope": "other", + "reason": "multiple_contacts", + }, + ) await interaction.followup.send( f"❌ Multiple contacts found for `{search_term}`. Please be more specific or use the contact ID." ) @@ -1323,6 +1733,16 @@ async def upload_resume( str(interaction.user.id) ) if not target_contact: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="denied", + metadata={ + "filename": file.filename, + "target_scope": "self", + "reason": "discord_not_linked", + }, + ) await interaction.followup.send( "❌ Your Discord account is not linked to a CRM contact. " "Please ask a Steering Committee member to link your account first." 
@@ -1333,6 +1753,16 @@ async def upload_resume( contact_name = target_contact.get("name", "Unknown") if not contact_id: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="error", + metadata={ + "search_term": search_term, + "filename": file.filename, + "error": "contact_id_missing", + }, + ) await interaction.followup.send("❌ Contact ID not found.") return @@ -1342,6 +1772,19 @@ async def upload_resume( ) if has_duplicate: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="success", + metadata={ + "search_term": search_term, + "filename": file.filename, + "target_scope": "other" if search_term else "self", + "duplicate_found": True, + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) # Show confirmation dialog embed = discord.Embed( title="⚠️ Duplicate Resume Detected", @@ -1385,6 +1828,18 @@ async def upload_resume( attachment_id = attachment.get("id") if not attachment_id: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="error", + metadata={ + "search_term": search_term, + "filename": file.filename, + "error": "attachment_id_missing", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) await interaction.followup.send("❌ Failed to upload file to CRM.") return @@ -1418,19 +1873,68 @@ async def upload_resume( f"Resume uploaded for {contact_name} (ID: {contact_id}) " f"by {interaction.user.name}: {file.filename}" ) + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="success", + metadata={ + "search_term": search_term, + "filename": file.filename, + "size_bytes": file.size, + "overwrite": overwrite, + "target_scope": "other" if search_term else "self", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) else: + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="error", + metadata={ + "search_term": search_term, + 
"filename": file.filename, + "overwrite": overwrite, + "error": "resume_link_update_failed", + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) await interaction.followup.send( "⚠️ File uploaded but failed to link to contact. Please check CRM manually." ) except EspoAPIError as e: logger.error(f"Failed to upload file to EspoCRM: {e}") + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="error", + metadata={ + "search_term": search_term, + "filename": file.filename, + "error": str(e), + }, + resource_type="crm_contact", + resource_id=str(contact_id), + ) await interaction.followup.send( f"❌ Failed to upload file to CRM: {str(e)}" ) except Exception as e: logger.error(f"Unexpected error in upload_resume: {e}") + self._audit_command( + interaction=interaction, + action="crm.upload_resume", + result="error", + metadata={ + "search_term": search_term, + "filename": file.filename, + "error": str(e), + }, + ) await interaction.followup.send( "❌ An unexpected error occurred while uploading the resume." 
) diff --git a/apps/discord_bot/src/five08/discord_bot/config.py b/apps/discord_bot/src/five08/discord_bot/config.py index 43a748d4..07bd52ae 100644 --- a/apps/discord_bot/src/five08/discord_bot/config.py +++ b/apps/discord_bot/src/five08/discord_bot/config.py @@ -34,6 +34,8 @@ class Settings(SharedSettings): # CRM/EspoCRM settings espo_api_key: str espo_base_url: str + audit_api_base_url: str | None = None + audit_api_timeout_seconds: float = 2.0 # Kimai time tracking settings kimai_base_url: str diff --git a/apps/discord_bot/src/five08/discord_bot/utils/audit.py b/apps/discord_bot/src/five08/discord_bot/utils/audit.py new file mode 100644 index 00000000..4fe8f36c --- /dev/null +++ b/apps/discord_bot/src/five08/discord_bot/utils/audit.py @@ -0,0 +1,142 @@ +"""Best-effort audit event writer for Discord user actions.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import discord +import requests + +logger = logging.getLogger(__name__) + + +class DiscordAuditLogger: + """Write human audit events to the worker API without breaking commands.""" + + def __init__( + self, + *, + base_url: str | None, + shared_secret: str | None, + timeout_seconds: float, + ) -> None: + self.base_url = (base_url or "").strip().rstrip("/") + self.shared_secret = (shared_secret or "").strip() + self.timeout_seconds = timeout_seconds + + @property + def enabled(self) -> bool: + """Return whether audit writes are configured and enabled.""" + return bool(self.base_url and self.shared_secret) + + def log_command( + self, + *, + interaction: discord.Interaction, + action: str, + result: str, + metadata: dict[str, Any] | None = None, + resource_type: str | None = "discord_command", + resource_id: str | None = None, + ) -> None: + """Queue a best-effort audit write in the background.""" + if not self.enabled: + return + + event_payload = self._build_payload( + interaction=interaction, + action=action, + result=result, + metadata=metadata, + 
resource_type=resource_type, + resource_id=resource_id, + ) + + task = asyncio.create_task(self._post_event(event_payload)) + task.add_done_callback(self._on_task_done) + + async def _post_event(self, event_payload: dict[str, Any]) -> None: + await asyncio.to_thread(self._send_event_sync, event_payload) + + def _send_event_sync(self, event_payload: dict[str, Any]) -> None: + if not self.enabled: + return + + url = f"{self.base_url}/audit/events" + headers = { + "X-Webhook-Secret": self.shared_secret, + "Content-Type": "application/json", + } + + try: + response = requests.post( + url, + headers=headers, + json=event_payload, + timeout=self.timeout_seconds, + ) + if response.status_code >= 400: + logger.warning( + "Audit write failed status=%s action=%s body=%s", + response.status_code, + event_payload.get("action"), + response.text[:300], + ) + except Exception as exc: + logger.warning( + "Audit write exception action=%s error=%s", + event_payload.get("action"), + exc, + ) + + def _on_task_done(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except Exception as exc: # pragma: no cover - defensive fallback + logger.warning("Unexpected audit task failure: %s", exc) + + def _build_payload( + self, + *, + interaction: discord.Interaction, + action: str, + result: str, + metadata: dict[str, Any] | None, + resource_type: str | None, + resource_id: str | None, + ) -> dict[str, Any]: + command_name = None + if interaction.command is not None: + command_name = interaction.command.qualified_name + + actor_display_name = getattr(interaction.user, "display_name", None) + if not actor_display_name: + actor_display_name = getattr(interaction.user, "name", None) + + base_metadata: dict[str, Any] = { + "command": command_name, + "guild_id": str(interaction.guild_id) if interaction.guild_id else None, + "channel_id": ( + str(interaction.channel_id) + if interaction.channel_id is not None + else None + ), + "interaction_id": str(interaction.id), + } + if metadata: 
+ base_metadata.update(metadata) + + return { + "source": "discord", + "action": action, + "result": result, + "actor_provider": "discord", + "actor_subject": str(interaction.user.id), + "actor_display_name": actor_display_name, + "resource_type": resource_type, + "resource_id": resource_id, + "correlation_id": str(interaction.id), + "metadata": base_metadata, + } diff --git a/apps/worker/Dockerfile b/apps/worker/Dockerfile index 949f1203..650c0e82 100644 --- a/apps/worker/Dockerfile +++ b/apps/worker/Dockerfile @@ -1,3 +1,4 @@ +# Pinned from ghcr.io/astral-sh/uv:python3.12-bookworm-slim (verified 2026-02-21). FROM ghcr.io/astral-sh/uv@sha256:e5b65587bce7de595f299855d7385fe7fca39b8a74baa261ba1b7147afa78e58 WORKDIR /app diff --git a/apps/worker/pyproject.toml b/apps/worker/pyproject.toml index 74e9b063..1c4ba0b4 100644 --- a/apps/worker/pyproject.toml +++ b/apps/worker/pyproject.toml @@ -7,6 +7,7 @@ dependencies = [ "aiohttp>=3.13.1", "openai>=2.0.0", "alembic==1.18.4", + # Pin intentionally for reproducible broker behavior across worker/api images. 
"dramatiq[redis]==2.0.1", "pdfminer.six>=20250506", "pydantic~=2.10", diff --git a/apps/worker/src/five08/__init__.py b/apps/worker/src/five08/__init__.py deleted file mode 100644 index c6d851f8..00000000 --- a/apps/worker/src/five08/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Namespace package for five08 worker distribution.""" - -from pkgutil import extend_path - -__path__ = extend_path(__path__, __name__) diff --git a/apps/worker/src/five08/worker/actors.py b/apps/worker/src/five08/worker/actors.py index 6aa029a5..8d63149c 100644 --- a/apps/worker/src/five08/worker/actors.py +++ b/apps/worker/src/five08/worker/actors.py @@ -21,7 +21,12 @@ ) from five08.queue import parse_queue_names from five08.worker.config import settings -from five08.worker.jobs import process_contact_skills_job, process_webhook_event +from five08.worker.jobs import ( + process_contact_skills_job, + process_webhook_event, + sync_people_from_crm_job, + sync_person_from_crm_job, +) from five08.logging import configure_logging @@ -37,6 +42,8 @@ _HANDLERS: dict[str, Any] = { process_webhook_event.__name__: process_webhook_event, process_contact_skills_job.__name__: process_contact_skills_job, + sync_people_from_crm_job.__name__: sync_people_from_crm_job, + sync_person_from_crm_job.__name__: sync_person_from_crm_job, } diff --git a/apps/worker/src/five08/worker/api.py b/apps/worker/src/five08/worker/api.py index 1541a9d7..2e1cc862 100644 --- a/apps/worker/src/five08/worker/api.py +++ b/apps/worker/src/five08/worker/api.py @@ -1,31 +1,50 @@ """Webhook ingest API for enqueuing background jobs.""" import asyncio +import contextlib import logging import secrets from datetime import datetime, timezone +from uuid import uuid4 from aiohttp import web from pydantic import ValidationError +from psycopg import Connection from redis import Redis from five08.logging import configure_logging +from five08.audit import ( + ActorProvider, + AuditEventInput, + AuditResult, + AuditSource, + insert_audit_event, +) 
from five08.queue import ( EnqueuedJob, QueueClient, enqueue_job, + get_postgres_connection, get_redis_connection, is_postgres_healthy, ) from five08.worker.config import settings from five08.worker.db_migrations import run_job_migrations from five08.worker.dispatcher import build_queue_client -from five08.worker.jobs import process_contact_skills_job, process_webhook_event -from five08.worker.models import EspoCRMWebhookPayload +from five08.worker.jobs import ( + process_contact_skills_job, + process_webhook_event, + sync_people_from_crm_job, + sync_person_from_crm_job, +) +from five08.worker.models import AuditEventPayload, EspoCRMWebhookPayload logger = logging.getLogger(__name__) REDIS_CONN_KEY = web.AppKey("redis_conn", Redis) QUEUE_KEY = web.AppKey("queue", QueueClient) +CRM_SYNC_TASK_KEY = web.AppKey("crm_sync_task", asyncio.Task) +POSTGRES_CONN_KEY = web.AppKey("postgres_conn", Connection) +POSTGRES_CONN_LOCK_KEY = web.AppKey("postgres_conn_lock", asyncio.Lock) def _is_authorized(request: web.Request) -> bool: @@ -46,6 +65,107 @@ def _extract_idempotency_key(value: object) -> str | None: return None +def _crm_sync_idempotency_key(*, now: datetime) -> str: + interval_seconds = max(1, settings.crm_sync_interval_seconds) + bucket = int(now.timestamp()) // interval_seconds + return f"crm-sync:{bucket}" + + +async def _enqueue_full_crm_sync_job(queue: QueueClient, *, reason: str) -> EnqueuedJob: + now = datetime.now(tz=timezone.utc) + job: EnqueuedJob = await asyncio.to_thread( + enqueue_job, + queue=queue, + fn=sync_people_from_crm_job, + args=(), + settings=settings, + idempotency_key=_crm_sync_idempotency_key(now=now), + ) + logger.info( + "Enqueued CRM people full-sync job id=%s created=%s reason=%s", + job.id, + job.created, + reason, + ) + return job + + +async def _crm_sync_scheduler(app: web.Application) -> None: + queue = app[QUEUE_KEY] + interval_seconds = max(1, settings.crm_sync_interval_seconds) + while True: + try: + await 
_enqueue_full_crm_sync_job(queue, reason="scheduler") + except Exception: + logger.exception("Failed scheduling CRM full-sync job") + await asyncio.sleep(interval_seconds) + + +def _check_postgres_connection(connection: Connection) -> bool: + try: + with connection.cursor() as cursor: + cursor.execute("SELECT 1") + return True + except Exception: + return False + + +async def _is_postgres_connection_healthy(app: web.Application) -> bool: + lock = app[POSTGRES_CONN_LOCK_KEY] + async with lock: + connection = app[POSTGRES_CONN_KEY] + healthy = await asyncio.to_thread(_check_postgres_connection, connection) + if healthy: + return True + + with contextlib.suppress(Exception): + await asyncio.to_thread(connection.close) + + try: + refreshed = await asyncio.to_thread(get_postgres_connection, settings) + except Exception: + return False + + app[POSTGRES_CONN_KEY] = refreshed + return await asyncio.to_thread(_check_postgres_connection, refreshed) + + +def _enqueue_espocrm_batch_sync(queue: QueueClient, event_ids: list[str]) -> None: + for event_id in event_ids: + enqueue_job( + queue=queue, + fn=process_contact_skills_job, + args=(event_id,), + settings=settings, + idempotency_key=f"espocrm:{event_id}", + ) + + +async def _enqueue_espocrm_batch(queue: QueueClient, event_ids: list[str]) -> None: + await asyncio.to_thread(_enqueue_espocrm_batch_sync, queue, event_ids) + + +def _enqueue_espocrm_people_sync_batch_sync( + queue: QueueClient, event_ids: list[str], *, bucket: str +) -> None: + for event_id in event_ids: + enqueue_job( + queue=queue, + fn=sync_person_from_crm_job, + args=(event_id,), + settings=settings, + idempotency_key=f"crm-contact-sync:{event_id}:{bucket}", + ) + + +async def _enqueue_espocrm_people_sync_batch( + queue: QueueClient, event_ids: list[str], *, bucket: str +) -> None: + await asyncio.to_thread( + _enqueue_espocrm_people_sync_batch_sync, queue, event_ids, bucket=bucket + ) + + async def health_handler(request: web.Request) -> web.Response: 
"""Simple health endpoint.""" redis_conn = request.app[REDIS_CONN_KEY] @@ -54,7 +174,10 @@ async def health_handler(request: web.Request) -> web.Response: redis_ok = bool(await asyncio.to_thread(redis_conn.ping)) except Exception: redis_ok = False - postgres_ok = await asyncio.to_thread(is_postgres_healthy, settings) + if POSTGRES_CONN_KEY in request.app: + postgres_ok = await _is_postgres_connection_healthy(request.app) + else: + postgres_ok = await asyncio.to_thread(is_postgres_healthy, settings) return web.json_response( { @@ -125,30 +248,30 @@ async def espocrm_webhook_handler(request: web.Request) -> web.Response: {"error": "invalid_webhook_event", "detail": str(exc)}, status=400 ) + event_ids = [event.id for event in payload.events] + deduped_event_ids = list(dict.fromkeys(event_ids)) queue = request.app[QUEUE_KEY] - jobs: list[dict[str, str]] = [] - for event in payload.events: - job = await asyncio.to_thread( - enqueue_job, - queue=queue, - fn=process_contact_skills_job, - args=(event.id,), - settings=settings, - idempotency_key=f"espocrm:{event.id}", + try: + await _enqueue_espocrm_batch(queue, deduped_event_ids) + except Exception: + logger.exception( + "Failed enqueueing EspoCRM webhook events count=%s queue=%s", + len(deduped_event_ids), + settings.redis_queue_name, ) - jobs.append({"contact_id": event.id, "job_id": job.id}) + return web.json_response({"error": "enqueue_failed"}, status=503) logger.info( - "Enqueued %s EspoCRM contact jobs for queue=%s", - len(jobs), + "Enqueued %s EspoCRM webhook events queue=%s", + len(deduped_event_ids), settings.redis_queue_name, ) return web.json_response( { "status": "queued", "source": "espocrm", - "jobs": jobs, - "events_processed": len(jobs), + "events_received": len(deduped_event_ids), + "events_enqueued": len(deduped_event_ids), }, status=202, ) @@ -165,13 +288,14 @@ async def process_contact_handler(request: web.Request) -> web.Response: queue = request.app[QUEUE_KEY] manual_nonce = 
datetime.now(tz=timezone.utc).isoformat() + nonce_suffix = uuid4().hex[:12] job = await asyncio.to_thread( enqueue_job, queue=queue, fn=process_contact_skills_job, args=(contact_id,), settings=settings, - idempotency_key=f"manual:{contact_id}:{manual_nonce}", + idempotency_key=f"manual:{contact_id}:{manual_nonce}:{nonce_suffix}", ) logger.info( "Enqueued manual contact job job_id=%s contact_id=%s created=%s", @@ -190,18 +314,151 @@ async def process_contact_handler(request: web.Request) -> web.Response: ) +async def sync_people_handler(request: web.Request) -> web.Response: + """Manual enqueue for a full CRM->people cache sync.""" + if not _is_authorized(request): + return web.json_response({"error": "unauthorized"}, status=401) + + queue = request.app[QUEUE_KEY] + job = await _enqueue_full_crm_sync_job(queue, reason="manual") + return web.json_response( + { + "status": "queued", + "source": "manual", + "job_id": job.id, + "created": job.created, + }, + status=202, + ) + + +async def espocrm_people_sync_webhook_handler(request: web.Request) -> web.Response: + """Queue per-contact people cache sync jobs from CRM webhook events.""" + if not _is_authorized(request): + return web.json_response({"error": "unauthorized"}, status=401) + + try: + payload_data = await request.json() + except Exception: + return web.json_response({"error": "invalid_json"}, status=400) + + if not isinstance(payload_data, list): + return web.json_response( + {"error": "payload_must_be_array_of_events"}, status=400 + ) + + try: + payload = EspoCRMWebhookPayload.from_list(payload_data) + except (ValidationError, TypeError) as exc: + return web.json_response( + {"error": "invalid_webhook_event", "detail": str(exc)}, status=400 + ) + + event_ids = [event.id for event in payload.events] + deduped_event_ids = list(dict.fromkeys(event_ids)) + queue = request.app[QUEUE_KEY] + bucket = datetime.now(tz=timezone.utc).strftime("%Y%m%d%H%M") + try: + await _enqueue_espocrm_people_sync_batch( + queue, 
deduped_event_ids, bucket=bucket + ) + except Exception: + logger.exception( + "Failed enqueueing EspoCRM people-sync events count=%s queue=%s", + len(deduped_event_ids), + settings.redis_queue_name, + ) + return web.json_response({"error": "enqueue_failed"}, status=503) + + return web.json_response( + { + "status": "queued", + "source": "espocrm_people_sync", + "events_received": len(deduped_event_ids), + "events_enqueued": len(deduped_event_ids), + }, + status=202, + ) + + +async def audit_event_handler(request: web.Request) -> web.Response: + """Persist one human audit event.""" + if not _is_authorized(request): + return web.json_response({"error": "unauthorized"}, status=401) + + try: + payload_data = await request.json() + except Exception: + return web.json_response({"error": "invalid_json"}, status=400) + + if not isinstance(payload_data, dict): + return web.json_response({"error": "payload_must_be_object"}, status=400) + + try: + payload = AuditEventPayload.model_validate(payload_data) + except ValidationError as exc: + return web.json_response( + {"error": "invalid_payload", "detail": str(exc)}, status=400 + ) + + try: + created = await asyncio.to_thread( + insert_audit_event, + settings, + AuditEventInput( + source=AuditSource(payload.source), + action=payload.action, + result=AuditResult(payload.result), + actor_provider=ActorProvider(payload.actor_provider), + actor_subject=payload.actor_subject, + resource_type=payload.resource_type, + resource_id=payload.resource_id, + actor_display_name=payload.actor_display_name, + correlation_id=payload.correlation_id, + metadata=payload.metadata, + occurred_at=payload.occurred_at, + ), + ) + except ValueError as exc: + return web.json_response( + {"error": "invalid_payload", "detail": str(exc)}, status=400 + ) + + return web.json_response( + { + "status": "created", + "event_id": created.id, + "person_id": created.person_id, + }, + status=201, + ) + + async def on_startup(app: web.Application) -> None: 
"""Initialize queue dependencies.""" await asyncio.to_thread(run_job_migrations) redis_conn = get_redis_connection(settings) app[REDIS_CONN_KEY] = redis_conn + app[POSTGRES_CONN_LOCK_KEY] = asyncio.Lock() + app[POSTGRES_CONN_KEY] = await asyncio.to_thread(get_postgres_connection, settings) app[QUEUE_KEY] = build_queue_client() + if settings.crm_sync_enabled: + app[CRM_SYNC_TASK_KEY] = asyncio.create_task(_crm_sync_scheduler(app)) + else: + logger.info("CRM sync scheduler disabled by config") async def on_cleanup(app: web.Application) -> None: """Close Redis connection cleanly.""" redis_conn = app[REDIS_CONN_KEY] redis_conn.close() + if POSTGRES_CONN_KEY in app: + await asyncio.to_thread(app[POSTGRES_CONN_KEY].close) + if CRM_SYNC_TASK_KEY in app: + task = app[CRM_SYNC_TASK_KEY] + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task def create_app() -> web.Application: @@ -212,8 +469,13 @@ def create_app() -> web.Application: app.router.add_get("/", health_handler) app.router.add_get("/health", health_handler) app.router.add_post("/webhooks/espocrm", espocrm_webhook_handler) + app.router.add_post( + "/webhooks/espocrm/people-sync", espocrm_people_sync_webhook_handler + ) app.router.add_post("/webhooks/{source}", ingest_handler) app.router.add_post("/process-contact/{contact_id}", process_contact_handler) + app.router.add_post("/sync/people", sync_people_handler) + app.router.add_post("/audit/events", audit_event_handler) return app diff --git a/apps/worker/src/five08/worker/config.py b/apps/worker/src/five08/worker/config.py index 5e8d7b00..9c4b87b3 100644 --- a/apps/worker/src/five08/worker/config.py +++ b/apps/worker/src/five08/worker/config.py @@ -21,6 +21,9 @@ class WorkerSettings(SharedSettings): allowed_file_types: str = "pdf,doc,docx,txt" resume_keywords: str = "resume,cv,curriculum" max_attachments_per_contact: int = 3 + crm_sync_enabled: bool = True + crm_sync_interval_seconds: int = 900 + crm_sync_page_size: int = 200 @property 
def allowed_file_extensions(self) -> set[str]: diff --git a/apps/worker/src/five08/worker/crm/people_sync.py b/apps/worker/src/five08/worker/crm/people_sync.py new file mode 100644 index 00000000..19863a2d --- /dev/null +++ b/apps/worker/src/five08/worker/crm/people_sync.py @@ -0,0 +1,260 @@ +"""CRM to local people-cache sync workflow.""" + +from __future__ import annotations + +import logging +import re +from typing import Any + +from five08.audit import PeopleSyncStatus, PersonRecord, upsert_person +from five08.clients.espo import EspoAPI, EspoAPIError +from five08.worker.config import settings + +logger = logging.getLogger(__name__) + +_DISCORD_ID_RE = re.compile(r"\(ID:\s*(\d+)\)") + + +class EspoPeopleSyncClient: + """Fetch contact identity data from EspoCRM.""" + + def __init__(self) -> None: + api_url = settings.espo_base_url.rstrip("/") + "/api/v1" + self.api = EspoAPI(api_url, settings.espo_api_key) + + def list_contact_page( + self, *, offset: int, max_size: int + ) -> tuple[list[dict[str, Any]], int | None]: + """Load one page of contacts for identity sync.""" + params: dict[str, Any] = { + "offset": offset, + "maxSize": max_size, + "select": ( + "id,name,emailAddress,emailAddressData,c508Email," + "cDiscordUsername,cDiscordUserId,cDiscordRoles," + "cGithubUsername,githubUsername" + ), + } + raw = self.api.request("GET", "Contact", params) + contacts = raw.get("list", []) + if not isinstance(contacts, list): + contacts = [] + + total_raw = raw.get("total") + total = total_raw if isinstance(total_raw, int) else None + parsed_contacts = [item for item in contacts if isinstance(item, dict)] + return parsed_contacts, total + + def get_contact(self, contact_id: str) -> dict[str, Any]: + """Load one contact by id.""" + raw = self.api.request("GET", f"Contact/{contact_id}") + if not isinstance(raw, dict): + raise ValueError("Unexpected contact payload from EspoCRM") + return raw + + +class PeopleSyncProcessor: + """Sync local people cache from EspoCRM 
contacts.""" + + def __init__(self) -> None: + self.client = EspoPeopleSyncClient() + + def sync_all_contacts(self) -> dict[str, Any]: + """Run a paginated full sync into the local people table.""" + synced_count = 0 + failed_ids: list[str] = [] + offset = 0 + page_size = max(1, settings.crm_sync_page_size) + pages = 0 + total_seen = 0 + + while True: + try: + contacts, total = self.client.list_contact_page( + offset=offset, + max_size=page_size, + ) + except EspoAPIError as exc: + logger.error("Failed loading contacts page offset=%s: %s", offset, exc) + break + + if not contacts: + break + + pages += 1 + total_seen += len(contacts) + for raw_contact in contacts: + person = self._to_person_record(raw_contact) + if person is None: + failed_ids.append(str(raw_contact.get("id", "unknown"))) + continue + + try: + upsert_person(settings, person) + synced_count += 1 + except Exception as exc: + contact_id = person.crm_contact_id + failed_ids.append(contact_id) + logger.warning( + "Failed syncing CRM contact id=%s into people cache: %s", + contact_id, + exc, + ) + + offset += len(contacts) + if total is not None and offset >= total: + break + if len(contacts) < page_size: + break + + return { + "synced_count": synced_count, + "failed_count": len(failed_ids), + "failed_contact_ids": failed_ids, + "total_seen": total_seen, + "pages": pages, + } + + def sync_contact(self, contact_id: str) -> dict[str, Any]: + """Sync one contact into the local people table.""" + try: + raw_contact = self.client.get_contact(contact_id) + except EspoAPIError as exc: + logger.error("Failed loading contact id=%s: %s", contact_id, exc) + return { + "contact_id": contact_id, + "synced": False, + "error": str(exc), + } + + person = self._to_person_record(raw_contact) + if person is None: + return { + "contact_id": contact_id, + "synced": False, + "error": "contact_missing_id", + } + + try: + upsert_person(settings, person) + except Exception as exc: + logger.warning("Failed syncing contact id=%s: 
%s", contact_id, exc) + return { + "contact_id": contact_id, + "synced": False, + "error": str(exc), + } + + return { + "contact_id": contact_id, + "synced": True, + } + + def _to_person_record(self, raw_contact: dict[str, Any]) -> PersonRecord | None: + contact_id = str(raw_contact.get("id", "")).strip() + if not contact_id: + return None + + discord_username = self._discord_username(raw_contact) + discord_user_id = self._discord_user_id(raw_contact, discord_username) + + return PersonRecord( + crm_contact_id=contact_id, + name=_text_or_none(raw_contact.get("name")), + email=self._email(raw_contact), + email_508=self._email_508(raw_contact), + discord_user_id=discord_user_id, + discord_username=discord_username, + discord_roles=self._discord_roles(raw_contact.get("cDiscordRoles")), + github_username=self._github_username(raw_contact), + sync_status=PeopleSyncStatus.ACTIVE, + ) + + def _email(self, raw_contact: dict[str, Any]) -> str | None: + direct = _text_or_none(raw_contact.get("emailAddress")) + if direct: + return direct + + email_data = raw_contact.get("emailAddressData") + if not isinstance(email_data, list): + return None + + for item in email_data: + if not isinstance(item, dict): + continue + candidate = _text_or_none(item.get("emailAddress")) + if candidate and bool(item.get("primary")): + return candidate + + for item in email_data: + if not isinstance(item, dict): + continue + candidate = _text_or_none(item.get("emailAddress")) + if candidate: + return candidate + + return None + + def _email_508(self, raw_contact: dict[str, Any]) -> str | None: + return _text_or_none(raw_contact.get("c508Email")) + + def _discord_username(self, raw_contact: dict[str, Any]) -> str | None: + raw_username = _text_or_none(raw_contact.get("cDiscordUsername")) + if raw_username is None: + raw_username = _text_or_none(raw_contact.get("discordUsername")) + if raw_username is None: + return None + + cleaned = _DISCORD_ID_RE.sub("", raw_username).strip() + return cleaned or 
None + + def _discord_user_id( + self, + raw_contact: dict[str, Any], + discord_username: str | None, + ) -> str | None: + for key in ("cDiscordUserId", "discordUserId", "cDiscordId"): + candidate = _text_or_none(raw_contact.get(key)) + if candidate: + return candidate + + if discord_username is None: + return None + + raw_username = _text_or_none(raw_contact.get("cDiscordUsername")) + if raw_username is None: + return None + + match = _DISCORD_ID_RE.search(raw_username) + if match is None: + return None + return match.group(1) + + def _discord_roles(self, raw_roles: Any) -> list[str]: + if isinstance(raw_roles, list): + return [_text for item in raw_roles if (_text := _text_or_none(item))] + if isinstance(raw_roles, str): + values = [item.strip() for item in raw_roles.split(",")] + return [value for value in values if value] + if isinstance(raw_roles, dict): + roles: list[str] = [] + for value in raw_roles.values(): + text = _text_or_none(value) + if text: + roles.append(text) + return roles + return [] + + def _github_username(self, raw_contact: dict[str, Any]) -> str | None: + for key in ("cGithubUsername", "githubUsername"): + candidate = _text_or_none(raw_contact.get(key)) + if candidate: + return candidate + return None + + +def _text_or_none(value: Any) -> str | None: + if not isinstance(value, str): + return None + cleaned = value.strip() + return cleaned or None diff --git a/apps/worker/src/five08/worker/crm/processor.py b/apps/worker/src/five08/worker/crm/processor.py index 042e8247..e14e245a 100644 --- a/apps/worker/src/five08/worker/crm/processor.py +++ b/apps/worker/src/five08/worker/crm/processor.py @@ -106,11 +106,11 @@ def process_contact_skills(self, contact_id: str) -> SkillsExtractionResult: source="document_analysis", ) - existing_lower = {item.lower() for item in existing_skills} + existing_lower = {item.casefold() for item in existing_skills} new_skills = [ skill for skill in extracted.skills - if skill.lower() not in existing_lower + if 
skill.casefold() not in existing_lower ] updated_skills = existing_skills + new_skills diff --git a/apps/worker/src/five08/worker/crm/skills_extractor.py b/apps/worker/src/five08/worker/crm/skills_extractor.py index 840834f4..3cf6e670 100644 --- a/apps/worker/src/five08/worker/crm/skills_extractor.py +++ b/apps/worker/src/five08/worker/crm/skills_extractor.py @@ -98,7 +98,7 @@ def extract_skills(self, resume_text: str) -> ExtractedSkills: def _extract_skills_heuristic(self, resume_text: str) -> ExtractedSkills: """Simple keyword and token-based extraction fallback.""" lowered = resume_text.lower() - token_matches = re.findall(r"\b[a-z][a-z0-9+#\-.]{2,24}\b", lowered) + token_matches = re.findall(r"\b[a-z][a-z0-9+#\-.]{1,24}\b", lowered) detected: set[str] = set() for token in token_matches: if token in COMMON_SKILLS: diff --git a/apps/worker/src/five08/worker/jobs.py b/apps/worker/src/five08/worker/jobs.py index 532cf72c..8ec7d6a0 100644 --- a/apps/worker/src/five08/worker/jobs.py +++ b/apps/worker/src/five08/worker/jobs.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from typing import Any +from five08.worker.crm.people_sync import PeopleSyncProcessor from five08.worker.crm.processor import ContactSkillsProcessor logger = logging.getLogger(__name__) @@ -28,3 +29,19 @@ def process_webhook_event(source: str, payload: dict[str, Any]) -> dict[str, Any "received_at": received_at, "payload_keys": sorted(payload.keys()), } + + +def sync_people_from_crm_job() -> dict[str, Any]: + """Sync a full contacts page-set from CRM into the local people cache.""" + logger.info("Processing CRM people full-sync job") + processor = PeopleSyncProcessor() + result = processor.sync_all_contacts() + return result + + +def sync_person_from_crm_job(contact_id: str) -> dict[str, Any]: + """Sync one CRM contact into the local people cache.""" + logger.info("Processing CRM people sync job contact_id=%s", contact_id) + processor = PeopleSyncProcessor() + result = 
processor.sync_contact(contact_id) + return result diff --git a/apps/worker/src/five08/worker/migrations/versions/20260221_0100_create_jobs_table.py b/apps/worker/src/five08/worker/migrations/versions/20260221_0100_create_jobs_table.py index 3b131a5d..e000bd74 100644 --- a/apps/worker/src/five08/worker/migrations/versions/20260221_0100_create_jobs_table.py +++ b/apps/worker/src/five08/worker/migrations/versions/20260221_0100_create_jobs_table.py @@ -17,7 +17,10 @@ def upgrade() -> None: op.create_table( "jobs", sa.Column( - "id", postgresql.UUID(as_uuid=True), nullable=False, primary_key=True + "id", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, ), sa.Column("type", sa.Text(), nullable=False), sa.Column("status", sa.Text(), nullable=False), diff --git a/apps/worker/src/five08/worker/migrations/versions/20260221_0200_create_people_and_audit_events.py b/apps/worker/src/five08/worker/migrations/versions/20260221_0200_create_people_and_audit_events.py new file mode 100644 index 00000000..c7053681 --- /dev/null +++ b/apps/worker/src/five08/worker/migrations/versions/20260221_0200_create_people_and_audit_events.py @@ -0,0 +1,189 @@ +"""Create people cache and human audit tables.""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "202602210200" +down_revision = "202602210100" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Create people and audit_events tables with updated_at triggers.""" + op.create_table( + "people", + sa.Column( + "id", postgresql.UUID(as_uuid=True), nullable=False, primary_key=True + ), + sa.Column("crm_contact_id", sa.Text(), nullable=False), + sa.Column("name", sa.Text(), nullable=True), + sa.Column("email", sa.Text(), nullable=True), + sa.Column("email_508", sa.Text(), nullable=True), + sa.Column("discord_user_id", sa.Text(), nullable=True), + sa.Column("discord_username", sa.Text(), nullable=True), + 
sa.Column( + "discord_roles", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'[]'::jsonb"), + ), + sa.Column("github_username", sa.Text(), nullable=True), + sa.Column( + "sync_status", + sa.Text(), + nullable=False, + server_default=sa.text("'active'"), + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + sa.CheckConstraint( + "sync_status IN ('active', 'missing_in_crm', 'conflict')", + name="ck_people_sync_status", + ), + sa.UniqueConstraint("crm_contact_id", name="uq_people_crm_contact_id"), + sa.UniqueConstraint("discord_user_id", name="uq_people_discord_user_id"), + ) + + op.create_index("idx_people_email", "people", ["email"]) + op.create_index("idx_people_email_508", "people", ["email_508"]) + op.create_index("idx_people_discord_user_id", "people", ["discord_user_id"]) + + op.create_table( + "audit_events", + sa.Column( + "id", postgresql.UUID(as_uuid=True), nullable=False, primary_key=True + ), + sa.Column( + "occurred_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + sa.Column("source", sa.Text(), nullable=False), + sa.Column("action", sa.Text(), nullable=False), + sa.Column("resource_type", sa.Text(), nullable=True), + sa.Column("resource_id", sa.Text(), nullable=True), + sa.Column("result", sa.Text(), nullable=False), + sa.Column("actor_provider", sa.Text(), nullable=False), + sa.Column("actor_subject", sa.Text(), nullable=False), + sa.Column("actor_display_name", sa.Text(), nullable=True), + sa.Column("person_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("correlation_id", sa.Text(), nullable=True), + sa.Column( + "metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column( + "created_at", + 
sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + sa.CheckConstraint( + "source IN ('discord', 'admin_dashboard')", + name="ck_audit_events_source", + ), + sa.CheckConstraint( + "result IN ('success', 'denied', 'error')", + name="ck_audit_events_result", + ), + sa.CheckConstraint( + "actor_provider IN ('discord', 'admin_sso')", + name="ck_audit_events_actor_provider", + ), + sa.ForeignKeyConstraint(["person_id"], ["people.id"], ondelete="SET NULL"), + ) + + op.create_index("idx_audit_events_occurred_at", "audit_events", ["occurred_at"]) + op.create_index( + "idx_audit_events_source_action", + "audit_events", + ["source", "action", "occurred_at"], + ) + op.create_index( + "idx_audit_events_actor_lookup", + "audit_events", + ["actor_provider", "actor_subject", "occurred_at"], + ) + op.create_index("idx_audit_events_person_id", "audit_events", ["person_id"]) + + op.execute( + """ + CREATE FUNCTION people_set_updated_at_fn() + RETURNS TRIGGER AS $$ + BEGIN + NEW.updated_at = NOW(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + op.execute( + """ + CREATE TRIGGER people_set_updated_at_tr + BEFORE UPDATE ON people + FOR EACH ROW + EXECUTE FUNCTION people_set_updated_at_fn(); + """ + ) + + op.execute( + """ + CREATE FUNCTION audit_events_set_updated_at_fn() + RETURNS TRIGGER AS $$ + BEGIN + NEW.updated_at = NOW(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + op.execute( + """ + CREATE TRIGGER audit_events_set_updated_at_tr + BEFORE UPDATE ON audit_events + FOR EACH ROW + EXECUTE FUNCTION audit_events_set_updated_at_fn(); + """ + ) + + +def downgrade() -> None: + """Drop people and audit_events tables.""" + op.execute("DROP TRIGGER IF EXISTS audit_events_set_updated_at_tr ON audit_events") + op.execute("DROP FUNCTION IF EXISTS audit_events_set_updated_at_fn()") + op.execute("DROP TRIGGER IF EXISTS 
people_set_updated_at_tr ON people") + op.execute("DROP FUNCTION IF EXISTS people_set_updated_at_fn()") + + op.drop_index("idx_audit_events_person_id", table_name="audit_events") + op.drop_index("idx_audit_events_actor_lookup", table_name="audit_events") + op.drop_index("idx_audit_events_source_action", table_name="audit_events") + op.drop_index("idx_audit_events_occurred_at", table_name="audit_events") + op.drop_table("audit_events") + + op.drop_index("idx_people_discord_user_id", table_name="people") + op.drop_index("idx_people_email_508", table_name="people") + op.drop_index("idx_people_email", table_name="people") + op.drop_table("people") diff --git a/apps/worker/src/five08/worker/models.py b/apps/worker/src/five08/worker/models.py index 93d3442e..5d07e8b2 100644 --- a/apps/worker/src/five08/worker/models.py +++ b/apps/worker/src/five08/worker/models.py @@ -1,5 +1,7 @@ """Typed models for worker webhook and skills processing flows.""" +from datetime import datetime +from typing import Literal from typing import Any from pydantic import BaseModel, Field @@ -53,3 +55,19 @@ class SkillsExtractionResult(BaseModel): updated_skills: list[str] success: bool error: str | None = None + + +class AuditEventPayload(BaseModel): + """Inbound payload for creating a human audit event.""" + + source: Literal["discord", "admin_dashboard"] + action: str = Field(..., min_length=1) + result: Literal["success", "denied", "error"] = "success" + actor_provider: Literal["discord", "admin_sso"] + actor_subject: str = Field(..., min_length=1) + resource_type: str | None = None + resource_id: str | None = None + actor_display_name: str | None = None + correlation_id: str | None = None + metadata: dict[str, Any] = Field(default_factory=dict) + occurred_at: datetime | None = None diff --git a/docker-compose.yml b/docker-compose.yml index ade41cd7..987e6acb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} 
restart: unless-stopped ports: - - "${POSTGRES_PORT:-5432}:5432" + - "${POSTGRES_HOST_BIND:-127.0.0.1}:${POSTGRES_PORT:-5432}:5432" volumes: - postgres-data:/var/lib/postgresql/data healthcheck: @@ -69,8 +69,6 @@ services: depends_on: redis: condition: service_healthy - postgres: - condition: service_healthy worker-api: build: @@ -82,7 +80,10 @@ services: environment: REDIS_URL: ${REDIS_URL:-redis://redis:6379/0} REDIS_QUEUE_NAME: ${REDIS_QUEUE_NAME:-jobs.default} - POSTGRES_URL: ${POSTGRES_URL:-postgresql://postgres:postgres@postgres:5432/workflows} + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} + POSTGRES_DB: ${POSTGRES_DB:-workflows} + POSTGRES_URL: ${POSTGRES_URL} JOB_MAX_ATTEMPTS: ${JOB_MAX_ATTEMPTS:-8} JOB_RETRY_BASE_SECONDS: ${JOB_RETRY_BASE_SECONDS:-5} JOB_RETRY_MAX_SECONDS: ${JOB_RETRY_MAX_SECONDS:-300} @@ -98,6 +99,8 @@ services: condition: service_healthy postgres: condition: service_healthy + minio-init: + condition: service_completed_successfully ports: - "${WEBHOOK_INGEST_PORT:-8090}:${WEBHOOK_INGEST_PORT:-8090}" @@ -112,7 +115,10 @@ services: REDIS_URL: ${REDIS_URL:-redis://redis:6379/0} REDIS_QUEUE_NAME: ${REDIS_QUEUE_NAME:-jobs.default} WORKER_QUEUE_NAMES: ${WORKER_QUEUE_NAMES:-jobs.default} - POSTGRES_URL: ${POSTGRES_URL:-postgresql://postgres:postgres@postgres:5432/workflows} + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} + POSTGRES_DB: ${POSTGRES_DB:-workflows} + POSTGRES_URL: ${POSTGRES_URL} JOB_MAX_ATTEMPTS: ${JOB_MAX_ATTEMPTS:-8} JOB_RETRY_BASE_SECONDS: ${JOB_RETRY_BASE_SECONDS:-5} JOB_RETRY_MAX_SECONDS: ${JOB_RETRY_MAX_SECONDS:-300} @@ -128,6 +134,8 @@ services: condition: service_healthy postgres: condition: service_healthy + minio-init: + condition: service_completed_successfully volumes: redis-data: diff --git a/packages/shared/src/five08/__init__.py b/packages/shared/src/five08/__init__.py deleted file mode 100644 index cce18bd5..00000000 
--- a/packages/shared/src/five08/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Shared five08 namespace package used by bot and worker services.""" - -from pkgutil import extend_path - -__path__ = extend_path(__path__, __name__) diff --git a/packages/shared/src/five08/audit.py b/packages/shared/src/five08/audit.py new file mode 100644 index 00000000..a4c84e2b --- /dev/null +++ b/packages/shared/src/five08/audit.py @@ -0,0 +1,257 @@ +"""Audit and people-cache persistence helpers.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import StrEnum +from typing import Any +from uuid import uuid4 + +from psycopg.rows import dict_row +from psycopg.types.json import Jsonb + +from five08.queue import get_postgres_connection +from five08.settings import SharedSettings + + +class AuditSource(StrEnum): + """Supported human action sources.""" + + DISCORD = "discord" + ADMIN_DASHBOARD = "admin_dashboard" + + +class AuditResult(StrEnum): + """Outcome values for audit events.""" + + SUCCESS = "success" + DENIED = "denied" + ERROR = "error" + + +class ActorProvider(StrEnum): + """Identity providers used to resolve a person.""" + + DISCORD = "discord" + ADMIN_SSO = "admin_sso" + + +class PeopleSyncStatus(StrEnum): + """States for CRM-backed people cache records.""" + + ACTIVE = "active" + MISSING_IN_CRM = "missing_in_crm" + CONFLICT = "conflict" + + +@dataclass(frozen=True) +class PersonRecord: + """Normalized people cache row shape.""" + + crm_contact_id: str + name: str | None = None + email: str | None = None + email_508: str | None = None + discord_user_id: str | None = None + discord_username: str | None = None + discord_roles: list[str] | None = None + github_username: str | None = None + sync_status: PeopleSyncStatus = PeopleSyncStatus.ACTIVE + + +@dataclass(frozen=True) +class AuditEventInput: + """Input payload for writing one audit event.""" + + source: AuditSource + action: str + result: 
AuditResult + actor_provider: ActorProvider + actor_subject: str + resource_type: str | None = None + resource_id: str | None = None + actor_display_name: str | None = None + correlation_id: str | None = None + metadata: dict[str, Any] | None = None + occurred_at: datetime | None = None + + +@dataclass(frozen=True) +class CreatedAuditEvent: + """Insert result payload for one audit event.""" + + id: str + person_id: str | None + + +def _normalize_email(value: str | None) -> str | None: + if value is None: + return None + normalized = value.strip().lower() + return normalized or None + + +def normalize_actor_subject(provider: ActorProvider, subject: str) -> str: + """Normalize actor subject values for stable lookups.""" + normalized = subject.strip() + if provider == ActorProvider.ADMIN_SSO: + email = _normalize_email(normalized) + if email is None: + raise ValueError("actor_subject is required for admin_sso") + return email + if not normalized: + raise ValueError("actor_subject is required") + return normalized + + +def upsert_person(settings: SharedSettings, person: PersonRecord) -> str: + """Insert or update one people cache record.""" + person_id = str(uuid4()) + query = """ + INSERT INTO people ( + id, + crm_contact_id, + name, + email, + email_508, + discord_user_id, + discord_username, + discord_roles, + github_username, + sync_status + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (crm_contact_id) DO UPDATE + SET + name = EXCLUDED.name, + email = EXCLUDED.email, + email_508 = EXCLUDED.email_508, + discord_user_id = EXCLUDED.discord_user_id, + discord_username = EXCLUDED.discord_username, + discord_roles = EXCLUDED.discord_roles, + github_username = EXCLUDED.github_username, + sync_status = EXCLUDED.sync_status + RETURNING id::text; + """ + roles = person.discord_roles or [] + + with get_postgres_connection(settings) as conn: + with conn.cursor(row_factory=dict_row) as cursor: + cursor.execute( + query, + ( + person_id, + 
person.crm_contact_id, + person.name, + _normalize_email(person.email), + _normalize_email(person.email_508), + person.discord_user_id, + person.discord_username, + Jsonb(roles), + person.github_username, + person.sync_status.value, + ), + ) + row = cursor.fetchone() + + if row is None: + raise RuntimeError("Failed to upsert person record") + + return row["id"] + + +def resolve_person_id( + settings: SharedSettings, + *, + actor_provider: ActorProvider, + actor_subject: str, +) -> str | None: + """Resolve a person id from audit actor provider + subject.""" + normalized_subject = normalize_actor_subject(actor_provider, actor_subject) + + if actor_provider == ActorProvider.DISCORD: + query = """ + SELECT id::text + FROM people + WHERE discord_user_id = %s + LIMIT 1; + """ + params: tuple[str, ...] = (normalized_subject,) + else: + query = """ + SELECT id::text + FROM people + WHERE lower(email_508) = %s OR lower(email) = %s + LIMIT 1; + """ + params = (normalized_subject, normalized_subject) + + with get_postgres_connection(settings) as conn: + with conn.cursor(row_factory=dict_row) as cursor: + cursor.execute(query, params) + row = cursor.fetchone() + + if row is None: + return None + return row["id"] + + +def insert_audit_event( + settings: SharedSettings, + payload: AuditEventInput, +) -> CreatedAuditEvent: + """Insert one human audit event.""" + event_id = str(uuid4()) + occurred_at = payload.occurred_at + if occurred_at is None: + occurred_at = datetime.now(tz=timezone.utc) + + person_id = resolve_person_id( + settings, + actor_provider=payload.actor_provider, + actor_subject=payload.actor_subject, + ) + normalized_subject = normalize_actor_subject( + payload.actor_provider, payload.actor_subject + ) + + query = """ + INSERT INTO audit_events ( + id, + occurred_at, + source, + action, + resource_type, + resource_id, + result, + actor_provider, + actor_subject, + actor_display_name, + person_id, + correlation_id, + metadata + ) VALUES (%s, %s, %s, %s, %s, %s, %s, 
%s, %s, %s, %s, %s, %s); + """ + + with get_postgres_connection(settings) as conn: + with conn.cursor() as cursor: + cursor.execute( + query, + ( + event_id, + occurred_at, + payload.source.value, + payload.action, + payload.resource_type, + payload.resource_id, + payload.result.value, + payload.actor_provider.value, + normalized_subject, + payload.actor_display_name, + person_id, + payload.correlation_id, + Jsonb(payload.metadata or {}), + ), + ) + + return CreatedAuditEvent(id=event_id, person_id=person_id) diff --git a/packages/shared/src/five08/queue.py b/packages/shared/src/five08/queue.py index 81fe4c1c..06d2397d 100644 --- a/packages/shared/src/five08/queue.py +++ b/packages/shared/src/five08/queue.py @@ -287,7 +287,12 @@ def mark_job_retry( run_after: datetime, last_error: str, ) -> None: - """Keep record of a failed retryable attempt.""" + """Record a retryable failure using `_mark_job` with `JobStatus.FAILED`. + + This marks a non-terminal failure state while attempts are still below the + max-attempts threshold. Callers should use this for retry scheduling paths; + terminal failures should use `mark_job_dead`, which writes `JobStatus.DEAD`. + """ _mark_job( settings, job_id, diff --git a/packages/shared/src/five08/settings.py b/packages/shared/src/five08/settings.py index 721e0031..efdb7e49 100644 --- a/packages/shared/src/five08/settings.py +++ b/packages/shared/src/five08/settings.py @@ -1,5 +1,6 @@ """Shared configuration settings across services.""" +from pydantic import model_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -13,6 +14,7 @@ def normalize_sqlalchemy_postgres_url(url: str) -> str: class SharedSettings(BaseSettings): """Base settings shared by all services in the monorepo.""" + runtime_env: str = "local" log_level: str = "INFO" redis_url: str = "redis://redis:6379/0" # Docker Compose default; set REDIS_URL when running outside Compose. 
@@ -20,7 +22,7 @@ class SharedSettings(BaseSettings): redis_key_prefix: str = "jobs" redis_socket_connect_timeout: float | None = 5.0 redis_socket_timeout: float | None = 5.0 - postgres_url: str = "postgresql://postgres:postgres@postgres:5432/workflows" + postgres_url: str = "postgresql://postgres@postgres:5432/workflows" job_max_attempts: int = 8 job_retry_base_seconds: int = 5 job_retry_max_seconds: int = 300 @@ -28,7 +30,7 @@ class SharedSettings(BaseSettings): job_result_ttl_seconds: int = 3600 minio_endpoint: str = "http://minio:9000" minio_root_user: str = "internal" - minio_root_password: str = "change-me" + minio_root_password: str = "" minio_internal_bucket: str = "internal-transfers" webhook_ingest_host: str = "0.0.0.0" @@ -37,6 +39,21 @@ class SharedSettings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="ignore") + @model_validator(mode="after") + def validate_required_secrets(self) -> "SharedSettings": + """Require non-empty runtime secrets in non-local runtime environments.""" + env = self.runtime_env.strip().lower() + if env in {"local", "dev", "development", "test"}: + return self + + if not self.postgres_url.strip(): + raise ValueError("POSTGRES_URL must be set when RUNTIME_ENV is non-local.") + if not self.minio_root_password.strip(): + raise ValueError( + "MINIO_ROOT_PASSWORD must be set when RUNTIME_ENV is non-local." 
+ ) + return self + @property def minio_access_key(self) -> str: """Access key alias for MinIO clients using the old naming.""" diff --git a/pyproject.toml b/pyproject.toml index c3b16d29..3a29359b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,3 +62,12 @@ exclude_lines = [ "if 0:", "if __name__ == .__main__.:", ] + +[tool.mypy] +namespace_packages = true +explicit_package_bases = true +mypy_path = [ + "apps/discord_bot/src", + "apps/worker/src", + "packages/shared/src", +] diff --git a/tests/integration/test_email_monitor.py b/tests/integration/test_email_monitor.py index ca418bbb..78b2d0b7 100644 --- a/tests/integration/test_email_monitor.py +++ b/tests/integration/test_email_monitor.py @@ -26,40 +26,38 @@ def email_monitor(self, mock_bot): monitor.task_poll_inbox.is_running = Mock(return_value=False) return monitor - # IMAP integration tests removed - too complex to mock properly - # The core functionality is tested via unit tests and command tests + @pytest.fixture + def email_monitor_real_poll(self, mock_bot): + """Create an EmailMonitor instance with the real poll coroutine.""" + return EmailMonitor(mock_bot) @pytest.mark.asyncio async def test_poll_inbox_handles_imap_errors( - self, email_monitor, mock_discord_channel, capfd + self, email_monitor_real_poll, mock_discord_channel ): """Test that IMAP errors are handled gracefully.""" - email_monitor.bot.get_channel.return_value = mock_discord_channel + email_monitor_real_poll.bot.get_channel.return_value = mock_discord_channel with patch( "imaplib.IMAP4_SSL", side_effect=imaplib.IMAP4.error("Connection failed") ): - # Should not raise an exception - await email_monitor.task_poll_inbox() + # IMAP transport errors currently bubble from the poll loop. 
+ with pytest.raises(imaplib.IMAP4.error): + await EmailMonitor.task_poll_inbox.coro(email_monitor_real_poll) @pytest.mark.asyncio async def test_poll_inbox_handles_email_parsing_errors( - self, email_monitor, mock_discord_channel, mock_imap_server + self, email_monitor_real_poll, mock_discord_channel, mock_imap_server ): """Test handling of malformed email messages.""" - email_monitor.bot.get_channel.return_value = mock_discord_channel + email_monitor_real_poll.bot.get_channel.return_value = mock_discord_channel mock_imap_server.search.return_value = ("OK", [b"1"]) # Malformed email data mock_imap_server.fetch.return_value = ("OK", [(None, b"malformed email data")]) with patch("imaplib.IMAP4_SSL", return_value=mock_imap_server): - # Should handle parsing errors gracefully - try: - await email_monitor.task_poll_inbox() - except Exception: - # Some parsing errors might still be raised, which is acceptable - pass + await EmailMonitor.task_poll_inbox.coro(email_monitor_real_poll) @pytest.mark.asyncio async def test_cog_unload_cancels_task(self, email_monitor): diff --git a/tests/unit/test_discord_audit.py b/tests/unit/test_discord_audit.py new file mode 100644 index 00000000..5871ceca --- /dev/null +++ b/tests/unit/test_discord_audit.py @@ -0,0 +1,57 @@ +"""Unit tests for Discord audit helper.""" + +from unittest.mock import Mock, patch + +from five08.discord_bot.utils.audit import DiscordAuditLogger + + +def _mock_interaction() -> Mock: + interaction = Mock() + interaction.id = 123456789 + interaction.guild_id = 987654321 + interaction.channel_id = 555 + interaction.command = Mock() + interaction.command.qualified_name = "search-members" + interaction.user = Mock() + interaction.user.id = 42 + interaction.user.display_name = "Test User" + interaction.user.name = "testuser" + return interaction + + +def test_audit_logger_disabled_without_config() -> None: + """Logger should no-op when base URL/secret are not configured.""" + logger = DiscordAuditLogger(base_url=None, 
shared_secret=None, timeout_seconds=1.0) + interaction = _mock_interaction() + + logger.log_command( + interaction=interaction, + action="crm.search_members", + result="success", + ) + + assert logger.enabled is False + + +def test_send_event_sync_logs_warning_on_request_error() -> None: + """Request exceptions should be logged as warnings and not raised.""" + logger = DiscordAuditLogger( + base_url="http://worker-api:8090", + shared_secret="secret", + timeout_seconds=1.0, + ) + payload = logger._build_payload( + interaction=_mock_interaction(), + action="crm.search_members", + result="success", + metadata={"query": "python"}, + resource_type="discord_command", + resource_id=None, + ) + + with patch("five08.discord_bot.utils.audit.requests.post") as mock_post: + with patch("five08.discord_bot.utils.audit.logger.warning") as mock_warning: + mock_post.side_effect = RuntimeError("network down") + logger._send_event_sync(payload) + + mock_warning.assert_called_once() diff --git a/tests/unit/test_shared_queue.py b/tests/unit/test_shared_queue.py index b631aa75..83db72be 100644 --- a/tests/unit/test_shared_queue.py +++ b/tests/unit/test_shared_queue.py @@ -2,7 +2,7 @@ from unittest.mock import Mock, patch -from five08.queue import enqueue_job +from five08.queue import JobStatus, _parse_status, enqueue_job from five08.settings import SharedSettings @@ -19,3 +19,16 @@ def test_enqueue_job_persists_and_dispatches_to_queue_client() -> None: queue.enqueue.assert_called_once_with("job-1", run_at=None) assert result.id == "job-1" assert result.created is True + + +def test_parse_status_handles_unknown_values() -> None: + """Unknown DB status should fallback to FAILED and emit a warning.""" + assert _parse_status("queued") == JobStatus.QUEUED + + with patch("five08.queue.logger.warning") as mock_warning: + result = _parse_status("unexpected-status") + + assert result == JobStatus.FAILED + mock_warning.assert_called_once_with( + "Unknown job status from DB: %s", 
"unexpected-status" + ) diff --git a/tests/unit/test_shared_settings.py b/tests/unit/test_shared_settings.py new file mode 100644 index 00000000..b42af7f2 --- /dev/null +++ b/tests/unit/test_shared_settings.py @@ -0,0 +1,27 @@ +"""Unit tests for shared settings validation.""" + +import pytest +from pydantic import ValidationError + +from five08.settings import SharedSettings + + +def test_non_local_settings_accept_explicit_values() -> None: + """Non-local settings should validate when values are provided directly.""" + settings = SharedSettings( + runtime_env="production", + postgres_url="postgresql://user:pass@db.example.com:5432/workflows", + minio_root_password="secret", + ) + + assert settings.runtime_env == "production" + + +def test_non_local_settings_require_non_empty_secrets() -> None: + """Non-local settings should reject empty runtime secret values.""" + with pytest.raises(ValidationError, match="MINIO_ROOT_PASSWORD must be set"): + SharedSettings( + runtime_env="production", + postgres_url="postgresql://user:pass@db.example.com:5432/workflows", + minio_root_password=" ", + ) diff --git a/tests/unit/test_skills_extractor.py b/tests/unit/test_skills_extractor.py new file mode 100644 index 00000000..4ee0fa44 --- /dev/null +++ b/tests/unit/test_skills_extractor.py @@ -0,0 +1,12 @@ +"""Unit tests for heuristic skills extraction.""" + +from five08.worker.crm.skills_extractor import SkillsExtractor + + +def test_heuristic_extractor_includes_two_letter_go_skill() -> None: + """Heuristic extraction should include two-letter skill tokens like go.""" + extractor = SkillsExtractor() + result = extractor._extract_skills_heuristic("Built services in Go and Python") + + assert "go" in result.skills + assert "python" in result.skills diff --git a/tests/unit/test_worker_api.py b/tests/unit/test_worker_api.py index 2daa7f23..061b477b 100644 --- a/tests/unit/test_worker_api.py +++ b/tests/unit/test_worker_api.py @@ -104,7 +104,7 @@ async def 
test_ingest_handler_rejects_non_object_payload( async def test_espocrm_webhook_handler_enqueues_contact_jobs( auth_headers: dict[str, str], ) -> None: - """EspoCRM webhook should enqueue one job per event.""" + """EspoCRM webhook should enqueue before responding.""" app_obj = web.Application() app_obj[api.QUEUE_KEY] = Mock() request = make_mocked_request( @@ -112,17 +112,13 @@ async def test_espocrm_webhook_handler_enqueues_contact_jobs( ) request.json = AsyncMock(return_value=[{"id": "c-1"}, {"id": "c-2"}]) # type: ignore[method-assign] - with patch("five08.worker.api.enqueue_job") as mock_enqueue: - mock_enqueue.side_effect = [Mock(id="job-1"), Mock(id="job-2")] + with patch("five08.worker.api._enqueue_espocrm_batch", new_callable=AsyncMock): response = await api.espocrm_webhook_handler(request) payload = json.loads(response.text) assert response.status == 202 - assert payload["events_processed"] == 2 - assert payload["jobs"] == [ - {"contact_id": "c-1", "job_id": "job-1"}, - {"contact_id": "c-2", "job_id": "job-2"}, - ] + assert payload["events_received"] == 2 + assert payload["events_enqueued"] == 2 @pytest.mark.asyncio @@ -167,3 +163,134 @@ async def test_process_contact_handler_enqueues_single_contact( assert response.status == 202 assert payload["contact_id"] == "c-123" assert payload["job_id"] == "job-123" + + +@pytest.mark.asyncio +async def test_sync_people_handler_enqueues_full_sync( + auth_headers: dict[str, str], +) -> None: + """Manual people-sync endpoint should enqueue one full sync job.""" + app_obj = web.Application() + app_obj[api.QUEUE_KEY] = Mock() + request = make_mocked_request( + "POST", "/sync/people", app=app_obj, headers=auth_headers + ) + + with patch("five08.worker.api._enqueue_full_crm_sync_job") as mock_enqueue: + mock_enqueue.return_value = Mock(id="job-sync", created=True) + response = await api.sync_people_handler(request) + + payload = json.loads(response.text) + assert response.status == 202 + assert payload["job_id"] == 
"job-sync" + assert payload["created"] is True + + +@pytest.mark.asyncio +async def test_espocrm_people_sync_webhook_handler_enqueues_contact_jobs( + auth_headers: dict[str, str], +) -> None: + """People sync webhook should enqueue before responding.""" + app_obj = web.Application() + app_obj[api.QUEUE_KEY] = Mock() + request = make_mocked_request( + "POST", + "/webhooks/espocrm/people-sync", + app=app_obj, + headers=auth_headers, + ) + request.json = AsyncMock(return_value=[{"id": "c-1"}, {"id": "c-2"}]) # type: ignore[method-assign] + + with patch( + "five08.worker.api._enqueue_espocrm_people_sync_batch", + new_callable=AsyncMock, + ): + response = await api.espocrm_people_sync_webhook_handler(request) + + payload = json.loads(response.text) + assert response.status == 202 + assert payload["events_received"] == 2 + assert payload["events_enqueued"] == 2 + + +@pytest.mark.asyncio +async def test_espocrm_webhook_handler_returns_503_on_enqueue_failure( + auth_headers: dict[str, str], +) -> None: + """EspoCRM webhook should fail when enqueue persistence fails.""" + app_obj = web.Application() + app_obj[api.QUEUE_KEY] = Mock() + request = make_mocked_request( + "POST", "/webhooks/espocrm", app=app_obj, headers=auth_headers + ) + request.json = AsyncMock(return_value=[{"id": "c-1"}]) # type: ignore[method-assign] + + with patch( + "five08.worker.api._enqueue_espocrm_batch", + new=AsyncMock(side_effect=RuntimeError("boom")), + ): + response = await api.espocrm_webhook_handler(request) + + payload = json.loads(response.text) + assert response.status == 503 + assert payload["error"] == "enqueue_failed" + + +@pytest.mark.asyncio +async def test_espocrm_people_sync_webhook_handler_returns_503_on_enqueue_failure( + auth_headers: dict[str, str], +) -> None: + """People sync webhook should fail when enqueue persistence fails.""" + app_obj = web.Application() + app_obj[api.QUEUE_KEY] = Mock() + request = make_mocked_request( + "POST", + "/webhooks/espocrm/people-sync", + 
app=app_obj, + headers=auth_headers, + ) + request.json = AsyncMock(return_value=[{"id": "c-1"}]) # type: ignore[method-assign] + + with patch( + "five08.worker.api._enqueue_espocrm_people_sync_batch", + new=AsyncMock(side_effect=RuntimeError("boom")), + ): + response = await api.espocrm_people_sync_webhook_handler(request) + + payload = json.loads(response.text) + assert response.status == 503 + assert payload["error"] == "enqueue_failed" + + +@pytest.mark.asyncio +async def test_audit_event_handler_persists_human_event( + auth_headers: dict[str, str], +) -> None: + """Audit events endpoint should persist one validated event.""" + app_obj = web.Application() + request = make_mocked_request( + "POST", + "/audit/events", + app=app_obj, + headers=auth_headers, + ) + request.json = AsyncMock( # type: ignore[method-assign] + return_value={ + "source": "discord", + "action": "crm.search", + "result": "success", + "actor_provider": "discord", + "actor_subject": "12345", + "actor_display_name": "johnny", + "metadata": {"query": "python"}, + } + ) + + with patch("five08.worker.api.insert_audit_event") as mock_insert: + mock_insert.return_value = Mock(id="evt-1", person_id="person-1") + response = await api.audit_event_handler(request) + + payload = json.loads(response.text) + assert response.status == 201 + assert payload["event_id"] == "evt-1" + assert payload["person_id"] == "person-1" diff --git a/tests/unit/test_worker_models.py b/tests/unit/test_worker_models.py index 0ef2a6db..46c3c057 100644 --- a/tests/unit/test_worker_models.py +++ b/tests/unit/test_worker_models.py @@ -1,6 +1,6 @@ """Unit tests for worker models.""" -from five08.worker.models import EspoCRMWebhookPayload +from five08.worker.models import AuditEventPayload, EspoCRMWebhookPayload def test_espocrm_webhook_payload_from_list() -> None: @@ -11,3 +11,15 @@ def test_espocrm_webhook_payload_from_list() -> None: assert len(payload.events) == 2 assert payload.events[0].id == "contact-1" assert 
payload.events[0].name == "Jane" + + +def test_audit_event_payload_defaults_metadata() -> None: + """Audit payload should default metadata to an empty object.""" + payload = AuditEventPayload( + source="discord", + action="crm.search", + result="success", + actor_provider="discord", + actor_subject="12345", + ) + assert payload.metadata == {} diff --git a/tests/unit/test_worker_people_sync.py b/tests/unit/test_worker_people_sync.py new file mode 100644 index 00000000..88c18be8 --- /dev/null +++ b/tests/unit/test_worker_people_sync.py @@ -0,0 +1,46 @@ +"""Unit tests for CRM people sync normalizers.""" + +from five08.worker.crm.people_sync import PeopleSyncProcessor + + +def test_to_person_record_parses_discord_snapshot_fields() -> None: + """People sync should parse Discord display + ID and role list.""" + processor = PeopleSyncProcessor() + + person = processor._to_person_record( + { + "id": "contact-1", + "name": "Jane Doe", + "emailAddress": "jane@example.com", + "c508Email": "jane@508.dev", + "cDiscordUsername": "jane#1234 (ID: 987654321)", + "cDiscordRoles": "Member, Admin", + "cGithubUsername": "janedoe", + } + ) + + assert person is not None + assert person.crm_contact_id == "contact-1" + assert person.discord_username == "jane#1234" + assert person.discord_user_id == "987654321" + assert person.discord_roles == ["Member", "Admin"] + assert person.github_username == "janedoe" + + +def test_email_falls_back_to_email_address_data() -> None: + """People sync should use primary emailAddressData when emailAddress is missing.""" + processor = PeopleSyncProcessor() + + person = processor._to_person_record( + { + "id": "contact-2", + "name": "John Doe", + "emailAddressData": [ + {"emailAddress": "secondary@example.com", "primary": False}, + {"emailAddress": "primary@example.com", "primary": True}, + ], + } + ) + + assert person is not None + assert person.email == "primary@example.com"