Skip to content

feat: make classify a standalone durably job#209

Merged
coji merged 5 commits intomainfrom
feat/durably-classify-job
Mar 18, 2026
Merged

feat: make classify a standalone durably job#209
coji merged 5 commits intomainfrom
feat/durably-classify-job

Conversation

@coji
Copy link
Owner

@coji coji commented Mar 17, 2026

Summary

  • Decouple classify from the shared-steps pipeline (crawl/recalculate)
  • After crawl completes, classify is triggered as a separate fire-and-forget durably job
  • All classify runs share concurrencyKey: classify:${orgId}, preventing duplicate concurrent LLM API calls
  • Remove --classify flag from recalculate CLI (use standalone classify command instead)
  • Remove classify checkbox from Data Management UI
  • Add error handling so classify trigger failures don't fail the parent job

Pipeline change:

Before: crawl → [fetch → analyze → upsert → classify → export → finalize]
After:  crawl → [fetch → analyze → upsert → export → finalize] → trigger classify job

Test plan

  • pnpm validate passes
  • pnpm tsx batch/cli.ts classify <org-id> — standalone classify works
  • pnpm dev → Data Management → Recalculate — no classify checkbox, upsert/export work
  • crawl job completion triggers classify job (check logs)
  • pnpm tsx batch/cli.ts recalculate <org-id> — no --classify flag

🤖 Generated with Claude Code

Summary by CodeRabbit

リリースノート

  • 新機能

    • エクスポート完了後、分類処理が自動的にトリガーされるようになりました。
  • 変更

    • 手動再計算フローから分類オプションを削除しました。分類は自動実行されます。
    • ワークフローシーケンスを更新し、分類がエクスポート後に非同期で実行されるようになりました。

- Add classify durably job with force/limit options and progress tracking
- Register classify job in durably instance
- Update CLI command to use triggerAndWait with progress/log callbacks
- Add return value to classifyPullRequests for job output

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link

coderabbitai bot commented Mar 17, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bc810218-d9ab-4b55-a8ae-9e4410e83e52

📥 Commits

Reviewing files that changed from the base of the PR and between 3f887a2 and 031ac4d.

📒 Files selected for processing (6)
  • app/routes/$orgSlug/settings/data-management/index.tsx
  • app/services/jobs/classify.server.ts
  • app/services/jobs/recalculate.server.ts
  • app/services/jobs/shared-steps.server.ts
  • batch/cli.ts
  • batch/commands/recalculate.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • app/services/jobs/recalculate.server.ts
  • app/services/jobs/classify.server.ts

📝 Walkthrough

Walkthrough

分類ステップを同期パイプラインから切り離し、Durably ジョブ(classifyJob)として新規定義。クロール/再計算の実行後に非同期で分類ジョブをトリガーするよう、サーバー側ジョブ、Durably登録、バッチコマンド、UI を更新しました。

Changes

Cohort / File(s) Summary
新しい分類ジョブ定義
app/services/jobs/classify.server.ts
Durably ジョブ classifyJob を追加。入力に organizationId, force, limit を取り、{ classifiedCount: number } を返す。
Durably 登録
app/services/durably.server.ts
Durably の jobs マップに classify ジョブを登録。
メインジョブパイプライン更新
app/services/jobs/crawl.server.ts, app/services/jobs/recalculate.server.ts
analyze→upsert→export→finalize にパイプラインを変更。同期 classify を削除し、完了後に非同期トリガーを起動する呼び出しを追加。
共有ステップ & トリガー関数
app/services/jobs/shared-steps.server.ts
JobSteps から classify を削除。分類をファイア・アンド・フォーゲットで起動する triggerClassifyStep を追加(同時実行制御とエラーハンドリング付き)。
設定UI / 再計算
app/routes/$orgSlug/settings/data-management/index.tsx, batch/commands/recalculate.ts, batch/cli.ts
再計算の UI と CLI フラグから classify を削除。送信ペイロードとヘルプ文言を更新。
バッチ分類ワークフロー更新
batch/commands/classify.ts, batch/usecases/classify-pull-requests.ts
classify 実行を Durably ジョブ経由に変更。classifyPullRequests の戻り値を { classifiedCount: number } に変更し、各分岐で明示的に数値を返すように。

Sequence Diagram(s)

sequenceDiagram
    participant Crawler as Crawl Job
    participant Shared as shared-steps
    participant Durably as Durably
    participant Classify as classifyJob
    participant Usecase as classifyPullRequests

    Crawler->>Shared: analyze → upsert → export → finalize
    Shared->>Durably: triggerClassifyStep(orgId) (fire-and-forget)
    Durably->>Classify: start classifyJob(orgId, force?, limit?)
    Classify->>Usecase: classifyPullRequests(orgId, {force, limit})
    Usecase-->>Classify: { classifiedCount }
    Classify-->>Durably: return { classifiedCount }
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

