From a489a636510f145b151b40abc77cadcc0aa716b5 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 18 May 2026 16:59:45 +0900 Subject: [PATCH 01/21] =?UTF-8?q?feat(data-pipeline):=20Instagram=20?= =?UTF-8?q?=EC=97=94=ED=8B=B0=ED=8B=B0=20enrichment=20=ED=8C=8C=EC=9D=B4?= =?UTF-8?q?=ED=94=84=EB=9D=BC=EC=9D=B8=20=EC=B6=94=EA=B0=80=20(#544)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(data-pipeline): add Instagram entity enrichment Adds a quota-aware Instagram account enrichment pipeline that keeps runtime state in assets while writing reviewed entity catalog data to operation. Co-authored-by: Cursor * fix(data-pipeline): retry stale entity enrichment steps Co-authored-by: Cursor * style(data-pipeline): format enrichment admin routes Co-authored-by: Cursor --------- Co-authored-by: Cursor --- docs/agent/database-summary.md | 6 +- docs/agent/web-routes-and-features.md | 3 +- docs/architecture/assets-project.md | 22 +- docs/database/entity-enrichment-pipeline.md | 352 ++++++++ docs/database/operating-model.md | 21 +- packages/ai-server/.dev.env.example | 2 + packages/ai-server/.env.example | 1 + packages/ai-server/src/config/_container.py | 51 +- packages/ai-server/src/config/_environment.py | 3 + packages/ai-server/src/main.py | 34 +- .../services/entity_enrichment/__init__.py | 11 + .../services/entity_enrichment/repository.py | 767 ++++++++++++++++++ .../services/entity_enrichment/scheduler.py | 147 ++++ .../src/services/entity_enrichment/service.py | 446 ++++++++++ .../src/domains/admin/gemini_cost.rs | 33 +- .../data-pipeline/instagram-accounts/page.tsx | 341 ++++++++ .../data-pipeline/instagram-accounts/route.ts | 208 +++++ .../instagram-accounts/settings/route.ts | 72 ++ .../web/lib/api/admin/entity-enrichment.ts | 125 +++ .../web/lib/components/admin/AdminSidebar.tsx | 6 + ...18153700_entity_enrichment_asset_state.sql | 70 ++ ...60518152600_entity_enrichment_pipeline.sql | 74 ++ 22 files changed, 2781 insertions(+), 14 deletions(-) create mode 100644 docs/database/entity-enrichment-pipeline.md create mode 100644 packages/ai-server/src/services/entity_enrichment/__init__.py create mode 100644 packages/ai-server/src/services/entity_enrichment/repository.py create mode 100644 packages/ai-server/src/services/entity_enrichment/scheduler.py create mode 100644 packages/ai-server/src/services/entity_enrichment/service.py create mode 100644 packages/web/app/admin/data-pipeline/instagram-accounts/page.tsx create mode 100644 packages/web/app/api/admin/data-pipeline/instagram-accounts/route.ts create mode 100644 packages/web/app/api/admin/data-pipeline/instagram-accounts/settings/route.ts create mode 100644 packages/web/lib/api/admin/entity-enrichment.ts create mode 100644 supabase-assets/migrations/20260518153700_entity_enrichment_asset_state.sql create mode 100644 supabase/migrations/20260518152600_entity_enrichment_pipeline.sql diff --git a/docs/agent/database-summary.md b/docs/agent/database-summary.md index 02dd62e8..ddef88b4 100644 --- a/docs/agent/database-summary.md +++ b/docs/agent/database-summary.md @@ -2,11 +2,12 @@ title: Database — Agent Summary owner: llm status: draft -updated: 2026-04-30 +updated: 2026-05-11 tags: [db, agent] related: - docs/database/operating-model.md - docs/database/01-schema-usage.md + - docs/database/entity-enrichment-pipeline.md - docs/architecture/assets-project.md - docs/database/04-supabase-cli-setup.md --- @@ -29,6 +30,7 @@ prod 는 assets 의 존재를 모르고, assets 도 prod 를 모른다 (cross-pr ## Canonical sources - **운영 모델 (먼저 읽기)**: [`docs/database/operating-model.md`](../database/operating-model.md) — 영역/시스템 매트릭스, "어디 추가하나" 결정 트리, drift 회피, dev 시드 절차 +- Entity enrichment RFC: [`docs/database/entity-enrichment-pipeline.md`](../database/entity-enrichment-pipeline.md) — assets Instagram tags → prod `instagram_accounts` → `artists`/`brands`/`groups` - 스키마 사용법: [`docs/database/01-schema-usage.md`](../database/01-schema-usage.md) - 데이터 흐름: [`docs/database/03-data-flow.md`](../database/03-data-flow.md) - 업데이트 체크리스트: [`docs/database/02-update-checklist.md`](../database/02-update-checklist.md) @@ -40,6 +42,7 @@ prod 는 assets 의 존재를 모르고, assets 도 prod 를 모른다 (cross-pr - **prod public schema**: 앱 데이터 (posts, items, users, solutions, social 등) + 엔티티 카탈로그 (artists/groups/brands, #333) - **assets public schema**: 파이프라인 스테이징 (raw_post_sources, raw_posts, pipeline_events) + 5-state 상태머신 +- **entity enrichment**: assets `raw_posts.platform_metadata.tagged_usernames` 는 후보 증거이고, scheduler state/usage 는 assets `pipeline_settings` / `gemini_usage_events` 가 소유한다. reviewed account/entity state 는 prod `public.instagram_accounts` / `artists` / `brands` / `groups` 가 소유한다. - Migration 전략: - prod: SeaORM (테이블·컬럼) + Supabase CLI SQL (RLS·함수, `supabase/migrations` 가 SOT, #282) - assets: Supabase CLI SQL only (`supabase-assets/migrations/`). SeaORM 은 assets 를 건드리지 않음. @@ -57,3 +60,4 @@ prod 는 assets 의 존재를 모르고, assets 도 prod 를 모른다 (cross-pr - 2026-04-30: 운영 모델 단일 진입점 (`operating-model.md`) 추가 (#371). PRD ref 갱신 (`womgfy...` → `tdchmitwczlwydzkyczu`). - 2026-04-25: warehouse 스키마 드롭 + assets 프로젝트 분리 반영 (#333 / #335) - 2026-04-17: 초기 작성 (Phase 1) +- 2026-05-11: Instagram tagged account → entity catalog enrichment RFC 추가. diff --git a/docs/agent/web-routes-and-features.md b/docs/agent/web-routes-and-features.md index 73d79144..7d091a03 100644 --- a/docs/agent/web-routes-and-features.md +++ b/docs/agent/web-routes-and-features.md @@ -2,7 +2,7 @@ title: Web Routes & Features — Agent Reference owner: human status: approved -updated: 2026-04-17 +updated: 2026-05-11 tags: [agent, ui] --- @@ -43,6 +43,7 @@ App Router 기준 (`packages/web/app/`). 작업 시 이 표와 실제 `app/` 트 | `/admin/entities/group-members` | 그룹 멤버 관리 | | `/admin/raw-post-sources` | 수집 소스 등록/관리 (Pinterest 등 — #327) | | `/admin/raw-posts` | **검증 큐** (#333) — assets 의 raw_posts 를 status 탭(COMPLETED/IN_PROGRESS/ERROR/VERIFIED) 으로 필터링, "검증" 버튼으로 prod posts 반영 | +| `/admin/data-pipeline/instagram-accounts` | Instagram tagged account enrichment queue, Gemini grounding quota, scheduler controls, manual entity review. See [`docs/database/entity-enrichment-pipeline.md`](../database/entity-enrichment-pipeline.md). | | `/request/upload` | Image upload with DropZone | | `/request/detect` | AI detection results with item spotting | | `/request/try` | Try 포스트 업로드 페이지 | diff --git a/docs/architecture/assets-project.md b/docs/architecture/assets-project.md index 24b93895..f9f3fa03 100644 --- a/docs/architecture/assets-project.md +++ b/docs/architecture/assets-project.md @@ -2,14 +2,18 @@ title: Assets Supabase Project owner: human status: approved -updated: 2026-04-25 +updated: 2026-05-11 tags: [architecture, db, agent] +related: + - docs/database/entity-enrichment-pipeline.md --- # Assets Supabase Project (#333) **한 줄 요약**: Pinterest/Instagram 등 외부 플랫폼에서 수집·파싱한 raw_posts 와 파이프라인 중간 상태를 별도 Supabase 프로젝트(`assets`)에 격리. prod 는 admin 이 검증 완료한 데이터만 보유. +> Instagram tagged accounts 는 assets `raw_posts.platform_metadata.tagged_usernames` 에 후보 증거로 남고, reviewed account/entity state 는 prod `public.instagram_accounts` / `artists` / `brands` / `groups` 가 소유한다. 상세 RFC 는 [`docs/database/entity-enrichment-pipeline.md`](../database/entity-enrichment-pipeline.md). + ## 왜 분리했나 PR #258 의 raw_posts 파이프라인이 prod Supabase 의 `warehouse.raw_post*` 테이블을 직접 사용하면서 두 가지 문제가 누적됐다: @@ -28,6 +32,7 @@ PR #258 의 raw_posts 파이프라인이 prod Supabase 의 `warehouse.raw_post*` │ public.raw_posts │ │ └ status pipeline_status │ │ └ verified_at / verified_by │ +│ └ platform_metadata.tagged_usernames (IG) │ │ public.pipeline_events │ │ + RLS: service-role only │ └──────────────▲──────────────┬─────────────────┘ @@ -89,6 +94,19 @@ NOT_STARTED → IN_PROGRESS → COMPLETED ──(admin verify)──► VERIFIED 로컬 개발자가 cloud assets(공유) 데이터를 오염시키지 않도록 `APP_ENV=local` 일 때 verify 엔드포인트는 **prod INSERT 만** 수행하고 assets status write 를 스킵한다. production 배포에서만 status=VERIFIED 가 기록된다. +### 5. Instagram tagged accounts 는 downstream entity enrichment 입력 + +Instagram adapter 는 Instaloader GraphQL payload 의 tagged users 를 +`raw_posts.platform_metadata.tagged_usernames` 로 저장한다. 이 값은 assets +프로젝트 안에서는 raw evidence 이며, account classification 이나 artist/brand/group +승격 상태를 의미하지 않는다. + +Entity enrichment pipeline 은 이 raw evidence 를 읽어 prod `public.instagram_accounts` +에 username 기준으로 upsert 한다. Scheduler 상태와 Gemini 사용량은 assets +`pipeline_settings` / `gemini_usage_events` 에 남기고, Gemini/profile +backfill/admin review 의 검증된 결과만 prod entity catalog 에 기록한다. 두 +프로젝트 사이에는 여전히 FK 를 만들지 않는다. + ## verify 시퀀스 ``` @@ -152,6 +170,7 @@ prod 에서 `warehouse` 스키마를 완전 드롭하고 살아남는 엔티티 | `caption` | 텍스트 (Pinterest description, IG caption 등) | 합성 케이스: NULL 또는 prompt | | `author_name` | 저자/소스 명 (Pinterest pinner 등) | — | | `platform_metadata` | 플랫폼별 자유 메타 (saves, board_id, hashtags 등) | 합성 케이스: 보통 NULL | +| `platform_metadata.tagged_usernames` | Instagram tagged account 후보. 쉼표 구분 username 문자열. Entity enrichment pipeline 의 입력으로만 사용 | Instagram raw_posts | | `parse_result` | 비전파싱 결과 (아이템 bbox, 브랜드 후보 등) | 모든 케이스 동일 — Vision 결과 | | `dispatch_id` | ai-server scheduler 의 1회 dispatch 추적 키 | — | @@ -161,3 +180,4 @@ prod 에서 `warehouse` 스키마를 완전 드롭하고 살아남는 엔티티 - 2026-04-26: r2_url/r2_key 컬럼 드롭, image_url 단일화 (#347) - 2026-04-25: 초기 작성 — 두 프로젝트 분리, 5-state 상태머신, verify 플로우 (#333) +- 2026-05-11: Instagram tagged accounts 를 entity enrichment downstream input 으로 문서화. diff --git a/docs/database/entity-enrichment-pipeline.md b/docs/database/entity-enrichment-pipeline.md new file mode 100644 index 00000000..bfc1ea30 --- /dev/null +++ b/docs/database/entity-enrichment-pipeline.md @@ -0,0 +1,352 @@ +--- +title: Entity Enrichment Pipeline +owner: llm +status: draft +updated: 2026-05-11 +tags: [database, assets, operation, instagram, entity-enrichment, admin] +related: + - docs/database/operating-model.md + - docs/architecture/assets-project.md + - docs/agent/database-summary.md + - docs/agent/web-routes-and-features.md +--- + +# Entity Enrichment Pipeline + +This document is the working RFC and GitHub issue draft for turning Instagram +tagged accounts into reviewed `artists`, `brands`, `groups`, `group_members`, +and `instagram_accounts` rows in the operation Supabase project. + +## Goal + +Build a Data Pipeline workflow that: + +- extracts Instagram tagged usernames from the assets project, +- upserts candidates into operation `public.instagram_accounts`, +- enriches missing names/profile metadata with Gemini 2.5 Flash + Google Search + grounding and Instaloader profile lookup, +- records Gemini grounded-search daily usage against the 1,500 RPD free quota, +- lets admins review account type and role from a Data Pipeline page, +- promotes approved primary accounts into `artists`, `brands`, or `groups`. + +## Current State + +### Operation Supabase + +The operation project owns the entity catalog: + +- `public.artists` +- `public.brands` +- `public.groups` +- `public.group_members` +- `public.instagram_accounts` + +Observed on 2026-05-11: + +- `artists`: 68 rows +- `brands`: 416 rows +- `groups`: 47 rows +- `instagram_accounts`: 1,518 rows +- `group_members`: 0 rows +- `artists.profile_image_url`, `brands.logo_image_url`, and + `groups.profile_image_url` are currently empty across all existing rows. + +`instagram_accounts` already has `username`, `name_en`, `name_ko`, +`display_name`, `bio`, `profile_image_url`, `wikidata_status`, `wikidata_id`, +`needs_review`, `artist_id`, `brand_id`, `group_id`, and +`entity_region_code`. + +Missing for this workflow: + +- explicit account type (`artist`, `brand`, `group`, `other`, `unknown`), +- explicit account role (`primary`, `secondary`, `regional`, `unknown`), +- brand `country_of_origin` (`NA` if unknown), +- Gemini review status/model/confidence/reason in metadata, +- grounded-search usage accounting in the assets Gemini cost tables. + +### Assets Supabase + +The assets project owns raw pipeline staging: + +- `public.raw_post_sources` +- `public.raw_posts` +- `public.pipeline_events` +- `public.pipeline_settings` +- `public.gemini_usage_events` +- `public.gemini_spend_daily` + +Instagram tagged accounts are currently stored in +`raw_posts.platform_metadata.tagged_usernames` as a comma-separated string. The +source is `packages/ai-server/src/services/raw_posts/adapters/instagram.py`, +which reads `edge_media_to_tagged_user.edges[*].node.user.username` from the +Instaloader GraphQL payload. + +The first version of this pipeline must handle both: + +- existing operation `instagram_accounts` that need enrichment, and +- new candidates discovered from assets `raw_posts`. + +## Data Flow + +```mermaid +flowchart TD + assetRawPosts["Assets raw_posts"] --> extractTags["Extract tagged_usernames"] + extractTags --> upsertAccounts["Upsert operation instagram_accounts"] + upsertAccounts --> profileBackfill["Instaloader profile backfill"] + upsertAccounts --> geminiReview["Gemini 2.5 Flash grounded review"] + geminiReview --> usageLog["Assets gemini_usage_events"] + profileBackfill --> reviewQueue["Admin Data Pipeline review queue"] + geminiReview --> reviewQueue + reviewQueue --> promoteEntities["Promote approved primary accounts"] + promoteEntities --> artists["artists"] + promoteEntities --> brands["brands"] + promoteEntities --> groups["groups"] + promoteEntities --> groupMembers["group_members"] +``` + +## Proposed Operation Schema + +Add only durable classification columns to `public.instagram_accounts`. +Operational review/profile state stays in `metadata` to avoid widening the +table for pipeline-internal details. + +| Column | Type | Purpose | +| --- | --- | --- | +| `account_type` | `text` | Gemini/admin classification: `artist`, fashion-only `brand`, `group`, `other`, `unknown` | +| `entity_ig_role` | `text` | Entity account role: `primary`, `secondary`, `regional`, `unknown` | + +Gemini review provenance and profile backfill state should stay in +`instagram_accounts.metadata`, not in dedicated columns: + +```json +{ + "entity_enrichment": { + "gemini_review": { + "status": "reviewed", + "reviewed_at": "2026-05-14T00:00:00Z", + "model": "gemini-2.5-flash", + "confidence": 0.92, + "reason": "...", + "payload": {} + }, + "profile_backfill": { + "status": "completed", + "completed_at": "2026-05-14T00:00:00Z", + "source": "instaloader" + } + } +} +``` + +Add `public.brands.country_of_origin text not null default 'NA'`. + +| Column | Type | Purpose | +| --- | --- | --- | +| `country_of_origin` | `text` | Country where the brand was founded; `NA` if unknown. | + +Recommended check constraints: + +- `account_type in ('artist', 'brand', 'group', 'other', 'unknown')` +- `entity_ig_role in ('primary', 'secondary', 'regional', 'unknown')` +- `num_nonnulls(artist_id, brand_id, group_id) <= 1` + +`entity_region_code` already exists and should be used for regional accounts +such as `JP` or `KR`. + +### Gemini Usage Accounting + +Use the existing assets DB cost tables instead of creating an operation DB usage +table: + +- `public.gemini_usage_events` records each grounded review call with + `pipeline = 'entity_enrichment'`, `step = 'instagram_account_review'`, and + `grounding_queries = 1`. +- `public.gemini_spend_daily` remains the admin read-side for spend and usage + aggregation. +- The scheduler checks today's KST `grounding_queries` for + `pipeline = 'entity_enrichment'` before sending another grounded prompt. If + the configured daily cap has been reached, it pauses without another Gemini + call. + +Runtime scheduler state also belongs in assets DB: + +- `public.pipeline_settings.platform = 'entity_enrichment'` +- the existing `processing_*` columns store enabled/running/progress/last error +- `pipeline_settings.metadata.entity_enrichment` stores config such as + `daily_grounding_cap` and `auto_promote_confidence_min` + +Model selection is intentionally isolated from the shared `GEMINI_MODEL` env var: +entity enrichment must use `ENTITY_ENRICHMENT_GEMINI_MODEL=gemini-2.5-flash`. +Do not point this pipeline at preview/3.x Gemini models, because Google Search +grounding limits and pricing may differ. + +## Scheduler Scope + +The enrichment scheduler belongs under the Data Pipeline concept, not inside the +entity CRUD screens. + +Responsibilities: + +1. Read assets `raw_posts` where `platform = 'instagram'` and + `platform_metadata.tagged_usernames` exists. +2. Normalize usernames with the same rules as the Instagram adapter: trim, + remove leading `@`, lowercase. +3. Upsert operation `instagram_accounts` by unique `username`. +4. Select operation accounts that are missing required data or have + `needs_review = true`. +5. Run Instaloader profile lookup to populate `display_name`, `bio`, and + `profile_image_url`. +6. Run Gemini 2.5 Flash with Google Search grounding to classify: + `name_en`, `name_ko`, `account_type`, `entity_ig_role`, region, confidence, + and evidence. +7. Store structured review output in operation metadata and quota usage in + assets `gemini_usage_events`. +8. Mark uncertain results as `needs_human` / `needs_review = true`. + +Non-responsibilities: + +- It should not auto-publish uncertain Gemini decisions. +- It should not replace the existing raw posts review queue. +- It should not create duplicate primary entities for secondary/regional + accounts. + +Recommended implementation home: + +- candidate extraction and profile/Gemini enrichment: ai-server scheduler or + worker, because `instagram.py`, Instaloader, Gemini usage, and existing + scheduler patterns already live there; +- admin control and review: Next.js admin APIs/pages under `packages/web`; +- operation writes: service-role server-side only, with audit/provenance. + +## Admin Dashboard Scope + +Add a Data Pipeline page, tentatively: + +- `/admin/data-pipeline/instagram-accounts` + +The page should show: + +- queue counts derived from `metadata.entity_enrichment.*.status`, + `account_type`, and `entity_ig_role`, +- Gemini grounded-search usage for today: used, remaining, quota, cap reached, +- scheduler controls from assets `pipeline_settings`: enabled, paused, batch + size, daily cap, last run, last success, last error, +- account table: username, current entity link, Gemini suggestion, confidence, + profile image, bio/display name, source count, last tagged source, +- review actions: approve suggestion, edit type/role/region, link existing + entity, create primary entity, mark unknown, skip, retry. + +Promotion behavior: + +- `entity_ig_role = 'primary'` can create/link one of `artists`, `brands`, or + `groups`, then set that entity's `primary_instagram_account_id`. +- `entity_ig_role = 'secondary'` links to the correct entity but does not + replace `primary_instagram_account_id`. +- `entity_ig_role = 'regional'` links to the correct entity and must set + `entity_region_code`. +- `account_type = 'other'` means the account is known but not eligible for the + fashion catalog, e.g. beauty/cosmetics, magazines/media like Vogue, technology + companies, character/IP owners like Hello Kitty/Sanrio, venues, restaurants, or + platforms. +- `account_type = 'unknown'` means there is insufficient evidence. Both `other` + and `unknown` remain in `instagram_accounts` and stay visible for review or + exclusion. + +## Storage Decision + +Use the existing Cloudflare R2 public URL strategy used by raw posts. Do not add +operation Supabase Storage buckets for this pipeline. + +MVP behavior: + +- Instaloader profile lookup resolves the source Instagram profile image URL. +- ai-server downloads that image and uploads it to R2 under + `entity-profiles/instagram/{username}.{ext}`. +- `instagram_accounts.profile_image_url` stores the R2 public URL when R2 is + configured. +- If R2 is not configured in a local environment, the scheduler may keep the + source profile image URL and log a warning, but production should be configured + with R2 credentials. + +## GitHub Issue Draft + +Title: + +```text +Build Instagram account enrichment pipeline for artist/brand/group catalog +``` + +Body: + +```markdown +## Summary +Build a Data Pipeline workflow that turns Instagram tagged accounts into reviewed artist/brand/group catalog entries. Operation DB owns durable entity state; assets DB owns pipeline runtime state and Gemini usage accounting. + +## Context +We currently collect Instagram tagged accounts from Instaloader into assets `raw_posts.platform_metadata.tagged_usernames`. Operation DB already has `instagram_accounts`, `artists`, `brands`, `groups`, and `group_members`, but the old n8n warehouse workflow no longer matches the current public schema. Existing entity dashboards can edit artists/brands, but there is no first-class Instagram account enrichment/review queue. + +We want to use Gemini 2.5 Flash with Google Search grounding to enrich missing `name_en`, `name_ko`, account type, account role, regional classification, and brand country of origin. Grounding has a 1,500 RPD free quota in the paid tier, so usage must be tracked in the existing assets Gemini cost tables and visible in admin before the scheduler can run safely. + +## Scope +- Add only durable classification columns to operation `instagram_accounts`: `account_type` and `entity_ig_role`. +- Add `brands.country_of_origin text not null default 'NA'`. +- Store Gemini/profile enrichment details under `instagram_accounts.metadata.entity_enrichment` instead of adding dedicated audit/status columns. +- Store scheduler runtime state in assets `pipeline_settings.platform = 'entity_enrichment'`. +- Record Gemini usage in existing assets `gemini_usage_events` with `pipeline = 'entity_enrichment'` and `step = 'instagram_account_review'`. +- Upsert tagged usernames from assets raw posts into operation `instagram_accounts`. +- Backfill Instagram profile data: `display_name`, `bio`, `profile_image_url`, + mirroring profile images to R2. +- Use Gemini 2.5 Flash with Google Search grounding to enrich missing names and classify account type/role/region/country of origin. +- Add a Data Pipeline dashboard page for queue health, scheduler controls, quota usage, and enrichment monitoring. +- Auto-promote high-confidence primary accounts into `artists`, `brands`, or `groups`; keep secondary/regional accounts in `instagram_accounts` for reference and later manual linking if needed. +- Use the existing artist/brand/group admin screens for manual corrections after enrichment. +- Document the pipeline and operation/assets DB ownership. + +## Non-goals +- Fully replacing the existing raw posts review queue. +- Autonomously publishing uncertain Gemini decisions. +- Reworking Pinterest/raw post pipelines beyond consuming existing tagged metadata. +- Creating duplicate primary entities for regional or secondary Instagram accounts. + +## Acceptance Criteria +- Admin can see an Instagram account enrichment queue under Data Pipeline. +- Admin can run/pause the enrichment scheduler from Data Pipeline using the assets `pipeline_settings` row for `entity_enrichment`. +- Admin can see today's Gemini grounded-search usage, remaining quota, configured cap, and cap-reached state. +- Scheduler respects the daily Gemini grounding cap using assets `gemini_usage_events` and records every grounded request through the existing Gemini cost tracking path. +- Scheduler resets stale component-level `error` / `processing` statuses back to `pending` after a cooldown so missing Gemini review and missing profile backfill can be retried independently. +- Tagged usernames from assets raw posts are deduplicated and upserted into operation `instagram_accounts` by normalized username. +- Gemini output is stored as structured review data under operation `instagram_accounts.metadata.entity_enrichment.gemini_review`, not only opaque text. +- Profile backfill status/provenance is stored under operation `instagram_accounts.metadata.entity_enrichment.profile_backfill`, not as dedicated columns. +- Admin can inspect each candidate account with Gemini evidence and profile metadata. +- Account type is visible as `artist`, fashion-only `brand`, `group`, `other`, or `unknown`. +- Account role is visible as `primary`, `secondary`, `regional`, or `unknown`. +- Regional accounts store `entity_region_code`, e.g. `JP` or `KR`, without creating duplicate primary entities. +- Brand primary entities store `country_of_origin`, using `NA` when unknown. +- High-confidence primary accounts create or link the correct `artists`, `brands`, or `groups` row and set `primary_instagram_account_id`. +- Manual corrections happen in the existing artist/brand/group admin screens, not in the pipeline queue. +- Secondary/regional accounts remain in `instagram_accounts` with explicit role and region. +- Entity image/logo URL fields can be populated from R2-backed profile image URLs. +- Automated writes preserve provenance through metadata or audit logs. +- Docs are updated with schema, data flow, quota policy, and manual review rules. + +## Implementation Notes +- Prefer a Data Pipeline page over adding this first to the generic entity CRUD screens. +- Candidate extraction reads from assets; catalog writes go to operation; scheduler runtime and Gemini usage stay in assets. There are no cross-project FKs. +- Do not create operation DB tables for entity enrichment scheduler settings or Gemini daily usage. Reuse assets `pipeline_settings`, `gemini_usage_events`, and `gemini_spend_daily`. +- First release should process both new tagged accounts and existing operation `instagram_accounts` that are missing names/images/review state. +- Reuse the existing R2 public URL strategy for profile/logo images; do not add Supabase Storage for this flow. +- Review operation RLS posture before exposing more audit/admin data. Current MCP advisory reports RLS disabled on `seaql_migrations`, `admin_audit_log`, and `post_magazine_events`. +``` + +## Documentation Checklist + +- Update `docs/agent/database-summary.md` with this pipeline as an operation + entity catalog workflow fed by assets raw data, with runtime/usage state in + assets. +- Update `docs/database/operating-model.md` with the cross-project rule: + assets owns raw tagged evidence plus pipeline runtime/usage state; operation + owns reviewed account/entity state. +- Update `docs/architecture/assets-project.md` with tagged account extraction as + a downstream consumer of `raw_posts.platform_metadata`. +- Data Pipeline route: `packages/web/app/admin/data-pipeline/instagram-accounts/page.tsx`. +- Keep this document as the canonical RFC until the issue is split into PRs. diff --git a/docs/database/operating-model.md b/docs/database/operating-model.md index f8e6e1c7..aca718ab 100644 --- a/docs/database/operating-model.md +++ b/docs/database/operating-model.md @@ -1,12 +1,13 @@ --- title: DB 운영 모델 — 단일 진입점 -date: 2026-04-30 +date: 2026-05-11 status: approved owner: human related: - docs/agent/environments.md - docs/DATABASE-MIGRATIONS.md - docs/agent/database-summary.md + - docs/database/entity-enrichment-pipeline.md - docs/architecture/assets-project.md tags: [database, migration, supabase, seaorm, operating-model] --- @@ -42,7 +43,7 @@ decoded 는 **3개의 PostgreSQL 영역** (dev/prod/assets) 과 **3개의 사실 └────────────────────────────────────────────────────────────────────┘ ``` -prod ↔ assets 사이에 cross-project FK 는 **없다**. assets 는 검증되어야 prod 로 흘러간다 (`/api/v1/.../verify` 엔드포인트). +prod ↔ assets 사이에 cross-project FK 는 **없다**. assets 는 검증되어야 prod 로 흘러간다 (`/api/v1/.../verify` 엔드포인트). Instagram 계정 보강처럼 assets raw evidence 를 prod entity catalog 로 반영하는 흐름도 같은 원칙을 따른다: assets 는 `raw_posts.platform_metadata` 증거만 소유하고, reviewed account/entity state 는 prod `public.instagram_accounts` / `artists` / `brands` / `groups` 가 소유한다. ## 마이그레이션 / 스키마 시스템 @@ -82,6 +83,11 @@ prod ↔ assets 사이에 cross-project FK 는 **없다**. assets 는 검증되 ├─ assets 스키마 변경 ───────────► supabase-assets/migrations/_.sql │ (별도 link → supabase db push) │ + ├─ assets raw 데이터 → prod catalog 파이프라인 + │ ───────► prod schema 변경은 supabase/migrations/* + │ assets schema 변경은 supabase-assets/migrations/* + │ cross-project FK 금지, app/worker 가 idempotent write + │ └─ SeaORM 변경 ─────────────────► ❌ 추가하지 말 것 (B.3 #374 까지) 기존 SeaORM 마이그레이션도 손대지 말 것 ``` @@ -113,6 +119,15 @@ prod ↔ assets 사이에 cross-project FK 는 **없다**. assets 는 검증되 2. `cd supabase-assets && supabase db push` 3. ai-server / api-server 의 status 머신 코드 동기 갱신 +**(e) Instagram tagged account 를 entity catalog 로 보강** + +1. assets `raw_posts.platform_metadata.tagged_usernames` 는 후보 증거로만 취급한다. +2. prod `public.instagram_accounts` / `artists` / `brands` / `groups` 변경은 `supabase/migrations/_entity_enrichment_*.sql` 로 추가한다. +3. Gemini grounding 사용량과 scheduler running/progress 같은 pipeline runtime 상태는 assets `pipeline_settings` / `gemini_usage_events` 가 소유한다. +4. admin 이 보는 quota/status 는 assets 에서 읽고, 검증된 entity 결과만 prod 에 쓴다. +5. assets 와 prod 사이 FK 는 만들지 않는다. worker/admin API 가 username 기준으로 idempotent upsert 한다. +6. 상세 RFC: [`docs/database/entity-enrichment-pipeline.md`](entity-enrichment-pipeline.md). + ## drift 발생 패턴 + 회피 | 패턴 | 사례 / 회피책 | @@ -157,6 +172,7 @@ unset PRD_DB_URL | 마이그레이션 SOT 정책, `SKIP_DB_MIGRATIONS` | [`docs/DATABASE-MIGRATIONS.md`](../DATABASE-MIGRATIONS.md) | | Supabase CLI 사용법 (link, push, gen types) | [`docs/database/04-supabase-cli-setup.md`](04-supabase-cli-setup.md) | | nightly drift CI 운영 (#373) | [`docs/database/drift-check.md`](drift-check.md) | +| Entity enrichment RFC | [`docs/database/entity-enrichment-pipeline.md`](entity-enrichment-pipeline.md) | | PRD → dev 시드 자동화 스크립트 | [`scripts/seed-from-prod.sh`](../../scripts/seed-from-prod.sh) | | assets 프로젝트 설계 (#333) | [`docs/architecture/assets-project.md`](../architecture/assets-project.md) | | agent 짧은 요약 | [`docs/agent/database-summary.md`](../agent/database-summary.md) | @@ -167,3 +183,4 @@ unset PRD_DB_URL - **2026-04-30** — 초기 작성 (#371). PRD→dev 시드 작업 중 발견된 `post_magazines.status='failed'` drift (#372) 를 계기로 운영 모델을 1페이지로 정리. 기존에 흩어져 있던 `agent/environments.md`, `DATABASE-MIGRATIONS.md`, `agent/database-summary.md`, `database/04-supabase-cli-setup.md` 의 진입점 역할. - **2026-04-30** — 후속: dev→prod 시드 절차를 `just seed-from-prod` (#377) 한 줄로 단축. drift 패턴 표에 `post_magazines.status='failed'` (#372) 사례를 inline 인용. nightly drift CI (#373, #378) 와 시드 스크립트를 관련 문서에 등록. +- **2026-05-11** — assets Instagram tagged accounts 를 prod entity catalog 로 보강하는 cross-project pipeline 원칙과 RFC 링크 추가. diff --git a/packages/ai-server/.dev.env.example b/packages/ai-server/.dev.env.example index be74974f..b3baeb5d 100644 --- a/packages/ai-server/.dev.env.example +++ b/packages/ai-server/.dev.env.example @@ -47,6 +47,8 @@ MAX_CONCURRENT_REQUESTS=5 REQUEST_TIMEOUT=30 MAX_RETRIES=3 +ENTITY_ENRICHMENT_GEMINI_MODEL=gemini-2.5-flash + # Cloudflare R2 — raw_posts (#258). 비우면 업로드 경로만 비활성. RAW_POSTS_R2_ACCOUNT_ID= RAW_POSTS_R2_ACCESS_KEY_ID= diff --git a/packages/ai-server/.env.example b/packages/ai-server/.env.example index e66b1b02..5f4881e4 100644 --- a/packages/ai-server/.env.example +++ b/packages/ai-server/.env.example @@ -62,6 +62,7 @@ PERPLEXITY_API_URL=https://api.perplexity.ai PERPLEXITY_MODEL=sonar PERPLEXITY_MAX_RETRIES=3 PERPLEXITY_REQUEST_TIMEOUT=30 +ENTITY_ENRICHMENT_GEMINI_MODEL=gemini-2.5-flash SEARXNG_API_URL=http://localhost:4000 SEARXNG_MAX_RETRIES=3 diff --git a/packages/ai-server/src/config/_container.py b/packages/ai-server/src/config/_container.py index 6692f83f..d8463570 100644 --- a/packages/ai-server/src/config/_container.py +++ b/packages/ai-server/src/config/_container.py @@ -44,6 +44,11 @@ from src.services.editorial_discovery.repository import EditorialDiscoveryRepository from src.services.editorial_discovery.scheduler import DiscoveryScheduler from src.services.editorial_article.scheduler import ArticlePickupScheduler +from src.services.entity_enrichment import ( + EntityEnrichmentRepository, + EntityEnrichmentScheduler, + EntityEnrichmentService, +) from src.grpc.client.backend_client import GRPCBackendClient from src.managers.llm.adapters.perplexity import PerplexityClient from src.managers.llm.adapters.local_llm import LocalLLMClient @@ -97,7 +102,7 @@ class InfrastructureContainer(DeclarativeContainer): # #214 raw_posts_callback_client removed — ai-server writes DB directly. - # ─── Postgres asyncpg pool — assets (raw_posts) vs operation (운영 DB) 분리 (#333, #369) ─── + # ─── Postgres asyncpg pools: assets(raw_posts) vs operation(prod) ─── # # assets: ASSETS_DATABASE_URL → raw_posts / raw_post_sources / pipeline_events # operation: OPERATION_DATABASE_URL → posts / post_magazines / post_magazine_events @@ -435,6 +440,42 @@ class EditorialArticlePickupContainer(DeclarativeContainer): ) +class EntityEnrichmentContainer(DeclarativeContainer): + """Instagram tagged accounts → operation entity catalog enrichment (#495).""" + + environment: Dependency[Environment] = Dependency(Environment) + logger: Dependency[LoggerService] = Dependency(LoggerService) + infrastructure: DependenciesContainer[InfrastructureContainer] = ( + DependenciesContainer() + ) + + repository: Singleton[EntityEnrichmentRepository] = Singleton( + EntityEnrichmentRepository, + assets_db=infrastructure.assets_database_manager, + operation_db=infrastructure.operation_database_manager, + ) + + service: Singleton[EntityEnrichmentService] = Singleton( + EntityEnrichmentService, + repository=repository, + r2_client=infrastructure.r2_client, + gemini_api_key=Callable(lambda env: env.GEMINI_API_KEY, environment), + gemini_model=Callable( + lambda env: env.ENTITY_ENRICHMENT_GEMINI_MODEL, + environment, + ), + instagram_session_username=Callable( + lambda env: env.INSTAGRAM_SESSION_USERNAME, environment + ), + ) + + scheduler: Singleton[EntityEnrichmentScheduler] = Singleton( + EntityEnrichmentScheduler, + repository=repository, + service=service, + ) + + # GRPC API Layer class GRPCContainer(DeclarativeContainer): environment: Dependency[Environment] = Dependency(Environment) @@ -518,6 +559,14 @@ class Application(DeclarativeContainer): infrastructure=infrastructure, ) + # Entity Enrichment Layer (#495) + entity_enrichment: Container[EntityEnrichmentContainer] = Container( + EntityEnrichmentContainer, + environment=environment, + logger=logger, + infrastructure=infrastructure, + ) + # GRPC API Layer grpc: Container[GRPCContainer] = Container( GRPCContainer, diff --git a/packages/ai-server/src/config/_environment.py b/packages/ai-server/src/config/_environment.py index 1178c24e..04d5baeb 100644 --- a/packages/ai-server/src/config/_environment.py +++ b/packages/ai-server/src/config/_environment.py @@ -122,6 +122,9 @@ class Environment(BaseModel): GEMINI_SPOTS_MODEL: str = "gemini-2.5-flash" # spot detection on hero # Verify-time title composition. Cheap text-only call — flash-lite 충분. GEMINI_TITLE_MODEL: str = "gemini-2.5-flash-lite" + # Entity enrichment must stay on 2.5 Flash: Google Search grounding quota/cost + # differs for preview/3.x models. + ENTITY_ENRICHMENT_GEMINI_MODEL: str = "gemini-2.5-flash" RAW_POSTS_PROCESSOR_ENABLED: bool = True # kill switch # Pinterest adapter (#214) diff --git a/packages/ai-server/src/main.py b/packages/ai-server/src/main.py index c8ce8e55..33fb0cbc 100644 --- a/packages/ai-server/src/main.py +++ b/packages/ai-server/src/main.py @@ -13,6 +13,7 @@ from src.services.post_editorial.scheduler import EditorialScheduler from src.services.editorial_discovery.scheduler import DiscoveryScheduler from src.services.editorial_article.scheduler import ArticlePickupScheduler +from src.services.entity_enrichment.scheduler import EntityEnrichmentScheduler async def start_api_server() -> None: @@ -169,6 +170,25 @@ async def start_article_pickup_scheduler( await scheduler.shutdown() +async def start_entity_enrichment_scheduler( + entity_enrichment_container, shutdown_event: asyncio.Event +) -> None: + """Instagram account enrichment cron — DB settings toggle controls work.""" + import logging + + logger = logging.getLogger(__name__) + scheduler: EntityEnrichmentScheduler = entity_enrichment_container.scheduler() + try: + scheduler.start() + await shutdown_event.wait() + except asyncio.CancelledError: + pass + except Exception: + logger.exception("entity enrichment scheduler crashed") + finally: + await scheduler.shutdown() + + async def start_arq_worker( environment, metadata_container, infrastructure_container ) -> None: @@ -197,7 +217,7 @@ async def start_arq_worker( except Exception: logger.exception("ARQ orphan recovery failed (non-fatal, continuing)") - # Create worker with injected dependencies (assets + operation pools for post_editorial) + # Create worker with injected dependencies. worker = await create_worker( environment, metadata_container, @@ -238,6 +258,7 @@ async def main(): post_editorial_container = app.post_editorial() editorial_discovery_container = app.editorial_discovery() editorial_article_pickup_container = app.editorial_article_pickup() + entity_enrichment_container = app.entity_enrichment() # Initialize backend client connection backend_client = metadata_container.backend_client() @@ -249,7 +270,7 @@ async def main(): f"Failed to connect backend client: {str(e)}. Will retry on first use." ) - # GRPC server configuration (Docker: set AI_GRPC_LISTEN_PORT to match api's AI_SERVER_GRPC_URL) + # Docker: AI_GRPC_LISTEN_PORT should match api's AI_SERVER_GRPC_URL. grpc_host = "0.0.0.0" grpc_port_env = os.environ.get("AI_GRPC_LISTEN_PORT", "").strip() if grpc_port_env: @@ -265,6 +286,7 @@ async def main(): editorial_shutdown = asyncio.Event() discovery_shutdown = asyncio.Event() article_pickup_shutdown = asyncio.Event() + entity_enrichment_shutdown = asyncio.Event() try: await asyncio.gather( start_task_scheduler(metadata_container), @@ -275,10 +297,15 @@ async def main(): ), start_raw_posts_scheduler(raw_posts_container, raw_posts_shutdown), start_editorial_scheduler(post_editorial_container, editorial_shutdown), - start_discovery_scheduler(editorial_discovery_container, discovery_shutdown), + start_discovery_scheduler( + editorial_discovery_container, discovery_shutdown + ), start_article_pickup_scheduler( editorial_article_pickup_container, article_pickup_shutdown ), + start_entity_enrichment_scheduler( + entity_enrichment_container, entity_enrichment_shutdown + ), start_grpc_server(grpc_container, grpc_host, grpc_port), start_api_server(), return_exceptions=False, @@ -291,6 +318,7 @@ async def main(): editorial_shutdown.set() discovery_shutdown.set() article_pickup_shutdown.set() + entity_enrichment_shutdown.set() # Cleanup backend client connection try: diff --git a/packages/ai-server/src/services/entity_enrichment/__init__.py b/packages/ai-server/src/services/entity_enrichment/__init__.py new file mode 100644 index 00000000..e2e783e8 --- /dev/null +++ b/packages/ai-server/src/services/entity_enrichment/__init__.py @@ -0,0 +1,11 @@ +"""Instagram account → entity catalog enrichment pipeline (#495).""" + +from .repository import EntityEnrichmentRepository +from .scheduler import EntityEnrichmentScheduler +from .service import EntityEnrichmentService + +__all__ = [ + "EntityEnrichmentRepository", + "EntityEnrichmentScheduler", + "EntityEnrichmentService", +] diff --git a/packages/ai-server/src/services/entity_enrichment/repository.py b/packages/ai-server/src/services/entity_enrichment/repository.py new file mode 100644 index 00000000..9ad548a5 --- /dev/null +++ b/packages/ai-server/src/services/entity_enrichment/repository.py @@ -0,0 +1,767 @@ +"""Repository for Instagram account enrichment (#495). + +Reads raw tagged-account evidence from assets and writes reviewed catalog state +to operation public tables. There are no cross-project FKs. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from datetime import date, datetime +from typing import Any, Dict, Iterable, List, Optional +from uuid import UUID + +from src.managers.database import DatabaseManager +from src.services.raw_posts.adapters.instagram import normalize_instagram_username + + +logger = logging.getLogger(__name__) +_PIPELINE_PLATFORM = "entity_enrichment" + + +@dataclass(frozen=True) +class EntityEnrichmentSettings: + enabled: bool + cycle_seconds: int + batch_size: int + daily_grounding_cap: int + auto_promote_confidence_min: float + last_run_at: Optional[datetime] + last_success_at: Optional[datetime] + last_error: Optional[str] + last_error_at: Optional[datetime] + running: bool + progress: Optional[str] + + +@dataclass(frozen=True) +class InstagramAccountCandidate: + id: UUID + username: str + name_en: Optional[str] + name_ko: Optional[str] + display_name: Optional[str] + bio: Optional[str] + profile_image_url: Optional[str] + account_type: str + entity_ig_role: str + entity_region_code: Optional[str] + metadata: Optional[Dict[str, Any]] + needs_review: Optional[bool] + + +@dataclass(frozen=True) +class GeminiReview: + account_type: str + entity_ig_role: str + name_en: Optional[str] + name_ko: Optional[str] + entity_region_code: Optional[str] + country_of_origin: Optional[str] + confidence: float + reason: str + payload: Dict[str, Any] + + +class EntityEnrichmentRepository: + def __init__( + self, + *, + assets_db: DatabaseManager, + operation_db: DatabaseManager, + ) -> None: + self._assets_db = assets_db + self._operation_db = operation_db + + async def fetch_settings(self) -> EntityEnrichmentSettings: + async with self._assets_db.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT processing_enabled, + processing_cycle_seconds, + processing_batch_size, + processing_last_run_at, + processing_last_success_at, + processing_last_error, + processing_last_error_at, + processing_running, + processing_progress, + metadata + FROM public.pipeline_settings + WHERE platform = $1 + """, + _PIPELINE_PLATFORM, + ) + if row is None: + return EntityEnrichmentSettings( + enabled=False, + cycle_seconds=3600, + batch_size=10, + daily_grounding_cap=1500, + auto_promote_confidence_min=0.9, + last_run_at=None, + last_success_at=None, + last_error=None, + last_error_at=None, + running=False, + progress=None, + ) + config = _entity_enrichment_config(row["metadata"]) + return EntityEnrichmentSettings( + enabled=row["processing_enabled"], + cycle_seconds=row["processing_cycle_seconds"], + batch_size=row["processing_batch_size"], + daily_grounding_cap=int(config.get("daily_grounding_cap") or 1500), + auto_promote_confidence_min=float( + config.get("auto_promote_confidence_min") or 0.9 + ), + last_run_at=row["processing_last_run_at"], + last_success_at=row["processing_last_success_at"], + last_error=row["processing_last_error"], + last_error_at=row["processing_last_error_at"], + running=row["processing_running"], + progress=row["processing_progress"], + ) + + async def mark_running(self, *, progress: Optional[str]) -> None: + async with self._assets_db.acquire() as conn: + await conn.execute( + """ + UPDATE public.pipeline_settings + SET processing_running = true, + processing_progress = $1, + processing_last_run_at = now(), + updated_at = now() + WHERE platform = $2 + """, + progress, + _PIPELINE_PLATFORM, + ) + + async def update_run_health( + self, + *, + success: bool, + error: Optional[str] = None, + progress: Optional[str] = None, + ) -> None: + async with self._assets_db.acquire() as conn: + if success: + await conn.execute( + """ + UPDATE public.pipeline_settings + SET processing_running = false, + processing_progress = $1, + processing_last_success_at = now(), + processing_last_error = NULL, + processing_last_error_at = NULL, + updated_at = now() + WHERE platform = $2 + """, + progress, + _PIPELINE_PLATFORM, + ) + else: + await conn.execute( + """ + UPDATE public.pipeline_settings + SET processing_running = false, + processing_progress = $1, + processing_last_error = $2, + processing_last_error_at = now(), + updated_at = now() + WHERE platform = $3 + """, + progress, + (error or "")[:500], + _PIPELINE_PLATFORM, + ) + + async def ingest_tagged_usernames(self, *, source_limit: int = 1000) -> int: + """Extract tagged usernames from assets raw_posts and upsert operation accounts.""" + async with self._assets_db.acquire() as conn: + rows = await conn.fetch( + """ + SELECT platform_metadata->>'tagged_usernames' AS tagged_usernames + FROM public.raw_posts + WHERE platform = 'instagram' + AND platform_metadata ? 'tagged_usernames' + ORDER BY updated_at DESC + LIMIT $1 + """, + source_limit, + ) + + usernames = _normalize_tagged_csvs( + row["tagged_usernames"] for row in rows if row["tagged_usernames"] + ) + if not usernames: + return 0 + + async with self._operation_db.acquire() as conn: + await conn.executemany( + """ + INSERT INTO public.instagram_accounts ( + username, + needs_review, + account_type, + entity_ig_role, + metadata + ) + VALUES ( + $1, + true, + 'unknown', + 'unknown', + jsonb_build_object( + 'entity_enrichment_source', 'assets.raw_posts.tagged_usernames', + 'entity_enrichment_first_seen_at', now() + ) + ) + ON CONFLICT (username) DO UPDATE + SET updated_at = now(), + metadata = COALESCE(public.instagram_accounts.metadata, '{}'::jsonb) + || jsonb_build_object('entity_enrichment_last_seen_at', now()) + """, + [(u,) for u in usernames], + ) + return len(usernames) + + async def fetch_candidates(self, *, limit: int) -> List[InstagramAccountCandidate]: + async with self._operation_db.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, + username, + name_en, + name_ko, + display_name, + bio, + profile_image_url, + account_type, + entity_ig_role, + entity_region_code, + metadata, + needs_review + FROM public.instagram_accounts + WHERE is_active = true + AND ( + COALESCE(needs_review, true) = true + OR name_en IS NULL + OR name_ko IS NULL + OR profile_image_url IS NULL + OR account_type = 'unknown' + OR entity_ig_role = 'unknown' + OR metadata #>> '{entity_enrichment,gemini_review,status}' IN ('pending', 'error') + OR metadata #>> '{entity_enrichment,profile_backfill,status}' IN ('pending', 'error') + ) + ORDER BY COALESCE(needs_review, true) DESC, + updated_at DESC + LIMIT $1 + """, + limit, + ) + return [ + InstagramAccountCandidate( + id=r["id"], + username=r["username"], + name_en=r["name_en"], + name_ko=r["name_ko"], + display_name=r["display_name"], + bio=r["bio"], + profile_image_url=r["profile_image_url"], + account_type=r["account_type"], + entity_ig_role=r["entity_ig_role"], + entity_region_code=r["entity_region_code"], + metadata=_jsonb_to_dict(r["metadata"]), + needs_review=r["needs_review"], + ) + for r in rows + ] + + async def reset_retryable_enrichment_statuses( + self, *, cooldown_minutes: int + ) -> int: + """Move stale failed/in-flight component statuses back to pending. + + The scheduler retries only the missing component because service.py checks + profile_backfill and gemini_review status independently. + """ + async with self._operation_db.acquire() as conn: + row = await conn.fetchrow( + """ + WITH profile_reset AS ( + UPDATE public.instagram_accounts + SET metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object( + 'profile_backfill', + jsonb_build_object( + 'status', 'pending', + 'retry_reset_at', now(), + 'previous_status', metadata #>> '{entity_enrichment,profile_backfill,status}', + 'previous_error', metadata #>> '{entity_enrichment,profile_backfill,error}' + ) + ) + ), + updated_at = now() + WHERE is_active = true + AND ( + profile_image_url IS NULL + OR display_name IS NULL + ) + AND metadata #>> '{entity_enrichment,profile_backfill,status}' IN ('error', 'processing') + AND COALESCE( + NULLIF(metadata #>> '{entity_enrichment,profile_backfill,errored_at}', ''), + NULLIF(metadata #>> '{entity_enrichment,profile_backfill,started_at}', '') + )::timestamptz <= now() - ($1::int * interval '1 minute') + RETURNING id + ), + gemini_reset AS ( + UPDATE public.instagram_accounts + SET metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object( + 'gemini_review', + jsonb_build_object( + 'status', 'pending', + 'retry_reset_at', now(), + 'previous_status', metadata #>> '{entity_enrichment,gemini_review,status}', + 'previous_error', metadata #>> '{entity_enrichment,gemini_review,error}' + ) + ) + ), + needs_review = true, + updated_at = now() + WHERE is_active = true + AND ( + name_en IS NULL + OR name_ko IS NULL + OR account_type = 'unknown' + OR entity_ig_role = 'unknown' + ) + AND metadata #>> '{entity_enrichment,gemini_review,status}' IN ('error', 'processing') + AND COALESCE( + NULLIF(metadata #>> '{entity_enrichment,gemini_review,errored_at}', ''), + NULLIF(metadata #>> '{entity_enrichment,gemini_review,started_at}', '') + )::timestamptz <= now() - ($1::int * interval '1 minute') + RETURNING id + ) + SELECT + (SELECT count(*) FROM profile_reset) + + (SELECT count(*) FROM gemini_reset) AS reset_count + """, + cooldown_minutes, + ) + return int(row["reset_count"] if row else 0) + + async def mark_profile_processing(self, account_id: UUID) -> None: + await self._set_enrichment_metadata( + account_id, + "profile_backfill", + {"status": "processing", "started_at": _utc_now_iso()}, + ) + + async def save_profile_backfill( + self, + *, + account_id: UUID, + display_name: Optional[str], + bio: Optional[str], + profile_image_url: Optional[str], + metadata: Dict[str, Any], + ) -> None: + async with self._operation_db.acquire() as conn: + await conn.execute( + """ + UPDATE public.instagram_accounts + SET display_name = COALESCE($2, display_name), + bio = COALESCE($3, bio), + profile_image_url = COALESCE($4, profile_image_url), + metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object('profile_backfill', $5::jsonb) + ), + updated_at = now() + WHERE id = $1 + """, + account_id, + display_name, + bio, + profile_image_url, + json.dumps( + { + **(metadata.get("profile_backfill", metadata)), + "status": "completed", + "completed_at": _utc_now_iso(), + } + ), + ) + + async def save_profile_error(self, *, account_id: UUID, error: str) -> None: + await self._set_enrichment_metadata( + account_id, + "profile_backfill", + {"status": "error", "error": error[:500], "errored_at": _utc_now_iso()}, + ) + + async def mark_gemini_processing(self, account_id: UUID) -> None: + await self._set_enrichment_metadata( + account_id, + "gemini_review", + {"status": "processing", "started_at": _utc_now_iso()}, + ) + + async def reserve_grounding_usage( + self, + *, + usage_date: date, + model: str, + cap: int, + ) -> bool: + async with self._assets_db.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT COALESCE(SUM(COALESCE(grounding_queries, 0)), 0)::int AS used + FROM public.gemini_usage_events + WHERE pipeline = $1 + AND model = $2 + AND (occurred_at AT TIME ZONE 'Asia/Seoul')::date = $3 + """, + _PIPELINE_PLATFORM, + model, + usage_date, + ) + used = int(row["used"] if row else 0) + return used < cap + + async def save_gemini_review( + self, + *, + account_id: UUID, + model: str, + review: GeminiReview, + auto_promote: bool, + ) -> None: + status = "reviewed" if auto_promote else "needs_human" + needs_review = not auto_promote + async with self._operation_db.acquire() as conn: + await conn.execute( + """ + UPDATE public.instagram_accounts + SET account_type = $2, + entity_ig_role = $3, + name_en = COALESCE(NULLIF($4, ''), name_en), + name_ko = COALESCE(NULLIF($5, ''), name_ko), + entity_region_code = NULLIF($6, ''), + metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object('gemini_review', $7::jsonb) + ), + needs_review = $8, + updated_at = now() + WHERE id = $1 + """, + account_id, + review.account_type, + review.entity_ig_role, + review.name_en, + review.name_ko, + review.entity_region_code, + json.dumps( + { + "status": status, + "reviewed_at": datetime.utcnow().isoformat() + "Z", + "model": model, + "confidence": review.confidence, + "reason": review.reason[:500], + "payload": review.payload, + } + ), + needs_review, + ) + + async def save_gemini_error(self, *, account_id: UUID, error: str) -> None: + async with self._operation_db.acquire() as conn: + await conn.execute( + """ + UPDATE public.instagram_accounts + SET metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object( + 'gemini_review', + jsonb_build_object( + 'status', 'error', + 'error', $2::text, + 'errored_at', now() + ) + ) + ), + needs_review = true, + updated_at = now() + WHERE id = $1 + """, + account_id, + error[:500], + ) + + async def auto_promote_primary_account(self, *, account_id: UUID) -> Optional[UUID]: + """Create/link a primary artist/brand/group row for an already-reviewed account.""" + async with self._operation_db.acquire() as conn: + async with conn.transaction(): + account = await conn.fetchrow( + """ + SELECT id, + username, + account_type, + entity_ig_role, + name_en, + name_ko, + profile_image_url, + metadata, + artist_id, + brand_id, + group_id + FROM public.instagram_accounts + WHERE id = $1 + FOR UPDATE + """, + account_id, + ) + if account is None or account["entity_ig_role"] != "primary": + return None + account_type = account["account_type"] + if account_type not in ("artist", "brand", "group"): + return None + existing_id = account[f"{account_type}_id"] + if existing_id: + return existing_id + + table = { + "artist": "artists", + "brand": "brands", + "group": "groups", + }[account_type] + image_column = ( + "logo_image_url" if account_type == "brand" else "profile_image_url" + ) + name_en = account["name_en"] or account["username"] + name_ko = account["name_ko"] or name_en + country_of_origin = _country_of_origin_from_payload(account["metadata"]) + + existing = await conn.fetchrow( + f""" + SELECT id + FROM public.{table} + WHERE primary_instagram_account_id = $1 + OR lower(COALESCE(name_en, '')) = lower($2) + OR lower(COALESCE(name_ko, '')) = lower($3) + ORDER BY primary_instagram_account_id IS NULL ASC + LIMIT 1 + """, + account_id, + name_en, + name_ko, + ) + if existing: + entity_id = existing["id"] + if account_type == "brand": + await conn.execute( + f""" + UPDATE public.{table} + SET primary_instagram_account_id = COALESCE(primary_instagram_account_id, $2), + {image_column} = COALESCE({image_column}, $3), + country_of_origin = CASE + WHEN country_of_origin = 'NA' THEN $4 + ELSE country_of_origin + END, + updated_at = now() + WHERE id = $1 + """, + entity_id, + account_id, + account["profile_image_url"], + country_of_origin, + ) + else: + await conn.execute( + f""" + UPDATE public.{table} + SET primary_instagram_account_id = COALESCE(primary_instagram_account_id, $2), + {image_column} = COALESCE({image_column}, $3), + updated_at = now() + WHERE id = $1 + """, + entity_id, + account_id, + account["profile_image_url"], + ) + else: + if account_type == "brand": + row = await conn.fetchrow( + f""" + INSERT INTO public.{table} ( + name_en, + name_ko, + {image_column}, + primary_instagram_account_id, + country_of_origin, + metadata + ) + VALUES ( + $1, $2, $3, $4, $5, + jsonb_build_object('created_by', 'entity_enrichment_auto_promote') + ) + RETURNING id + """, + name_en, + name_ko, + account["profile_image_url"], + account_id, + country_of_origin, + ) + else: + row = await conn.fetchrow( + f""" + INSERT INTO public.{table} ( + name_en, name_ko, {image_column}, primary_instagram_account_id, metadata + ) + VALUES ( + $1, $2, $3, $4, + jsonb_build_object('created_by', 'entity_enrichment_auto_promote') + ) + RETURNING id + """, + name_en, + name_ko, + account["profile_image_url"], + account_id, + ) + entity_id = row["id"] + + await conn.execute( + f""" + UPDATE public.instagram_accounts + SET {account_type}_id = $2, + needs_review = false, + updated_at = now() + WHERE id = $1 + """, + account_id, + entity_id, + ) + return entity_id + + async def _update_account_status( + self, account_id: UUID, values: Dict[str, Any] + ) -> None: + assignments = ", ".join(f"{k} = ${i + 2}" for i, k in enumerate(values)) + async with self._operation_db.acquire() as conn: + await conn.execute( + f""" + UPDATE public.instagram_accounts + SET {assignments}, + updated_at = now() + WHERE id = $1 + """, + account_id, + *values.values(), + ) + + async def _set_enrichment_metadata( + self, + account_id: UUID, + key: str, + value: Dict[str, Any], + ) -> None: + async with self._operation_db.acquire() as conn: + await conn.execute( + """ + UPDATE public.instagram_accounts + SET metadata = COALESCE(metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object($2::text, $3::jsonb) + ), + updated_at = now() + WHERE id = $1 + """, + account_id, + key, + json.dumps(value), + ) + + +def _normalize_tagged_csvs(values: Iterable[str]) -> List[str]: + seen: set[str] = set() + out: List[str] = [] + for raw in values: + for token in str(raw).split(","): + username = normalize_instagram_username(token) + if username and username not in seen: + seen.add(username) + out.append(username) + return out + + +def _utc_now_iso() -> str: + return datetime.utcnow().isoformat() + "Z" + + +def _jsonb_to_dict(raw: Any) -> Optional[Dict[str, Any]]: + if raw is None: + return None + if isinstance(raw, dict): + return raw + if isinstance(raw, str): + try: + parsed = json.loads(raw) + except Exception: + return None + return parsed if isinstance(parsed, dict) else None + try: + parsed = dict(raw) + except (TypeError, ValueError): + return None + return parsed if isinstance(parsed, dict) else None + + +def _entity_enrichment_config(raw: Any) -> Dict[str, Any]: + metadata = _jsonb_to_dict(raw) + if metadata is None: + return {} + config = metadata.get("entity_enrichment") or {} + return config if isinstance(config, dict) else {} + + +def _country_of_origin_from_payload(raw: Any) -> str: + metadata = _jsonb_to_dict(raw) + if metadata is None: + return "NA" + enrichment = metadata.get("entity_enrichment") or {} + if not isinstance(enrichment, dict): + return "NA" + review = enrichment.get("gemini_review") or {} + if not isinstance(review, dict): + return "NA" + payload = review.get("payload") or {} + if not isinstance(payload, dict): + return "NA" + model_response = payload.get("model_response") or {} + if not isinstance(model_response, dict): + return "NA" + value = str(model_response.get("country_of_origin") or "").strip() + return value or "NA" diff --git a/packages/ai-server/src/services/entity_enrichment/scheduler.py b/packages/ai-server/src/services/entity_enrichment/scheduler.py new file mode 100644 index 00000000..b634ca25 --- /dev/null +++ b/packages/ai-server/src/services/entity_enrichment/scheduler.py @@ -0,0 +1,147 @@ +"""APScheduler entrypoint for entity enrichment (#495).""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger + +from .repository import EntityEnrichmentRepository, EntityEnrichmentSettings +from .service import EntityEnrichmentService + + +logger = logging.getLogger(__name__) + +_TICK_SECONDS = 60 +_ACCOUNT_DELAY_SECONDS = 30 +_RETRY_RESET_COOLDOWN_MINUTES = 45 + + +class EntityEnrichmentScheduler: + def __init__( + self, + *, + repository: EntityEnrichmentRepository, + service: EntityEnrichmentService, + ) -> None: + self._repo = repository + self._service = service + self._scheduler: Optional[AsyncIOScheduler] = None + + def start(self) -> None: + if self._scheduler is not None: + logger.warning("EntityEnrichmentScheduler.start() called twice — ignoring") + return + scheduler = AsyncIOScheduler() + scheduler.add_job( + self._cycle, + IntervalTrigger(seconds=_TICK_SECONDS), + id="entity_enrichment_cycle", + max_instances=1, + coalesce=True, + next_run_time=datetime.now(timezone.utc) + timedelta(seconds=45), + ) + scheduler.start() + self._scheduler = scheduler + logger.info("EntityEnrichmentScheduler started (tick=%ds)", _TICK_SECONDS) + + async def shutdown(self) -> None: + if self._scheduler is None: + return + self._scheduler.shutdown(wait=False) + self._scheduler = None + logger.info("EntityEnrichmentScheduler shutdown") + + @staticmethod + def _interval_elapsed(last_run_at: Optional[datetime], cycle_seconds: int) -> bool: + if last_run_at is None: + return True + return ( + datetime.now(timezone.utc) - last_run_at + ).total_seconds() >= cycle_seconds + + async def _cycle(self) -> None: + try: + settings = await self._repo.fetch_settings() + except Exception: + logger.exception("entity_enrichment: fetch settings failed") + return + if not settings.enabled: + return + if settings.running: + return + try: + reset_count = await self._repo.reset_retryable_enrichment_statuses( + cooldown_minutes=_RETRY_RESET_COOLDOWN_MINUTES + ) + if reset_count: + logger.info( + "entity_enrichment: reset %s stale component statuses to pending", + reset_count, + ) + except Exception: + logger.exception("entity_enrichment: retry status reset failed") + if not self._interval_elapsed(settings.last_run_at, settings.cycle_seconds): + return + try: + await self._run_one(settings) + except Exception as exc: + logger.exception("entity_enrichment: cycle crashed") + await self._repo.update_run_health( + success=False, error=f"cycle crash: {exc}", progress="Errored" + ) + + async def _run_one(self, settings: EntityEnrichmentSettings) -> None: + await self._repo.mark_running(progress="Ingesting tagged accounts") + ingested = await self._repo.ingest_tagged_usernames( + source_limit=max(1000, settings.batch_size * 100) + ) + + await self._repo.mark_running(progress="Loading enrichment candidates") + candidates = await self._repo.fetch_candidates(limit=settings.batch_size) + if not candidates: + await self._repo.update_run_health( + success=True, + progress=f"No candidates (ingested {ingested} usernames)", + ) + return + + processed = 0 + for idx, candidate in enumerate(candidates, start=1): + await self._repo.mark_running( + progress=( + f"Processing {idx}/{len(candidates)} @{candidate.username} " + f"(ingested {ingested})" + ) + ) + keep_going = await self._service.enrich_candidate( + candidate, + daily_cap=settings.daily_grounding_cap, + auto_promote_confidence_min=settings.auto_promote_confidence_min, + ) + if not keep_going: + await self._repo.update_run_health( + success=True, + progress=( + f"Paused at Gemini grounding cap after {processed} accounts" + ), + ) + return + processed += 1 + if idx < len(candidates): + await self._repo.mark_running( + progress=( + f"Waiting {_ACCOUNT_DELAY_SECONDS}s before next account " + "to avoid Instagram bot detection" + ) + ) + await asyncio.sleep(_ACCOUNT_DELAY_SECONDS) + + await self._repo.update_run_health( + success=True, + progress=f"Processed {processed} accounts; ingested {ingested} usernames", + ) diff --git a/packages/ai-server/src/services/entity_enrichment/service.py b/packages/ai-server/src/services/entity_enrichment/service.py new file mode 100644 index 00000000..d737ed78 --- /dev/null +++ b/packages/ai-server/src/services/entity_enrichment/service.py @@ -0,0 +1,446 @@ +"""Gemini/profile enrichment for Instagram accounts (#495).""" + +from __future__ import annotations + +import asyncio +import json +import logging +from pathlib import Path +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional + +import httpx + +from src.managers.storage.r2_client import R2Client +from src.services.cost_tracking import ( + extract_grounded_response, + set_context, + track_call, +) +from src.services.raw_posts.adapters.instagram import normalize_instagram_username + +from .repository import ( + EntityEnrichmentRepository, + GeminiReview, + InstagramAccountCandidate, +) + + +logger = logging.getLogger(__name__) + +_GEMINI_BASE = "https://generativelanguage.googleapis.com/v1beta/models" +_FEATURE = "entity_enrichment" + + +_REVIEW_PROMPT = """You classify Instagram accounts for a Korean fashion/editorial catalog. + +Return ONLY JSON with this schema: +{ + "account_type": "artist" | "brand" | "group" | "other" | "unknown", + "entity_ig_role": "primary" | "secondary" | "regional" | "unknown", + "name_en": string | null, + "name_ko": string | null, + "entity_region_code": string | null, + "country_of_origin": string, + "confidence": number, + "reason": string, + "evidence_urls": string[] +} + +Rules: +- artist: individual celebrity, idol, actor, model, musician. +- group: music group/team account. +- account_type must be one of: artist, brand, group, other, unknown. +- brand: fashion-only product brand or fashion retailer. Include apparel, footwear, + bags, jewelry, eyewear, watches, accessories, sportswear, and streetwear. +- For brand accounts only, set country_of_origin to the country where the brand was + founded, e.g. "Italy". Use "NA" if unknown or not a brand. +- other: known real-world account that is not eligible for this fashion catalog, + including beauty/cosmetics, magazines/media (e.g. Vogue), technology companies + (e.g. Meta), character/IP owners (e.g. Hello Kitty/Sanrio), venues, restaurants, + agencies, fan pages, and general platforms. +- unknown: insufficient evidence to identify the account. +- primary: official global/main account for the entity. +- secondary: sub-account, campaign account, staff/fan/archive account. +- regional: official local/regional account, e.g. Japan/Korea. Use ISO-like region code such as JP, KR, US. +- Use Google Search grounding evidence. If unsure, use unknown and low confidence. +- confidence must be between 0 and 1. +""" + + +@dataclass(frozen=True) +class ProfileBackfill: + display_name: Optional[str] + bio: Optional[str] + profile_image_url: Optional[str] + source_profile_image_url: Optional[str] + metadata: Dict[str, Any] + + +class EntityEnrichmentService: + def __init__( + self, + *, + repository: EntityEnrichmentRepository, + r2_client: R2Client, + gemini_api_key: str, + gemini_model: str, + instagram_session_username: str, + ) -> None: + self._repo = repository + self._r2 = r2_client + self._gemini_api_key = gemini_api_key + self._gemini_model = gemini_model or "gemini-2.5-flash" + self._instagram_session_username = instagram_session_username + + async def enrich_candidate( + self, + candidate: InstagramAccountCandidate, + *, + daily_cap: int, + auto_promote_confidence_min: float, + ) -> bool: + """Enrich one account. Returns False when quota stops the batch.""" + profile_status = _metadata_status(candidate, "profile_backfill") + if profile_status in ( + "pending", + "error", + ) or (not candidate.profile_image_url and profile_status != "completed"): + await self._backfill_profile(candidate) + + if ( + _metadata_status(candidate, "gemini_review") + in ( + "pending", + "error", + ) + or _missing_names(candidate) + or candidate.account_type == "unknown" + ): + reserved = await self._repo.reserve_grounding_usage( + usage_date=_kst_date(), + model=self._gemini_model, + cap=daily_cap, + ) + if not reserved: + logger.info("entity_enrichment: daily Gemini grounding cap reached") + return False + await self._review_with_gemini( + candidate, + auto_promote_confidence_min=auto_promote_confidence_min, + ) + return True + + async def _backfill_profile(self, candidate: InstagramAccountCandidate) -> None: + await self._repo.mark_profile_processing(candidate.id) + try: + profile = await asyncio.to_thread( + _sync_fetch_profile, + session_username=self._instagram_session_username, + username=candidate.username, + ) + if profile.source_profile_image_url: + profile_image_url = await self._mirror_profile_image( + candidate.username, + profile.source_profile_image_url, + ) + profile = ProfileBackfill( + display_name=profile.display_name, + bio=profile.bio, + profile_image_url=profile_image_url + or profile.source_profile_image_url, + source_profile_image_url=profile.source_profile_image_url, + metadata=profile.metadata, + ) + + await self._repo.save_profile_backfill( + account_id=candidate.id, + display_name=profile.display_name, + bio=profile.bio, + profile_image_url=profile.profile_image_url, + metadata=profile.metadata, + ) + except Exception as exc: + logger.exception( + "entity_enrichment: profile backfill failed username=%s error_class=%s", + candidate.username, + type(exc).__name__, + ) + await self._repo.save_profile_error(account_id=candidate.id, error=str(exc)) + + async def _review_with_gemini( + self, + candidate: InstagramAccountCandidate, + *, + auto_promote_confidence_min: float, + ) -> None: + if not self._gemini_api_key: + await self._repo.save_gemini_error( + account_id=candidate.id, error="GEMINI_API_KEY is not configured" + ) + return + + await self._repo.mark_gemini_processing(candidate.id) + try: + review = await self._call_gemini(candidate) + auto_promote = ( + review.confidence >= auto_promote_confidence_min + and review.entity_ig_role == "primary" + and review.account_type in ("artist", "brand", "group") + ) + await self._repo.save_gemini_review( + account_id=candidate.id, + model=self._gemini_model, + review=review, + auto_promote=auto_promote, + ) + if auto_promote: + await self._repo.auto_promote_primary_account(account_id=candidate.id) + except Exception as exc: + logger.exception( + "entity_enrichment: Gemini review failed username=%s", + candidate.username, + ) + await self._repo.save_gemini_error(account_id=candidate.id, error=str(exc)) + + async def _call_gemini(self, candidate: InstagramAccountCandidate) -> GeminiReview: + account_url = f"https://www.instagram.com/{candidate.username}/" + user_text = { + "instagram_username": candidate.username, + "instagram_url": account_url, + "display_name": candidate.display_name, + "bio": candidate.bio, + "existing_name_en": candidate.name_en, + "existing_name_ko": candidate.name_ko, + } + payload = { + "contents": [ + { + "role": "user", + "parts": [ + {"text": _REVIEW_PROMPT}, + {"text": json.dumps(user_text, ensure_ascii=False)}, + ], + } + ], + "tools": [{"googleSearch": {}}], + "generationConfig": { + "temperature": 0.1, + }, + } + url = f"{_GEMINI_BASE}/{self._gemini_model}:generateContent" + async with httpx.AsyncClient() as client: + + async def _post_gemini() -> httpx.Response: + response = await client.post( + url, + params={"key": self._gemini_api_key}, + json=payload, + timeout=60, + ) + response.raise_for_status() + return response + + set_context(pipeline=_FEATURE) + try: + resp = await track_call( + "instagram_account_review", + self._gemini_model, + extract_grounded_response, + _post_gemini(), + ) + except httpx.HTTPStatusError as exc: + detail = _truncate_response_text(exc.response.text) + raise RuntimeError( + f"Gemini HTTP {exc.response.status_code} " + f"for model={self._gemini_model}: {detail}" + ) from exc + data = resp.json() + text = data["candidates"][0]["content"]["parts"][0]["text"] + parsed = json.loads(_strip_json_fence(text)) + account_type = _normalize_choice( + parsed.get("account_type"), + {"artist", "brand", "group", "other", "unknown"}, + ) + entity_ig_role = _normalize_choice( + parsed.get("entity_ig_role"), + {"primary", "secondary", "regional", "unknown"}, + ) + confidence = _clamp_float(parsed.get("confidence"), default=0.0) + return GeminiReview( + account_type=account_type, + entity_ig_role=entity_ig_role, + name_en=_optional_str(parsed.get("name_en")), + name_ko=_optional_str(parsed.get("name_ko")), + entity_region_code=_optional_str(parsed.get("entity_region_code")), + country_of_origin=_optional_str(parsed.get("country_of_origin")) or "NA", + confidence=confidence, + reason=_optional_str(parsed.get("reason")) or "No reason provided.", + payload={ + "feature": _FEATURE, + "model_response": parsed, + "grounding_metadata": _grounding_metadata(data), + }, + ) + + async def _mirror_profile_image(self, username: str, url: str) -> Optional[str]: + if not self._r2.is_configured(): + logger.warning("entity_enrichment: R2 not configured; storing source URL") + return None + async with httpx.AsyncClient() as client: + resp = await client.get(url, timeout=30, follow_redirects=True) + resp.raise_for_status() + content_type = resp.headers.get("content-type", "image/jpeg").split(";")[0] + ext = _extension_for_content_type(content_type) + key = f"entity-profiles/instagram/{normalize_instagram_username(username)}.{ext}" + result = await asyncio.to_thread( + self._r2.put, + key, + resp.content, + content_type, + ) + return result.url + + +def _sync_fetch_profile(*, session_username: str, username: str) -> ProfileBackfill: + import instaloader + + session_user = normalize_instagram_username(session_username) + profile_username = normalize_instagram_username(username) + if not session_user: + raise RuntimeError("INSTAGRAM_SESSION_USERNAME is not configured") + + loader = instaloader.Instaloader( + download_pictures=False, + download_videos=False, + download_video_thumbnails=False, + download_geotags=False, + download_comments=False, + save_metadata=False, + compress_json=False, + ) + session_path = _instaloader_session_path(session_user) + session_file_exists = session_path.exists() + loader.load_session_from_file(session_user) + try: + login_user = loader.test_login() + except Exception as exc: + login_user = None + logger.warning( + "entity_enrichment: instaloader session test failed session_user=%s session_file=%s exists=%s error_class=%s error=%s", + session_user, + str(session_path), + session_file_exists, + type(exc).__name__, + exc, + ) + try: + profile = instaloader.Profile.from_username(loader.context, profile_username) + except Exception as exc: + logger.warning( + "entity_enrichment: instaloader profile lookup failed username=%s session_user=%s session_file=%s exists=%s test_login=%s context_user=%s error_class=%s error=%s", + profile_username, + session_user, + str(session_path), + session_file_exists, + login_user, + getattr(loader.context, "username", None), + type(exc).__name__, + exc, + ) + raise + return ProfileBackfill( + display_name=getattr(profile, "full_name", None) or None, + bio=getattr(profile, "biography", None) or None, + profile_image_url=None, + source_profile_image_url=getattr(profile, "profile_pic_url_no_iphone", None) + or None, + metadata={ + "profile_backfill": { + "source": "instaloader", + "username": profile_username, + "is_verified": bool(getattr(profile, "is_verified", False)), + "followers": getattr(profile, "followers", None), + "fetched_at": datetime.utcnow().isoformat() + "Z", + } + }, + ) + + +def _kst_date(): + # The quota is managed by product/business day in Korea. + return datetime.now(timezone(timedelta(hours=9))).date() + + +def _missing_names(candidate: InstagramAccountCandidate) -> bool: + return not candidate.name_en or not candidate.name_ko + + +def _metadata_status(candidate: InstagramAccountCandidate, key: str) -> str: + metadata = candidate.metadata or {} + enrichment = metadata.get("entity_enrichment") if isinstance(metadata, dict) else {} + if not isinstance(enrichment, dict): + return "pending" + payload = enrichment.get(key) or {} + if not isinstance(payload, dict): + return "pending" + return str(payload.get("status") or "pending") + + +def _normalize_choice(value: Any, allowed: set[str]) -> str: + v = str(value or "").strip().lower() + return v if v in allowed else "unknown" + + +def _optional_str(value: Any) -> Optional[str]: + if value is None: + return None + s = str(value).strip() + return s or None + + +def _clamp_float(value: Any, *, default: float) -> float: + try: + n = float(value) + except (TypeError, ValueError): + return default + return min(1.0, max(0.0, n)) + + +def _grounding_metadata(data: Dict[str, Any]) -> Dict[str, Any]: + try: + return data["candidates"][0].get("groundingMetadata") or {} + except (KeyError, IndexError): + return {} + + +def _extension_for_content_type(content_type: str) -> str: + if content_type == "image/png": + return "png" + if content_type == "image/webp": + return "webp" + return "jpg" + + +def _instaloader_session_path(session_user: str) -> Path: + return Path.home() / ".config" / "instaloader" / f"session-{session_user}" + + +def _truncate_response_text(text: str, *, limit: int = 1000) -> str: + cleaned = (text or "").strip() + if len(cleaned) <= limit: + return cleaned + return cleaned[:limit] + "..." + + +def _strip_json_fence(text: str) -> str: + cleaned = (text or "").strip() + if not cleaned.startswith("```"): + return cleaned + cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else "" + if cleaned.endswith("```"): + cleaned = cleaned[:-3] + if cleaned.lstrip().startswith("json"): + cleaned = cleaned.lstrip()[4:].lstrip() + return cleaned.strip() diff --git a/packages/api-server/src/domains/admin/gemini_cost.rs b/packages/api-server/src/domains/admin/gemini_cost.rs index 9dc89b29..f9fbe813 100644 --- a/packages/api-server/src/domains/admin/gemini_cost.rs +++ b/packages/api-server/src/domains/admin/gemini_cost.rs @@ -27,6 +27,9 @@ pub struct DaysQuery { /// 조회 기간 (일). 1..=90. 기본 7. #[serde(default)] pub days: Option, + /// Optional pipeline filter, e.g. entity_enrichment. + #[serde(default)] + pub pipeline: Option, } #[derive(Debug, Deserialize)] @@ -134,9 +137,10 @@ pub async fn get_daily_spend( ) -> AppResult> { let days = clamp_days(q.days, 7); let db = state.assets_db.as_ref(); - let stmt = Statement::from_sql_and_values( - DatabaseBackend::Postgres, - "SELECT date_trunc('day', occurred_at)::date::text AS day, + let pipeline = q.pipeline.as_deref().filter(|v| !v.trim().is_empty()); + let (sql, values) = if let Some(pipeline) = pipeline { + ( + "SELECT date_trunc('day', occurred_at)::date::text AS day, COALESCE(SUM(est_cost_usd), 0)::float8 AS spend_usd, COUNT(*) FILTER (WHERE ok) AS ok_calls, COUNT(*) FILTER (WHERE NOT ok) AS failed_calls, @@ -146,10 +150,29 @@ pub async fn get_daily_spend( COALESCE(SUM(grounding_queries), 0) AS groundings FROM public.gemini_usage_events WHERE occurred_at >= now() - ($1::int * interval '1 day') + AND pipeline = $2 GROUP BY 1 ORDER BY 1 DESC", - vec![(days as i32).into()], - ); + vec![(days as i32).into(), pipeline.to_string().into()], + ) + } else { + ( + "SELECT date_trunc('day', occurred_at)::date::text AS day, + COALESCE(SUM(est_cost_usd), 0)::float8 AS spend_usd, + COUNT(*) FILTER (WHERE ok) AS ok_calls, + COUNT(*) FILTER (WHERE NOT ok) AS failed_calls, + COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens, + COALESCE(SUM(completion_tokens), 0) AS completion_tokens, + COALESCE(SUM(image_output_count), 0) AS images, + COALESCE(SUM(grounding_queries), 0) AS groundings + FROM public.gemini_usage_events + WHERE occurred_at >= now() - ($1::int * interval '1 day') + GROUP BY 1 + ORDER BY 1 DESC", + vec![(days as i32).into()], + ) + }; + let stmt = Statement::from_sql_and_values(DatabaseBackend::Postgres, sql, values); let rows = db.query_all(stmt).await.map_err(AppError::DatabaseError)?; let mut days_out = Vec::with_capacity(rows.len()); let mut total = 0.0f64; diff --git a/packages/web/app/admin/data-pipeline/instagram-accounts/page.tsx b/packages/web/app/admin/data-pipeline/instagram-accounts/page.tsx new file mode 100644 index 00000000..93a613a6 --- /dev/null +++ b/packages/web/app/admin/data-pipeline/instagram-accounts/page.tsx @@ -0,0 +1,341 @@ +"use client"; + +import { useMemo, useState } from "react"; +import { Database, Loader2, Play, Search, ToggleRight } from "lucide-react"; +import { AdminDataTable, type Column } from "@/lib/components/admin/common"; +import { Button } from "@/lib/components/ui/button"; +import { Card } from "@/lib/components/ui/card"; +import { Input } from "@/lib/design-system"; +import { + useEntityEnrichment, + useUpdateEntityEnrichmentSettings, + type InstagramAccountEnrichmentRow, +} from "@/lib/api/admin/entity-enrichment"; + +const TYPE_OPTIONS = ["", "artist", "brand", "group", "other", "unknown"]; +const ROLE_OPTIONS = ["", "primary", "secondary", "regional", "unknown"]; + +export default function InstagramAccountEnrichmentPage() { + const [accountType, setAccountType] = useState(""); + const [role, setRole] = useState(""); + const [search, setSearch] = useState(""); + const { data, isLoading, isError, error } = useEntityEnrichment({ + accountType, + role, + search, + limit: 50, + }); + const settingsMut = useUpdateEntityEnrichmentSettings(); + + const usage = data?.usage; + const settings = data?.settings; + const used = usage?.grounded_request_count ?? 0; + const quota = usage?.free_quota ?? settings?.daily_grounding_cap ?? 1500; + const remaining = Math.max(0, quota - used); + const usagePct = + quota > 0 ? Math.min(100, Math.round((used / quota) * 100)) : 0; + + const columns = useMemo[]>( + () => [ + { + key: "account", + label: "Account", + className: "min-w-[14rem]", + render: (row) => ( +
+ {row.profile_image_url ? ( + // eslint-disable-next-line @next/next/no-img-element + + ) : ( +
+ )} +
+ + @{row.username} + +

+ {row.display_name ?? row.bio ?? "No profile metadata"} +

+
+
+ ), + }, + { + key: "names", + label: "Names", + render: (row) => ( +
+

{row.name_en ?? "—"}

+

+ {row.name_ko ?? "—"} +

+
+ ), + }, + { + key: "classification", + label: "Classification", + render: (row) => ( +
+ {row.account_type} + {row.entity_ig_role} + {row.entity_region_code ? ( + {row.entity_region_code} + ) : null} +
+ ), + }, + { + key: "review", + label: "Gemini", + className: "min-w-[14rem]", + render: (row) => ( +
+
+ {metadataStatus(row, "gemini_review")} + {row.metadata?.entity_enrichment?.gemini_review?.confidence != + null ? ( + + {Math.round( + Number( + row.metadata.entity_enrichment.gemini_review.confidence + ) * 100 + )} + % + + ) : null} +
+

+ {row.metadata?.entity_enrichment?.gemini_review?.reason ?? + row.metadata?.entity_enrichment?.gemini_review?.error ?? + "No review reason yet"} +

+
+ ), + }, + { + key: "profile", + label: "Profile", + render: (row) => { + const profile = row.metadata?.entity_enrichment?.profile_backfill; + return ( +
+ {profileStatusLabel(profile?.status)} + {profile?.error ? ( +

+ {profile.error} +

+ ) : null} +
+ ); + }, + }, + { + key: "links", + label: "Entity", + render: (row) => { + const linked = row.artist_id ?? row.brand_id ?? row.group_id; + return ( + + {linked ? linked.slice(0, 8) : "unlinked"} + + ); + }, + }, + ], + [] + ); + + return ( +
+
+
+

+ Data Pipeline +

+

+ Instagram Account Enrichment +

+

+ Enrich tagged Instagram accounts into reviewed artist, brand, and + group catalog entries. +

+
+ +
+ +
+ + + + +
+ + +
+
+

+ Gemini Grounding Usage (KST) +

+

+ {used.toLocaleString()} / {quota.toLocaleString()} grounded + prompts used today.{" "} + {settings?.progress ? `Latest: ${settings.progress}` : ""} +

+
+ {settings?.running ? ( + + + Running + + ) : null} +
+
+
+
+ {settings?.last_error ? ( +

{settings.last_error}

+ ) : null} + + + +
+ + +
+ + {isError ? ( +
+ {error instanceof Error + ? error.message + : "Failed to load enrichment queue"} +
+ ) : ( + row.id} + isLoading={isLoading} + emptyMessage="No Instagram accounts found" + /> + )} +
+
+ ); +} + +function metadataStatus( + row: InstagramAccountEnrichmentRow, + key: "gemini_review" | "profile_backfill" +) { + return row.metadata?.entity_enrichment?.[key]?.status ?? "pending"; +} + +function profileStatusLabel(status: string | undefined) { + return status === "error" ? "profile error" : (status ?? "pending"); +} + +function MetricCard({ label, value }: { label: string; value: number }) { + return ( + +
+ + {label} +
+

+ {value.toLocaleString()} +

+
+ ); +} + +function Badge({ children }: { children: React.ReactNode }) { + return ( + + {children} + + ); +} + +function Select({ + value, + onChange, + options, + label, +}: { + value: string; + onChange: (value: string) => void; + options: string[]; + label: string; +}) { + return ( + + ); +} diff --git a/packages/web/app/api/admin/data-pipeline/instagram-accounts/route.ts b/packages/web/app/api/admin/data-pipeline/instagram-accounts/route.ts new file mode 100644 index 00000000..a60c3c4e --- /dev/null +++ b/packages/web/app/api/admin/data-pipeline/instagram-accounts/route.ts @@ -0,0 +1,208 @@ +import { NextRequest, NextResponse } from "next/server"; +import { checkIsAdmin } from "@/lib/supabase/admin"; +import { createAdminSupabaseClient } from "@/lib/supabase/admin-server"; +import { createSupabaseServerClient } from "@/lib/supabase/server"; +import { API_BASE_URL } from "@/lib/server-env"; + +const DEFAULT_LIMIT = 25; +const MAX_LIMIT = 100; +const PLATFORM = "entity_enrichment"; +const DEFAULT_SETTINGS = { + enabled: false, + cycle_seconds: 3600, + batch_size: 10, + daily_grounding_cap: 1500, + auto_promote_confidence_min: 0.9, + last_run_at: null, + last_success_at: null, + last_error: null, + last_error_at: null, + running: false, + progress: null, +}; + +function kstDateString(date = new Date()): string { + const kst = new Date(date.getTime() + 9 * 60 * 60 * 1000); + return kst.toISOString().slice(0, 10); +} + +export async function GET(req: NextRequest) { + const auth = await adminSession(); + if ("error" in auth) { + return NextResponse.json({ error: auth.error }, { status: auth.status }); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const supabase = createAdminSupabaseClient() as any; + const params = req.nextUrl.searchParams; + const limit = Math.min( + MAX_LIMIT, + Math.max(1, Number(params.get("limit") ?? DEFAULT_LIMIT)) + ); + const offset = Math.max(0, Number(params.get("offset") ?? 0)); + const accountType = params.get("account_type") ?? ""; + const role = params.get("entity_ig_role") ?? ""; + const search = params.get("search")?.trim() ?? ""; + const today = kstDateString(); + + let accountsQuery = supabase + .from("instagram_accounts") + .select( + "id,username,name_en,name_ko,display_name,bio,profile_image_url,account_type,entity_ig_role,entity_region_code,needs_review,artist_id,brand_id,group_id,metadata,updated_at", + { count: "exact" } + ) + .order("updated_at", { ascending: false }) + .range(offset, offset + limit - 1); + + if (accountType) + accountsQuery = accountsQuery.eq("account_type", accountType); + if (role) accountsQuery = accountsQuery.eq("entity_ig_role", role); + if (search) { + accountsQuery = accountsQuery.or( + `username.ilike.%${search}%,name_en.ilike.%${search}%,name_ko.ilike.%${search}%,display_name.ilike.%${search}%` + ); + } + + const [settingsRes, usageRes, accountsRes, statusRowsRes, typeRes, roleRes] = + await Promise.all([ + fetchEntitySettings(auth.token), + fetchEntityUsage(auth.token), + accountsQuery, + supabase + .from("instagram_accounts") + .select("metadata", { count: "exact" }), + supabase + .from("instagram_accounts") + .select("account_type", { count: "exact" }), + supabase + .from("instagram_accounts") + .select("entity_ig_role", { count: "exact" }), + ]); + + const firstError = + accountsRes.error ?? statusRowsRes.error ?? typeRes.error ?? roleRes.error; + if (firstError) { + return NextResponse.json({ error: firstError.message }, { status: 500 }); + } + + return NextResponse.json({ + settings: settingsRes, + usage: usageRes ?? { + usage_date: today, + feature: PLATFORM, + model: "gemini-2.5-flash", + grounded_request_count: 0, + free_quota: settingsRes.daily_grounding_cap, + paid_overage_count: 0, + }, + accounts: accountsRes.data ?? [], + pagination: { + limit, + offset, + total: accountsRes.count ?? 0, + }, + summary: { + total: statusRowsRes.count ?? 0, + by_status: countByReviewStatus(statusRowsRes.data ?? []), + by_account_type: countBy(typeRes.data ?? [], "account_type"), + by_role: countBy(roleRes.data ?? [], "entity_ig_role"), + }, + }); +} + +async function adminSession() { + const supabase = await createSupabaseServerClient(); + const { + data: { user }, + } = await supabase.auth.getUser(); + if (!user) return { error: "Unauthorized", status: 401 as const }; + if (!(await checkIsAdmin(supabase, user.id))) { + return { error: "Forbidden", status: 403 as const }; + } + const { + data: { session }, + } = await supabase.auth.getSession(); + if (!session?.access_token) { + return { error: "No session", status: 401 as const }; + } + return { token: session.access_token }; +} + +async function fetchEntitySettings(token: string) { + if (!API_BASE_URL) return DEFAULT_SETTINGS; + const res = await fetch(`${API_BASE_URL}/api/v1/raw-posts/pipeline/health`, { + headers: { Authorization: `Bearer ${token}` }, + cache: "no-store", + }); + if (!res.ok) return DEFAULT_SETTINGS; + const data = (await res.json()) as { + entries?: Array>; + }; + const row = data.entries?.find((entry) => entry.platform === PLATFORM); + if (!row) return DEFAULT_SETTINGS; + return { + enabled: Boolean(row.processing_enabled), + cycle_seconds: Number(row.processing_cycle_seconds ?? 3600), + batch_size: Number(row.processing_batch_size ?? 10), + daily_grounding_cap: 1500, + auto_promote_confidence_min: 0.9, + last_run_at: (row.processing_last_run_at as string | null) ?? null, + last_success_at: (row.processing_last_success_at as string | null) ?? null, + last_error: (row.processing_last_error as string | null) ?? null, + last_error_at: (row.processing_last_error_at as string | null) ?? null, + running: Boolean(row.processing_running), + progress: (row.processing_progress as string | null) ?? null, + }; +} + +async function fetchEntityUsage(token: string) { + if (!API_BASE_URL) return null; + const res = await fetch( + `${API_BASE_URL}/api/v1/admin/gemini-cost/spend/daily?days=1&pipeline=${PLATFORM}`, + { + headers: { Authorization: `Bearer ${token}` }, + cache: "no-store", + } + ); + if (!res.ok) return null; + const data = (await res.json()) as { + days?: Array<{ day: string; groundings: number }>; + }; + const today = kstDateString(); + const todayRow = data.days?.find((row) => row.day === today); + return { + usage_date: today, + feature: PLATFORM, + model: "gemini-2.5-flash", + grounded_request_count: todayRow?.groundings ?? 0, + free_quota: 1500, + paid_overage_count: Math.max(0, (todayRow?.groundings ?? 0) - 1500), + }; +} + +function countBy(rows: Array>, key: string) { + return rows.reduce>((acc, row) => { + const value = String(row[key] ?? "unknown"); + acc[value] = (acc[value] ?? 0) + 1; + return acc; + }, {}); +} + +function countByReviewStatus(rows: Array>) { + return rows.reduce>((acc, row) => { + const status = reviewStatusFromMetadata(row["metadata"]); + acc[status] = (acc[status] ?? 0) + 1; + return acc; + }, {}); +} + +function reviewStatusFromMetadata(metadata: unknown) { + if (!metadata || typeof metadata !== "object") return "pending"; + const enrichment = (metadata as { entity_enrichment?: unknown }) + .entity_enrichment; + if (!enrichment || typeof enrichment !== "object") return "pending"; + const review = (enrichment as { gemini_review?: unknown }).gemini_review; + if (!review || typeof review !== "object") return "pending"; + const status = (review as { status?: unknown }).status; + return typeof status === "string" ? status : "pending"; +} diff --git a/packages/web/app/api/admin/data-pipeline/instagram-accounts/settings/route.ts b/packages/web/app/api/admin/data-pipeline/instagram-accounts/settings/route.ts new file mode 100644 index 00000000..a8704f63 --- /dev/null +++ b/packages/web/app/api/admin/data-pipeline/instagram-accounts/settings/route.ts @@ -0,0 +1,72 @@ +import { NextRequest, NextResponse } from "next/server"; +import { checkIsAdmin } from "@/lib/supabase/admin"; +import { createSupabaseServerClient } from "@/lib/supabase/server"; +import { API_BASE_URL } from "@/lib/server-env"; + +const PLATFORM = "entity_enrichment"; + +export async function PATCH(req: NextRequest) { + const auth = await adminSession(); + if ("error" in auth) { + return NextResponse.json({ error: auth.error }, { status: auth.status }); + } + if (!API_BASE_URL) { + return NextResponse.json( + { error: "API_BASE_URL is not configured" }, + { status: 502 } + ); + } + + const body = await req.json(); + const patch: Record = {}; + if ("enabled" in body) patch.processing_enabled = body.enabled; + if ("cycle_seconds" in body) + patch.processing_cycle_seconds = body.cycle_seconds; + if ("batch_size" in body) patch.processing_batch_size = body.batch_size; + if (Object.keys(patch).length === 0) { + return NextResponse.json({ error: "No valid fields" }, { status: 400 }); + } + + try { + const res = await fetch( + `${API_BASE_URL}/api/v1/raw-posts/pipeline/settings/${PLATFORM}`, + { + method: "PATCH", + headers: { + Authorization: `Bearer ${auth.token}`, + "content-type": "application/json", + }, + body: JSON.stringify(patch), + } + ); + if (res.status === 204) return NextResponse.json({ data: null }); + const text = await res.text(); + return new NextResponse(text, { + status: res.status, + headers: { "content-type": "application/json" }, + }); + } catch (error) { + return NextResponse.json( + { error: error instanceof Error ? error.message : "Proxy error" }, + { status: 502 } + ); + } +} + +async function adminSession() { + const supabase = await createSupabaseServerClient(); + const { + data: { user }, + } = await supabase.auth.getUser(); + if (!user) return { error: "Unauthorized", status: 401 as const }; + if (!(await checkIsAdmin(supabase, user.id))) { + return { error: "Forbidden", status: 403 as const }; + } + const { + data: { session }, + } = await supabase.auth.getSession(); + if (!session?.access_token) { + return { error: "No session", status: 401 as const }; + } + return { token: session.access_token }; +} diff --git a/packages/web/lib/api/admin/entity-enrichment.ts b/packages/web/lib/api/admin/entity-enrichment.ts new file mode 100644 index 00000000..06d8d63b --- /dev/null +++ b/packages/web/lib/api/admin/entity-enrichment.ts @@ -0,0 +1,125 @@ +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; + +async function adminFetch(url: string, init?: RequestInit): Promise { + const res = await fetch(url, init); + const text = await res.text(); + const body = text ? JSON.parse(text) : {}; + if (!res.ok) { + throw new Error(body.error ?? `Request failed: ${res.status}`); + } + return body as T; +} + +export interface EntityEnrichmentSettings { + enabled: boolean; + cycle_seconds: number; + batch_size: number; + daily_grounding_cap: number; + auto_promote_confidence_min: number; + last_run_at: string | null; + last_success_at: string | null; + last_error: string | null; + last_error_at: string | null; + running: boolean; + progress: string | null; +} + +export interface GeminiGroundingUsage { + usage_date: string; + feature: string; + model: string; + grounded_request_count: number; + free_quota: number; + paid_overage_count: number; +} + +export interface InstagramAccountEnrichmentRow { + id: string; + username: string; + name_en: string | null; + name_ko: string | null; + display_name: string | null; + bio: string | null; + profile_image_url: string | null; + account_type: "artist" | "brand" | "group" | "other" | "unknown"; + entity_ig_role: "primary" | "secondary" | "regional" | "unknown"; + entity_region_code: string | null; + needs_review: boolean | null; + artist_id: string | null; + brand_id: string | null; + group_id: string | null; + metadata: { + entity_enrichment?: { + gemini_review?: { + status?: string; + confidence?: number; + reason?: string; + error?: string; + }; + profile_backfill?: { + status?: string; + error?: string; + }; + }; + } | null; + updated_at: string; +} + +export interface EntityEnrichmentResponse { + settings: EntityEnrichmentSettings; + usage: GeminiGroundingUsage; + accounts: InstagramAccountEnrichmentRow[]; + pagination: { + limit: number; + offset: number; + total: number; + }; + summary: { + total: number; + by_status: Record; + by_account_type: Record; + by_role: Record; + }; +} + +interface ListParams { + limit?: number; + offset?: number; + accountType?: string; + role?: string; + search?: string; +} + +export function useEntityEnrichment(params: ListParams) { + const qs = new URLSearchParams(); + qs.set("limit", String(params.limit ?? 25)); + qs.set("offset", String(params.offset ?? 0)); + if (params.accountType) qs.set("account_type", params.accountType); + if (params.role) qs.set("entity_ig_role", params.role); + if (params.search) qs.set("search", params.search); + + return useQuery({ + queryKey: ["admin", "entity-enrichment", Object.fromEntries(qs)], + queryFn: ({ signal }) => + adminFetch(`/api/admin/data-pipeline/instagram-accounts?${qs}`, { + signal, + }), + staleTime: 5_000, + refetchInterval: (query) => + query.state.data?.settings.running ? 5_000 : 30_000, + }); +} + +export function useUpdateEntityEnrichmentSettings() { + const qc = useQueryClient(); + return useMutation({ + mutationFn: (body: Partial) => + adminFetch("/api/admin/data-pipeline/instagram-accounts/settings", { + method: "PATCH", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }), + onSuccess: () => + qc.invalidateQueries({ queryKey: ["admin", "entity-enrichment"] }), + }); +} diff --git a/packages/web/lib/components/admin/AdminSidebar.tsx b/packages/web/lib/components/admin/AdminSidebar.tsx index 5f0dc6f2..655f77e1 100644 --- a/packages/web/lib/components/admin/AdminSidebar.tsx +++ b/packages/web/lib/components/admin/AdminSidebar.tsx @@ -18,6 +18,7 @@ import { Link2, DollarSign, ShieldCheck, + Database, } from "lucide-react"; import { cn } from "@/lib/utils"; import { useAuthStore } from "@/lib/stores/authStore"; @@ -56,6 +57,11 @@ const SIDEBAR_ENTRIES: SidebarEntry[] = [ { href: "/admin/raw-post-sources", label: "Sources", icon: Inbox }, { href: "/admin/raw-posts", label: "Review Queue", icon: CheckCircle }, { href: "/admin/solutions", label: "Solutions URLs", icon: Link2 }, + { + href: "/admin/data-pipeline/instagram-accounts", + label: "Instagram Accounts", + icon: Database, + }, ], }, { diff --git a/supabase-assets/migrations/20260518153700_entity_enrichment_asset_state.sql b/supabase-assets/migrations/20260518153700_entity_enrichment_asset_state.sql new file mode 100644 index 00000000..660e4603 --- /dev/null +++ b/supabase-assets/migrations/20260518153700_entity_enrichment_asset_state.sql @@ -0,0 +1,70 @@ +-- Entity enrichment pipeline runtime state (#495). +-- +-- Asset DB owns pipeline scheduler state and Gemini usage accounting. Operation +-- DB owns the reviewed instagram_accounts/artists/brands/groups entity state. + +BEGIN; + +ALTER TABLE public.pipeline_settings + ADD COLUMN IF NOT EXISTS metadata jsonb NOT NULL DEFAULT '{}'::jsonb; + +INSERT INTO public.pipeline_settings ( + platform, + processing_enabled, + processing_cycle_seconds, + processing_batch_size, + processing_recheck_vision, + metadata +) +VALUES ( + 'entity_enrichment', + false, + 3600, + 10, + false, + jsonb_build_object( + 'entity_enrichment', + jsonb_build_object( + 'daily_grounding_cap', 1500, + 'auto_promote_confidence_min', 0.9 + ) + ) +) +ON CONFLICT (platform) DO UPDATE + SET metadata = COALESCE(public.pipeline_settings.metadata, '{}'::jsonb) + || jsonb_build_object( + 'entity_enrichment', + COALESCE(public.pipeline_settings.metadata->'entity_enrichment', '{}'::jsonb) + || jsonb_build_object( + 'daily_grounding_cap', + COALESCE( + public.pipeline_settings.metadata #> '{entity_enrichment,daily_grounding_cap}', + '1500'::jsonb + ), + 'auto_promote_confidence_min', + COALESCE( + public.pipeline_settings.metadata #> '{entity_enrichment,auto_promote_confidence_min}', + '0.9'::jsonb + ) + ) + ), + updated_at = now(); + +ALTER TABLE public.gemini_usage_events + DROP CONSTRAINT IF EXISTS gemini_usage_pipeline_chk; + +ALTER TABLE public.gemini_usage_events + ADD CONSTRAINT gemini_usage_pipeline_chk CHECK ( + pipeline IN ( + 'raw_post', + 'post_editorial', + 'editorial_article', + 'entity_enrichment', + 'ad_hoc' + ) + ); + +COMMENT ON COLUMN public.pipeline_settings.metadata IS + 'Per-platform extension config. entity_enrichment stores grounding cap and auto-promote threshold here.'; + +COMMIT; diff --git a/supabase/migrations/20260518152600_entity_enrichment_pipeline.sql b/supabase/migrations/20260518152600_entity_enrichment_pipeline.sql new file mode 100644 index 00000000..727cb4d3 --- /dev/null +++ b/supabase/migrations/20260518152600_entity_enrichment_pipeline.sql @@ -0,0 +1,74 @@ +-- Entity enrichment pipeline (#495) +-- +-- Operation DB owns reviewed Instagram account/entity state. Asset DB owns +-- scheduler runtime state and Gemini usage accounting. + +BEGIN; + +ALTER TABLE public.instagram_accounts + ADD COLUMN IF NOT EXISTS account_type text NOT NULL DEFAULT 'unknown', + ADD COLUMN IF NOT EXISTS entity_ig_role text NOT NULL DEFAULT 'unknown'; + +ALTER TABLE public.instagram_accounts + DROP COLUMN IF EXISTS gemini_reviewed_at, + DROP COLUMN IF EXISTS gemini_review_model, + DROP COLUMN IF EXISTS gemini_review_confidence, + DROP COLUMN IF EXISTS gemini_review_reason, + DROP COLUMN IF EXISTS gemini_review_payload, + DROP COLUMN IF EXISTS gemini_review_status, + DROP COLUMN IF EXISTS profile_backfill_status, + DROP COLUMN IF EXISTS profile_backfilled_at, + DROP COLUMN IF EXISTS profile_backfill_error; + +ALTER TABLE public.brands + ADD COLUMN IF NOT EXISTS country_of_origin text NOT NULL DEFAULT 'NA'; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = 'instagram_accounts_account_type_check' + AND conrelid = 'public.instagram_accounts'::regclass + ) THEN + ALTER TABLE public.instagram_accounts + ADD CONSTRAINT instagram_accounts_account_type_check + CHECK (account_type IN ('artist', 'brand', 'group', 'other', 'unknown')); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = 'instagram_accounts_entity_ig_role_check' + AND conrelid = 'public.instagram_accounts'::regclass + ) THEN + ALTER TABLE public.instagram_accounts + ADD CONSTRAINT instagram_accounts_entity_ig_role_check + CHECK (entity_ig_role IN ('primary', 'secondary', 'regional', 'unknown')); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = 'instagram_accounts_single_entity_link_check' + AND conrelid = 'public.instagram_accounts'::regclass + ) THEN + ALTER TABLE public.instagram_accounts + ADD CONSTRAINT instagram_accounts_single_entity_link_check + CHECK (num_nonnulls(artist_id, brand_id, group_id) <= 1); + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS idx_instagram_accounts_account_type_role + ON public.instagram_accounts (account_type, entity_ig_role); + +DROP TABLE IF EXISTS public.gemini_grounding_usage_daily; +DROP TABLE IF EXISTS public.entity_enrichment_settings; + +COMMENT ON COLUMN public.instagram_accounts.account_type IS + 'Reviewed account classification: artist, brand, group, other, or unknown.'; + +COMMENT ON COLUMN public.instagram_accounts.entity_ig_role IS + 'Instagram role for the linked entity: primary, secondary, regional, or unknown.'; + +COMMENT ON COLUMN public.brands.country_of_origin IS + 'Country where the brand was founded; NA if unknown.'; + +COMMIT; From bd0faad8477f9ea6ccf8bd57b7d68bd4974c652e Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 18 May 2026 21:56:56 +0900 Subject: [PATCH 02/21] feat(admin): improve group member management UI (#546) Co-authored-by: Cursor --- docs/agent/web-routes-and-features.md | 2 +- .../group-members/__tests__/page.test.tsx | 69 +- .../app/admin/entities/group-members/page.tsx | 672 ++++++++++++++++-- .../api/admin/entities/group-members/route.ts | 350 ++++++++- packages/web/lib/api/admin/entities.ts | 125 +++- 5 files changed, 1137 insertions(+), 81 deletions(-) diff --git a/docs/agent/web-routes-and-features.md b/docs/agent/web-routes-and-features.md index 7d091a03..f7a59539 100644 --- a/docs/agent/web-routes-and-features.md +++ b/docs/agent/web-routes-and-features.md @@ -40,7 +40,7 @@ App Router 기준 (`packages/web/app/`). 작업 시 이 표와 실제 `app/` 트 | `/admin/seed/post-spots` | 시드 포스트 스팟 | | `/admin/entities/artists` | 아티스트 관리 (CRUD, paginated, searchable) | | `/admin/entities/brands` | 브랜드 관리 (CRUD) | -| `/admin/entities/group-members` | 그룹 멤버 관리 | +| `/admin/entities/group-members` | 그룹 멤버 관리 — group별 artist membership 조회·추가·수정·삭제 | | `/admin/raw-post-sources` | 수집 소스 등록/관리 (Pinterest 등 — #327) | | `/admin/raw-posts` | **검증 큐** (#333) — assets 의 raw_posts 를 status 탭(COMPLETED/IN_PROGRESS/ERROR/VERIFIED) 으로 필터링, "검증" 버튼으로 prod posts 반영 | | `/admin/data-pipeline/instagram-accounts` | Instagram tagged account enrichment queue, Gemini grounding quota, scheduler controls, manual entity review. See [`docs/database/entity-enrichment-pipeline.md`](../database/entity-enrichment-pipeline.md). | diff --git a/packages/web/app/admin/entities/group-members/__tests__/page.test.tsx b/packages/web/app/admin/entities/group-members/__tests__/page.test.tsx index 3e9a9e67..1205e540 100644 --- a/packages/web/app/admin/entities/group-members/__tests__/page.test.tsx +++ b/packages/web/app/admin/entities/group-members/__tests__/page.test.tsx @@ -1,10 +1,7 @@ /** * @vitest-environment jsdom * - * GroupMembersPage empty / loading / error state tests. - * - * Group members has no search/status filter, so isEmpty is simply: - * not loading AND not error AND no rows. + * GroupMembersPage group list / member table state tests. */ import React from "react"; import { describe, test, expect, vi, beforeEach } from "vitest"; @@ -19,44 +16,87 @@ vi.mock("next/navigation", () => ({ // --- Mock the data hooks --- const useGroupMemberListMock = vi.fn(); +const mutateMock = vi.fn(); vi.mock("@/lib/api/admin/entities", () => ({ useGroupMemberList: (...args: unknown[]) => useGroupMemberListMock(...args), + useGroupMemberArtistSearch: () => ({ data: { data: [] }, isLoading: false }), + useCreateGroupMember: () => ({ mutate: mutateMock, isPending: false }), + useUpdateGroupMember: () => ({ mutate: mutateMock, isPending: false }), + useDeleteGroupMember: () => ({ mutate: mutateMock, isPending: false }), })); import GroupMembersPage from "../page"; beforeEach(() => { useGroupMemberListMock.mockReset(); + mutateMock.mockReset(); }); describe("GroupMembersPage — empty state", () => { test("renders AdminEmptyState when no data", () => { useGroupMemberListMock.mockReturnValue({ - data: { data: [], pagination: undefined }, + data: { + data: [], + groups: [], + selected_group: null, + pagination: undefined, + }, isLoading: false, isError: false, }); render(); - expect(screen.getByText("No group members")).toBeInTheDocument(); + expect(screen.getByText("No groups")).toBeInTheDocument(); expect( screen.getByText( - "Artist–group relationships will appear here once they are seeded." + "Groups created by entity enrichment or manual admin actions will appear here." ) ).toBeInTheDocument(); }); - test("renders table when data has rows", () => { + test("renders selected group and member rows", () => { useGroupMemberListMock.mockReturnValue({ data: { + groups: [ + { + id: "group-uuid-5678", + name_en: "BLACKPINK", + name_ko: "블랙핑크", + profile_image_url: null, + primary_instagram_account_id: null, + primary_instagram_account: null, + member_count: 1, + total_member_count: 1, + metadata: null, + }, + ], + selected_group: { + id: "group-uuid-5678", + name_en: "BLACKPINK", + name_ko: "블랙핑크", + profile_image_url: null, + primary_instagram_account_id: null, + primary_instagram_account: null, + member_count: 1, + total_member_count: 1, + metadata: null, + }, data: [ { artist_id: "artist-uuid-1234", group_id: "group-uuid-5678", is_active: true, metadata: null, + artist: { + id: "artist-uuid-1234", + name_en: "Jennie", + name_ko: "제니", + profile_image_url: null, + primary_instagram_account_id: null, + primary_instagram_account: { username: "jennierubyjane" }, + }, }, ], pagination: { @@ -72,9 +112,10 @@ describe("GroupMembersPage — empty state", () => { render(); - expect(screen.queryByText("No group members")).not.toBeInTheDocument(); - // row data is rendered (truncated artist_id prefix) - expect(screen.getByText("artist-u…")).toBeInTheDocument(); + expect(screen.queryByText("No groups")).not.toBeInTheDocument(); + expect(screen.getAllByText("BLACKPINK").length).toBeGreaterThan(0); + expect(screen.getByText("Jennie")).toBeInTheDocument(); + expect(screen.getByText("@jennierubyjane")).toBeInTheDocument(); }); test("renders error state when fetch fails", () => { @@ -87,10 +128,8 @@ describe("GroupMembersPage — empty state", () => { render(); expect( - screen.getByText( - "Failed to load group members. Please try refreshing the page." - ) + screen.getByText("Failed to load groups. Please try refreshing the page.") ).toBeInTheDocument(); - expect(screen.queryByText("No group members")).not.toBeInTheDocument(); + expect(screen.queryByText("No groups")).not.toBeInTheDocument(); }); }); diff --git a/packages/web/app/admin/entities/group-members/page.tsx b/packages/web/app/admin/entities/group-members/page.tsx index 41e4a67b..493dbe26 100644 --- a/packages/web/app/admin/entities/group-members/page.tsx +++ b/packages/web/app/admin/entities/group-members/page.tsx @@ -1,16 +1,351 @@ "use client"; -import { useCallback, Suspense } from "react"; +import { useCallback, useEffect, useRef, useState, Suspense } from "react"; import { useSearchParams, useRouter } from "next/navigation"; -import { Users } from "lucide-react"; +import { + Check, + Pencil, + Plus, + Search, + Trash2, + UserPlus, + Users, + X, +} from "lucide-react"; import { AdminDataTable, type Column, + AdminImagePreview, AdminStatusBadge, AdminPagination, AdminEmptyState, } from "@/lib/components/admin/common"; -import { useGroupMemberList, type GroupMember } from "@/lib/api/admin/entities"; +import { + useCreateGroupMember, + useDeleteGroupMember, + useGroupMemberArtistSearch, + useGroupMemberList, + useUpdateGroupMember, + type GroupMember, + type GroupMemberArtist, + type GroupSummary, +} from "@/lib/api/admin/entities"; + +function displayName(entity: { + name_en: string | null; + name_ko: string | null; +}) { + return entity.name_en || entity.name_ko || "Untitled"; +} + +function secondaryName(entity: { + name_en: string | null; + name_ko: string | null; +}) { + return entity.name_en && entity.name_ko ? entity.name_ko : null; +} + +function metadataToText(metadata: Record | null | undefined) { + if (!metadata || Object.keys(metadata).length === 0) return ""; + return JSON.stringify(metadata, null, 2); +} + +function parseMetadata(text: string) { + const trimmed = text.trim(); + if (!trimmed) return null; + const parsed = JSON.parse(trimmed); + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + throw new Error("Metadata must be a JSON object"); + } + return parsed as Record; +} + +interface AddMemberPanelProps { + group: GroupSummary | null; + onCancel: () => void; +} + +function AddMemberPanel({ group, onCancel }: AddMemberPanelProps) { + const [artistSearch, setArtistSearch] = useState(""); + const [selectedArtist, setSelectedArtist] = + useState(null); + const [isActive, setIsActive] = useState(true); + const [metadataText, setMetadataText] = useState(""); + const [error, setError] = useState(null); + const createMember = useCreateGroupMember(); + const artistOptions = useGroupMemberArtistSearch( + artistSearch, + group?.id ?? "" + ); + + const handleSubmit = () => { + if (!group || !selectedArtist) return; + try { + createMember.mutate( + { + group_id: group.id, + artist_id: selectedArtist.id, + is_active: isActive, + metadata: parseMetadata(metadataText), + }, + { + onSuccess: () => { + setArtistSearch(""); + setSelectedArtist(null); + setMetadataText(""); + setIsActive(true); + setError(null); + onCancel(); + }, + onError: (err) => + setError( + err instanceof Error ? err.message : "Failed to add member" + ), + } + ); + } catch (err) { + setError(err instanceof Error ? err.message : "Invalid metadata JSON"); + } + }; + + return ( +
+
+
+

+ Add artist to {group ? displayName(group) : "group"} +

+

+ Search by artist name or Instagram username. +

+
+ +
+ +
+ +
+ + { + setArtistSearch(event.target.value); + setSelectedArtist(null); + }} + className="w-full rounded-lg bg-gray-700 border border-border pl-9 pr-3 py-2 text-sm text-gray-100 focus:outline-none focus:border-primary" + placeholder="Search artists or @username" + /> +
+
+ + {artistSearch && !selectedArtist && ( +
+ {artistOptions.isLoading ? ( +

+ Searching… +

+ ) : artistOptions.data?.data.length ? ( + artistOptions.data.data.map((artist) => ( + + )) + ) : ( +

+ No matching artists found. +

+ )} +
+ )} + + {selectedArtist && ( +
+

+ {displayName(selectedArtist)} +

+

+ {selectedArtist.primary_instagram_account?.username + ? `@${selectedArtist.primary_instagram_account.username}` + : selectedArtist.id} +

+
+ )} + + + +
+ +