
feat: Add admin reindex API to rebuild ES indices from database#905

Open
Li-Xingyu wants to merge 30 commits into conductor-oss:main from Li-Xingyu:feature/admin-reindex

Conversation


Li-Xingyu commented Mar 22, 2026

fix #875

Pull Request type

  • Bugfix
  • Feature
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • WHOSUSING.md
  • Other (please describe):

Changes in this PR

Add admin reindex API to rebuild Elasticsearch indices from the primary database (Postgres/MySQL) when ES data is lost.

  • POST /api/admin/reindex — triggers an async background reindex job (single daemon thread, non-blocking)
  • GET /api/admin/reindex/status — returns progress (state, processed, errors, total)
  • ExecutionDAO.getAllWorkflowIds() — new paginated query to iterate all workflow IDs from Postgres
  • Thread-safe progress tracking via AtomicInteger / AtomicReference; a re-POST while a job is running returns ALREADY_RUNNING, and a re-POST after completion resets the counters and restarts
  • Fix: use workflow_id instead of id column in getAllWorkflowIds query
  • Added reindex documentation (Chinese & English)
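
For reference, a minimal sketch of what the two endpoints could look like (Spring MVC style; ReindexService and ReindexStatus are illustrative stand-ins, not the PR's actual class names):

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Illustrative stand-ins for the PR's actual service and status DTO.
interface ReindexService {
    ReindexStatus startReindex();
    ReindexStatus getStatus();
}

record ReindexStatus(String state, long processed, long errors, long total) {}

@RestController
@RequestMapping("/api/admin/reindex")
class ReindexResource {

    private final ReindexService reindexService;

    ReindexResource(ReindexService reindexService) {
        this.reindexService = reindexService;
    }

    @PostMapping
    public ReindexStatus start() {
        // Hands the job to a background thread and returns immediately.
        return reindexService.startReindex();
    }

    @GetMapping("/status")
    public ReindexStatus status() {
        // Reads thread-safe counters; safe to poll while the job runs.
        return reindexService.getStatus();
    }
}
```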

Alternatives considered

  • Synchronous endpoint: rejected because reindexing thousands of workflows would block the HTTP thread and likely time out.
  • Queue-based approach (e.g. push each workflow ID to a message queue for async indexing): more complex infrastructure dependency, unnecessary for a one-off admin operation.
  • Bulk ES API: considered batching ES writes via _bulk API, but using the existing IndexDAO.indexWorkflow() / indexTask() keeps the logic simple and consistent with normal indexing paths.

Li-Xingyu and others added 6 commits March 17, 2026 01:45
… database

When Elasticsearch data is lost but the primary database (Postgres) is intact,
there was no way to rebuild the search indices. This adds a reindex endpoint
that iterates all workflows and tasks from the database and re-indexes them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- POST /api/admin/reindex    - starts background job, returns immediately
- GET  /api/admin/reindex/status - returns state/processed/errors/total/message

Uses single-threaded ExecutorService + AtomicInteger counters to track
progress safely without blocking the HTTP thread.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
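
For illustration, a minimal sketch of the pattern this commit message describes: a single daemon worker plus atomic counters, guarded by a CAS on the state (all names hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ReindexRunner {
    // Single daemon thread: one job at a time, and it never blocks JVM shutdown.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "reindex-worker");
        t.setDaemon(true);
        return t;
    });
    private final AtomicReference<String> state = new AtomicReference<>("IDLE");
    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger errors = new AtomicInteger();

    // A concurrent second POST fails both CAS attempts and sees ALREADY_RUNNING.
    String start(Runnable job) {
        if (!state.compareAndSet("IDLE", "RUNNING")
                && !state.compareAndSet("COMPLETED", "RUNNING")) {
            return "ALREADY_RUNNING";
        }
        processed.set(0); // a re-run after completion resets the counters
        errors.set(0);
        executor.submit(() -> {
            try {
                job.run(); // the job increments processed/errors as it goes
            } finally {
                state.set("COMPLETED");
            }
        });
        return "RUNNING";
    }
}
```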
V6 migration dropped the id column from the workflow table and changed the
primary key to workflow_id. ORDER BY id caused a column-not-found error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@nthmost-orkes
Contributor

Two issues worth fixing before merge:

The pre-count will OOM on large databases. In doReindex(), the first thing it does is executionDAO.getAllWorkflowIds(0, Integer.MAX_VALUE).size() — loading every workflow ID into heap just to get a count. On a large deployment (the exact scenario this is designed for), that's an OOM before any reindexing happens. Either add a SELECT COUNT(*) FROM workflow query and expose it on the DAO interface, or just drop the total field from the status response and only report processed / errors.
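
For concreteness, one way such a count could be exposed (plain JDBC sketch; Conductor's DAOs use their own query helpers, so treat the details as illustrative):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

class WorkflowCounter {
    private final DataSource dataSource;

    WorkflowCounter(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Counts rows server-side instead of materializing every workflow ID in heap.
    long getWorkflowCount() throws SQLException {
        try (Connection c = dataSource.getConnection();
             PreparedStatement ps = c.prepareStatement("SELECT COUNT(*) FROM workflow");
             ResultSet rs = ps.executeQuery()) {
            rs.next();
            return rs.getLong(1);
        }
    }
}
```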

Only Postgres works. getAllWorkflowIds is implemented in PostgresExecutionDAO but the default on the interface throws UnsupportedOperationException. MySQL, Redis, SQLite, and Cassandra users all get an unhandled exception when they POST to /reindex. MySQL at minimum should get an implementation since the workflow table schema is the same. For Redis/Cassandra, the endpoint should return a clear 501/unsupported error rather than a stack trace.
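
A sketch of the fail-fast shape suggested here: a capability default on the DAO plus an explicit unsupported result instead of an escaping exception (method and state names hypothetical):

```java
interface ExecutionDAOSupport {
    // Hypothetical capability flag: defaults to false so Redis/Cassandra
    // opt out automatically unless a backend explicitly overrides it.
    default boolean supportsWorkflowScan() {
        return false;
    }
}

class ReindexStarter {
    // Returns a clear state (mappable to HTTP 501) rather than a stack trace.
    String start(ExecutionDAOSupport dao) {
        if (!dao.supportsWorkflowScan()) {
            return "UNSUPPORTED: execution store cannot enumerate workflows";
        }
        return "STARTED";
    }
}
```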

@nthmost-orkes
Contributor

Also — could you file an issue for this and link it from the PR description? For changes larger than a few lines, having a tracked issue is really valuable in a fast-moving project like this. It helps other Conductor users see what's being considered, and helps contributors and maintainers prioritize work.

…ackend handling

- Replace getAllWorkflowIds(0, MAX_VALUE).size() with SELECT COUNT(*)
  via new ExecutionDAO.getWorkflowCount() to prevent OOM on large databases
- Add getAllWorkflowIds + getWorkflowCount for MySQL and SQLite
- Fail fast with clear UNSUPPORTED state for Redis/Cassandra instead of stack trace
- Use AtomicLong for progress counters to match long return type
@Li-Xingyu
Author

Li-Xingyu commented Mar 25, 2026

Hi @nthmost-orkes
Thank you for the detailed feedback! I’ve addressed the issues you pointed out in the latest commits:

OOM Prevention: Replaced the memory-intensive getAllWorkflowIds(0, Integer.MAX_VALUE).size() with a dedicated SELECT COUNT(*) query via new ExecutionDAO.getWorkflowCount() method. This ensures we don't load the entire list of workflow IDs into the heap on large databases.

Multi-DB Support: Implemented getAllWorkflowIds and getWorkflowCount for Postgres, MySQL, and SQLite. For unsupported storage backends (Redis, Cassandra), the API now returns a clear UNSUPPORTED state with a descriptive message instead of throwing an unhandled exception.

Type Safety: Changed progress counters from AtomicInteger to AtomicLong to match the long return type of getWorkflowCount() and avoid potential overflow on very large datasets.

Issue Tracking: I have created a dedicated issue for this feature and linked it in the PR description as requested.

Please let me know if there is anything else that needs adjustment. Looking forward to your review!

@nthmost-orkes
Contributor

Thanks for the quick turnaround on the OOM + multi-DB fixes — this is much closer now. One real bug and a few smaller things:

MySQL query will fail at runtime. The earlier ORDER BY id → ORDER BY workflow_id fix was only applied to Postgres. The new MySQL implementation reintroduces the same bug:

"SELECT workflow_id FROM workflow ORDER BY id LIMIT ? OFFSET ?"

MySQL's V8__update_pk.sql drops id from the workflow table (FixPkIfNeeded('workflow', 'workflow_id')), so this will error with "column not found" on any migrated MySQL instance — exactly the deployments this feature is meant to help. A one-line fix to match Postgres/SQLite.

No tests for ~480 new lines. At minimum I'd want coverage for the state transitions (IDLE→RUNNING, double-POST → ALREADY_RUNNING, COMPLETED→RUNNING reset), the UNSUPPORTED fallback, and the new pagination queries. Happy to merge without exhaustive coverage but something would be good.

OFFSET pagination won't scale to the target scenario. On a large workflow table, LIMIT ? OFFSET ? is O(n²) overall — the scenario this endpoint exists for (large deployment, lost ES) is where it hurts most. Keyset pagination (WHERE workflow_id > :cursor ORDER BY workflow_id LIMIT ?) is a pretty clean swap. Fine as a follow-up issue if you'd rather keep this PR focused.
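
A sketch of the cursor loop that swap implies (getAllWorkflowIdsAfter matches the accessor the PR ends up adding; the loop body is illustrative):

```java
import java.util.List;

class KeysetScan {
    interface Dao {
        // Keyset variant of the pagination query:
        // WHERE workflow_id > :cursor ORDER BY workflow_id LIMIT :limit
        List<String> getAllWorkflowIdsAfter(String cursor, int limit);
    }

    // Walks the whole table in O(n): each page resumes from the last seen ID
    // instead of re-scanning and discarding preceding rows as OFFSET does.
    static long scanAll(Dao dao, int batchSize) {
        long seen = 0;
        String cursor = ""; // empty string sorts before any workflow_id
        while (true) {
            List<String> batch = dao.getAllWorkflowIdsAfter(cursor, batchSize);
            if (batch.isEmpty()) {
                return seen;
            }
            seen += batch.size();
            cursor = batch.get(batch.size() - 1); // bookmark for the next page
        }
    }
}
```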

Throttling isn't implemented. Issue #875 explicitly calls it out and without it this can starve live traffic on both the DB and ES. Even an optional inter-batch sleep would help. Also a follow-up candidate.

Minor: PR description mentions a docker/login-action v4→v3 downgrade that isn't actually in the diff — looks stale, worth trimming.

If you can fix the MySQL ORDER BY and add a couple of tests, I think this is ready to go. Keyset pagination + throttling I'd be happy to track as separate issues.

Li-Xingyu and others added 5 commits April 22, 2026 09:50
MySQLExecutionDAO.getAllWorkflowIds used ORDER BY id, but V8__update_pk.sql
drops the id column in favour of workflow_id as primary key. Any migrated
MySQL instance would fail at runtime with "column not found".

Also adds AdminServiceImplReindexTest covering:
- IDLE → RUNNING → COMPLETED state transition
- COMPLETED → RUNNING counter reset on re-run
- double-POST returns ALREADY_RUNNING
- unsupported backend returns UNSUPPORTED state
- pagination offset arguments are correct
OFFSET pagination is O(n²) overall — each page requires the DB to scan
and discard all preceding rows. On large workflow tables (the exact
scenario this endpoint targets) this degrades badly.

Keyset pagination (WHERE workflow_id > :cursor ORDER BY workflow_id LIMIT ?)
is O(n) overall because each page starts from a bookmark rather than
re-scanning from the beginning.

Changes:
- Add ExecutionDAO.getAllWorkflowIdsAfter(cursor, limit) interface method
- Implement in PostgresExecutionDAO, MySQLExecutionDAO, SqliteExecutionDAO
- Rewrite doReindex() to use cursor loop instead of offset loop
- Update tests: replace offset-based mocks with cursor-based mocks and
  add testKeysetCursorAdvancesCorrectly to verify cursor propagation
Without throttling the reindex job can saturate both the DB and ES,
starving live traffic. A simple inter-batch sleep is too coarse and
doesn't account for actual processing time.

Guava RateLimiter provides per-workflow granularity: it self-adjusts
so that if a batch takes longer than expected no extra wait is added,
and if processing is fast it blocks just enough to maintain the target
rate. The semantic is clear: rateLimitPerSecond=50 means at most 50
workflows per second enter ES, regardless of batch size.

Configuration:
  conductor.reindex.rateLimitPerSecond=50   # workflows/sec; 0 = unlimited (default)

Changes:
- Add @Value("${conductor.reindex.rateLimitPerSecond:0}") field
- Create RateLimiter in startReindex() when rate > 0; null otherwise
- Call limiter.acquire() before each workflow in doReindex()
- Tests: verify limiter is null at rate=0, non-null at rate=50, and
  that the job completes successfully in both cases
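
A minimal sketch of the throttling this commit describes, assuming Guava's RateLimiter as stated (the class and method names around it are illustrative):

```java
import com.google.common.util.concurrent.RateLimiter;
import java.util.List;
import java.util.function.Consumer;

class ThrottledReindex {
    // 0 disables throttling, matching the conductor.reindex.rateLimitPerSecond=0 default.
    private final RateLimiter limiter;

    ThrottledReindex(double rateLimitPerSecond) {
        this.limiter = rateLimitPerSecond > 0 ? RateLimiter.create(rateLimitPerSecond) : null;
    }

    void indexAll(List<String> workflowIds, Consumer<String> indexer) {
        for (String id : workflowIds) {
            if (limiter != null) {
                // Blocks just long enough to hold the configured workflows/sec;
                // adds no extra wait when indexing is already slower than the target.
                limiter.acquire();
            }
            indexer.accept(id);
        }
    }
}
```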
@Li-Xingyu
Author

Li-Xingyu commented Apr 22, 2026

@nthmost-orkes Thanks for the detailed review — all four points have been addressed in the latest commits:

MySQL ORDER BY fix — corrected ORDER BY id → ORDER BY workflow_id in MySQLExecutionDAO.getAllWorkflowIds. The same one-line fix that was already applied to Postgres and SQLite.

Unit tests — added AdminServiceImplReindexTest covering the state transitions (IDLE→RUNNING→COMPLETED, COMPLETED→RUNNING counter reset, double-POST→ALREADY_RUNNING), the UNSUPPORTED fallback, and pagination argument correctness.

Keyset pagination — replaced LIMIT ? OFFSET ? with WHERE workflow_id > :cursor ORDER BY workflow_id LIMIT ? across all three supported backends (Postgres, MySQL, SQLite). The cursor advances to the last id of each batch, making the full scan O(n) instead of O(n²). The existing getAllWorkflowIds(offset, limit) interface method is preserved for any other callers.

Throttling — added a RateLimiter (Guava, already a conductor-core dependency) that fires per workflow rather than per batch. Configured via conductor.reindex.rateLimitPerSecond (default 0 = unlimited, no behaviour change for existing deployments). At 50, for example, the reindex job will push at most 50 workflows/sec into ES regardless of batch size or processing speed.

The stale docker/login-action mention in the PR description has also been removed.

@nthmost-orkes
Contributor

Returning to check this out again tomorrow.

@nthmost-orkes
Contributor

Just noting that the original issue called for time-based filtering, which is not implemented in this PR. Not a blocker since this is a voluntary reindexing call.

@nthmost-orkes (Contributor) left a comment

ES Cluster Safety — Two Items Before Merge

1. Missing danger warning

This API can put an ES/OS cluster into a red state if used carelessly on a large deployment. There is currently nothing in the docs, the POST response, or the code that communicates this risk.

Required: Add a prominent warning — in both the English and Chinese docs and in the STARTED response body itself — along the lines of:

⚠️ This will blow your arm off if you are not careful. Bulk-writing every workflow and task back to ES can saturate ingest throughput, breach disk watermarks, and cause the cluster to start rejecting requests or enter a red state. Verify cluster health (GET /_cluster/health) and available disk space before pulling this trigger. The fact that this is a voluntary call is the only thing that makes it safe at all — that means the operator needs to know what they are doing before they call it.

The voluntary/on-demand nature of the call is what makes this acceptable — but that protection only works if operators are actually warned.

2. No pre-flight ES health / disk check

startReindex() currently transitions to RUNNING without checking:

  • ES/OS cluster health (if the cluster is already yellow or red, a bulk re-index will very likely make it worse)
  • ES/OS disk watermarks (if nodes are above the high-watermark, writes will be rejected immediately and reindexErrors will spike without the caller understanding why)

Recommended: Before the CAS transitions to RUNNING, query ES cluster health and refuse to start (returning a new PREFLIGHT_FAILED state with a descriptive message) if the cluster is not green. At minimum, surface a loud warning and require an explicit ?force=true query param to override. Giving operators actionable feedback here is much better than a flood of opaque errors in the log mid-run.
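
For concreteness, a sketch of such a pre-flight using the Elasticsearch low-level REST client; the substring check on the response body is deliberately crude, and a real implementation would parse the JSON:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

class ReindexPreflight {
    private final RestClient restClient;

    ReindexPreflight(RestClient restClient) {
        this.restClient = restClient;
    }

    // Returns true only when the cluster reports green; any error counts as
    // unhealthy so the job fails fast with PREFLIGHT_FAILED instead of
    // producing opaque errors mid-run.
    boolean isClusterGreen() {
        try {
            Response response =
                restClient.performRequest(new Request("GET", "/_cluster/health"));
            String body = new String(
                response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
            return body.contains("\"status\":\"green\"");
        } catch (IOException e) {
            return false;
        }
    }
}
```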

@nthmost-orkes
Contributor

Follow-up suggestions (not blocking merge)

Two further improvements worth tracking as follow-up issues once the safety items above are addressed:

Backpressure on ES 429 responses

The current inner loop treats ES 429 Too Many Requests the same as any other error — it logs, increments reindexErrors, and immediately moves on. Under sustained load this means Conductor keeps firing requests into an already-overwhelmed cluster, accelerating the problem.

A targeted fix: catch 429s specifically and apply exponential backoff + retry rather than counting them as permanent failures. This directly reduces the red-state risk during a live reindex and plays well with the rate limiter that's already there.
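
A sketch of that backoff shape, with a hypothetical exception type standing in for however the ES client surfaces a 429:

```java
class BackoffOn429 {
    // Retries a single index call with exponential backoff when ES answers 429,
    // instead of counting the rejection as a permanent failure.
    static void indexWithBackoff(Runnable indexCall, int maxRetries)
            throws InterruptedException {
        long delayMs = 500;
        for (int attempt = 0; ; attempt++) {
            try {
                indexCall.run();
                return;
            } catch (TooManyRequestsException e) { // hypothetical 429 wrapper
                if (attempt >= maxRetries) {
                    throw e; // give up and let the caller count it as an error
                }
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, 30_000); // cap the backoff
            }
        }
    }

    static class TooManyRequestsException extends RuntimeException {}
}
```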

Move batch/bulk ingest logic into the IndexDAO layer

Each workflow currently produces N+1 individual index calls: indexWorkflow + one indexTask per task. For a workflow with 50 tasks that's 51 round-trips, and 5,100 per 100-workflow batch.

The batching logic belongs in the ES/OS IndexDAO implementations, not in AdminServiceImpl. A bulkIndex(List<WorkflowSummary>, List<TaskSummary>) method on IndexDAO would let the ES/OS layer use the _bulk API for the entire batch — dramatically reducing per-document overhead and cluster pressure. This is a more substantial refactor so it makes sense as a follow-up, but it's worth filing now so it doesn't get lost.
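
A sketch of the suggested interface change; the placeholder types stand in for Conductor's real WorkflowSummary / TaskSummary:

```java
import java.util.List;

final class WorkflowSummary {} // placeholder for Conductor's real summary type
final class TaskSummary {}     // placeholder for Conductor's real summary type

interface BulkIndexDAO {
    void indexWorkflow(WorkflowSummary workflow);
    void indexTask(TaskSummary task);

    // Default falls back to per-document calls so non-ES backends keep working;
    // the ES/OS implementation would override this to send the whole batch
    // through a single _bulk request.
    default void bulkIndex(List<WorkflowSummary> workflows, List<TaskSummary> tasks) {
        workflows.forEach(this::indexWorkflow);
        tasks.forEach(this::indexTask);
    }
}
```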

Li-Xingyu and others added 2 commits May 6, 2026 10:48
…ger warning

* IndexDAO: add isClusterHealthy() default returning true; ES7/ES8 query
  GET /_cluster/health (5s timeout) and report green vs not-green.
* AdminServiceImpl.startReindex: refuse to start when the index cluster
  is not green, returning the new PREFLIGHT_FAILED state. ?force=true
  bypasses the check. STARTED and PREFLIGHT_FAILED responses both carry
  a 'warning' field describing the red-state risk.
* AdminResource: expose ?force=true query parameter on POST /reindex.
* docs/reindex.md and reindex-en.md: lead with a danger warning, document
  the pre-flight, force flag, PREFLIGHT_FAILED state, and warning field.
* tests: lenient stub for isClusterHealthy in setUp; new tests cover
  PREFLIGHT_FAILED, force bypass, and the warning field.


Development

Successfully merging this pull request may close these issues.

[FEATURE] Support for Administrative Re-indexing / Data Synchronization from Persistence (Postgres) to Indexing (ES/OpenSearch)