🐰 エクスポート終わりでピョンと一跳び
背後で走るは classify の呪文
非同期でコトコト、数を数え
ジョブは流れを守りつつ
みんなで静かにハッピーだよ ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: converting classify from an integrated pipeline step into a standalone durably job.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/durably-classify-job
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Remove classify step from the shared analyze→upsert→export→finalize
pipeline and trigger it as a separate durably job after crawl/recalculate
completion. This unifies the classify execution path so all classify runs
share concurrencyKey `classify:${orgId}`, preventing duplicate LLM API
calls when crawl and standalone classify run concurrently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/routes/`$orgSlug/settings/data-management/index.tsx:
- Line 70: Add a regression test for the recalculate action that verifies the
classify step is not triggered when steps.classify is false: invoke the
recalculate action (the action handler named "recalculate" in the
data-management route) with payload containing organizationId: org.id and steps:
{ ...steps, classify: false }, then assert that the classify job/side-effect is
not enqueued/invoked and other expected effects of recalculate still occur;
ensure the test specifically checks the hard-coded classify: false behavior so
future changes to the recalculate logic will be caught.

In `@app/services/jobs/classify.server.ts`:
- Around line 6-29: Add unit/integration tests for the new classify job: create
tests that import classifyJob (from classify.server.ts) and exercise its run
flow by mocking classifyPullRequests to return a known classifiedCount and
asserting the job returns that value; include cases that pass force=true/false
and different limit values to ensure inputs are forwarded to
classifyPullRequests, and a test that simulates step.run/step.progress behavior
(mock the step runner or use the job runner helper) to validate the execution
path; ensure tests assert the output schema (classifiedCount is a number) and
that classifyPullRequests was called with the expected args.
- Around line 19-27: After classifyPullRequests completes inside the
step.run('classify') block, call clearAllCache() to invalidate caches so the new
classification is visible; locate the classify flow around step.run('classify')
and add a clearAllCache() invocation (importing clearAllCache from
app/services/cache.server.ts) immediately after the result is returned/after
finalize so that the returned classifiedCount is written and caches are cleared.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8c292acb-9556-4a14-a8dc-8ff5388640d9

📥 Commits

Reviewing files that changed from the base of the PR and between 81e0941 and 3f887a2.

📒 Files selected for processing (9)
  • CLAUDE.md
  • app/routes/$orgSlug/settings/data-management/index.tsx
  • app/services/durably.server.ts
  • app/services/jobs/classify.server.ts
  • app/services/jobs/crawl.server.ts
  • app/services/jobs/recalculate.server.ts
  • app/services/jobs/shared-steps.server.ts
  • batch/commands/classify.ts
  • batch/usecases/classify-pull-requests.ts

Comment on lines +6 to +29
export const classifyJob = defineJob({
name: 'classify',
input: z.object({
organizationId: z.string(),
force: z.boolean().default(false),
limit: z.number().optional(),
}),
output: z.object({
classifiedCount: z.number(),
}),
run: async (step, input) => {
const orgId = input.organizationId as OrganizationId

const result = await step.run('classify', async () => {
step.progress(0, 0, 'Classifying PRs...')
return await classifyPullRequests(orgId, {
force: input.force,
limit: input.limit,
})
})

return { classifiedCount: result.classifiedCount }
},
})
Copy link

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

新規 classify ジョブの変更に対するテスト追加が必要です。

force/limit 入力、classifiedCount 出力、実行フローの最低限のユニット/統合テストを追加してください。

As per coding guidelines **/*.{ts,tsx}: Ensure added or modified logic includes corresponding unit tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/jobs/classify.server.ts` around lines 6 - 29, Add
unit/integration tests for the new classify job: create tests that import
classifyJob (from classify.server.ts) and exercise its run flow by mocking
classifyPullRequests to return a known classifiedCount and asserting the job
returns that value; include cases that pass force=true/false and different limit
values to ensure inputs are forwarded to classifyPullRequests, and a test that
simulates step.run/step.progress behavior (mock the step runner or use the job
runner helper) to validate the execution path; ensure tests assert the output
schema (classifiedCount is a number) and that classifyPullRequests was called
with the expected args.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

スキップします。classify ジョブは classifyPullRequests を durably step でラップしているだけで、ロジック自体は classifyPullRequests 側にあります。durably の step.run/step.progress のモックは費用対効果が低いと判断しました。

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coji、了解しました。classifyJob はオーケストレーション用のグルーコードであり、ビジネスロジックは classifyPullRequests 側に集約されているというご判断は理にかなっています。テストのスキップを承認します。


✏️ Learnings added
Learnt from: coji
Repo: coji/upflow PR: 209
File: app/services/jobs/classify.server.ts:6-30
Timestamp: 2026-03-18T00:26:56.831Z
Learning: In coji/upflow, thin durably job wrappers (e.g., classifyJob in app/services/jobs/classify.server.ts) that simply delegate to a usecase function (e.g., classifyPullRequests) via step.run/step.progress do not require dedicated unit tests. The actual business logic is tested at the usecase layer. Mocking durably's step.run/step.progress for such wrappers is considered low cost-benefit by the project owner.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
Repo: coji/upflow PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-17T23:33:34.917Z
Learning: Applies to **/*.{ts,tsx} : Ensure added or modified logic includes corresponding unit tests

Learnt from: coji
Repo: coji/upflow PR: 141
File: CLAUDE.md:215-215
Timestamp: 2026-03-09T08:25:06.885Z
Learning: In coji/upflow, for any UPDATE or DELETE on org-scoped tables, always include WHERE organizationId = ? with a server-derived value, and obtain per-org tenant data using getTenantDb(organizationId). There is no automatic Kysely plugin for org scoping; apply this pattern project-wide to ensure proper scoping.

Learnt from: coji
Repo: coji/upflow PR: 196
File: app/services/jobs/recalculate.server.ts:127-132
Timestamp: 2026-03-17T06:05:49.427Z
Learning: Use the established project-wide cache invalidation pattern: call clearAllCache() from ~/app/services/cache.server.ts when invalidating cache. This should be used in recalculate and crawl jobs (and any similar cache invalidation points in app/services/jobs). The cache is small and org-scoped entries, so full cache wipes have limited blast radius. Note that org-scoped invalidation is a known future improvement and currently not implemented; prefer clearAllCache() for consistency and traceability now.

Learnt from: coji
Repo: coji/upflow PR: 198
File: app/services/jobs/crawl.server.ts:159-165
Timestamp: 2026-03-17T07:58:29.114Z
Learning: Guideline: Do not apply the global 'organizationId' filter to updates/deletes when using the per-organization tenant database accessed via getTenantDb(). In coji/upflow, getTenantDb(organizationId) uses a separate SQLite database file per organization, and tables there (e.g., organizationSettings, repositories) are implicitly scoped to that organization. The mandatory WHERE organizationId = ? rule only applies to the shared/global database accessed via the top-level db instance (app/services/db.server.ts). During reviews, ensure queries executed through getTenantDb() do not include or require organizationId in WHERE clauses for UPDATE/DELETE, unless referencing the global/db.server.ts path. This should be enforced by code patterns or CI checks that distinguish between tenant DB vs global DB usage.

Comment on lines +19 to +27
const result = await step.run('classify', async () => {
step.progress(0, 0, 'Classifying PRs...')
return await classifyPullRequests(orgId, {
force: input.force,
limit: input.limit,
})
})

return { classifiedCount: result.classifiedCount }
Copy link

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

分類結果を書き込んだ後のキャッシュ無効化が抜けています。

classifyfinalize 後に別ジョブ実行される構成になったため、ここで無効化しないと分類結果がキャッシュに反映されず古い表示が残る可能性があります。

💡 修正案
 import { defineJob } from '@coji/durably'
 import { z } from 'zod'
+import { clearAllCache } from '~/app/services/cache.server'
 import type { OrganizationId } from '~/app/types/organization'
 import { classifyPullRequests } from '~/batch/usecases/classify-pull-requests'
@@
     const result = await step.run('classify', async () => {
       step.progress(0, 0, 'Classifying PRs...')
       return await classifyPullRequests(orgId, {
         force: input.force,
         limit: input.limit,
       })
     })
+
+    clearAllCache()
 
     return { classifiedCount: result.classifiedCount }
   },
 })

Based on learnings: Use the established project-wide cache invalidation pattern: call clearAllCache() from ~/app/services/cache.server.ts when invalidating cache.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const result = await step.run('classify', async () => {
step.progress(0, 0, 'Classifying PRs...')
return await classifyPullRequests(orgId, {
force: input.force,
limit: input.limit,
})
})
return { classifiedCount: result.classifiedCount }
import { defineJob } from '@coji/durably'
import { z } from 'zod'
import { clearAllCache } from '~/app/services/cache.server'
import type { OrganizationId } from '~/app/types/organization'
import { classifyPullRequests } from '~/batch/usecases/classify-pull-requests'
// ... other code ...
const result = await step.run('classify', async () => {
step.progress(0, 0, 'Classifying PRs...')
return await classifyPullRequests(orgId, {
force: input.force,
limit: input.limit,
})
})
clearAllCache()
return { classifiedCount: result.classifiedCount }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/jobs/classify.server.ts` around lines 19 - 27, After
classifyPullRequests completes inside the step.run('classify') block, call
clearAllCache() to invalidate caches so the new classification is visible;
locate the classify flow around step.run('classify') and add a clearAllCache()
invocation (importing clearAllCache from app/services/cache.server.ts)
immediately after the result is returned/after finalize so that the returned
classifiedCount is written and caches are cleared.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

検討の結果、明示的なキャッシュクリアは不要と判断しました (031ac4d)。classify は crawl 完了後に非同期で実行されるため、ユーザーが即座に結果を確認するケースは少なく、インメモリキャッシュの TTL (5分) で自然失効に任せます。コメントも追加済みです。

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coji、ご判断ありがとうございます。非同期実行という性質上、TTL による自然失効で十分という考え方は理にかなっています。了解しました。


✏️ Learnings added
Learnt from: coji
Repo: coji/upflow PR: 209
File: app/services/jobs/classify.server.ts:19-28
Timestamp: 2026-03-18T00:26:59.440Z
Learning: In coji/upflow, the classify job (app/services/jobs/classify.server.ts) intentionally does NOT call clearAllCache() after classifyPullRequests completes. Since classify runs asynchronously after crawl finishes, users are unlikely to check results immediately, and the in-memory cache TTL (5 minutes) is sufficient for natural expiration. This is a deliberate exception to the general project-wide pattern of calling clearAllCache() after data-modifying jobs.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: coji
Repo: coji/upflow PR: 196
File: app/services/jobs/recalculate.server.ts:127-132
Timestamp: 2026-03-17T06:05:49.427Z
Learning: Use the established project-wide cache invalidation pattern: call clearAllCache() from ~/app/services/cache.server.ts when invalidating cache. This should be used in recalculate and crawl jobs (and any similar cache invalidation points in app/services/jobs). The cache is small and org-scoped entries, so full cache wipes have limited blast radius. Note that org-scoped invalidation is a known future improvement and currently not implemented; prefer clearAllCache() for consistency and traceability now.

Learnt from: coji
Repo: coji/upflow PR: 141
File: CLAUDE.md:215-215
Timestamp: 2026-03-09T08:25:06.885Z
Learning: In coji/upflow, for any UPDATE or DELETE on org-scoped tables, always include WHERE organizationId = ? with a server-derived value, and obtain per-org tenant data using getTenantDb(organizationId). There is no automatic Kysely plugin for org scoping; apply this pattern project-wide to ensure proper scoping.

Learnt from: coji
Repo: coji/upflow PR: 198
File: app/services/jobs/crawl.server.ts:159-165
Timestamp: 2026-03-17T07:58:29.114Z
Learning: Guideline: Do not apply the global 'organizationId' filter to updates/deletes when using the per-organization tenant database accessed via getTenantDb(). In coji/upflow, getTenantDb(organizationId) uses a separate SQLite database file per organization, and tables there (e.g., organizationSettings, repositories) are implicitly scoped to that organization. The mandatory WHERE organizationId = ? rule only applies to the shared/global database accessed via the top-level db instance (app/services/db.server.ts). During reviews, ensure queries executed through getTenantDb() do not include or require organizationId in WHERE clauses for UPDATE/DELETE, unless referencing the global/db.server.ts path. This should be enforced by code patterns or CI checks that distinguish between tenant DB vs global DB usage.

coji and others added 3 commits March 18, 2026 09:16
… handling to trigger

- Remove classify field from recalculate job input schema entirely
- Remove --classify flag from recalculate CLI command
- Add try/catch in triggerClassifyStep so classify trigger failures
  don't fail the parent crawl job
- Update crawl CLI description to reflect new pipeline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Without this, classification results wouldn't be reflected in the UI
until the next crawl/recalculate finalize step cleared the cache.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Classify runs asynchronously after crawl, so users aren't watching for
immediate results. The in-memory cache has a 5-minute TTL and will
naturally expire before users typically view updated data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coji coji merged commit ab81f5f into main Mar 18, 2026
6 checks passed
@coji coji deleted the feat/durably-classify-job branch March 18, 2026 00:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant