UN-3197 [MISC] Remove Celery file processing workers and dead code #1777
Conversation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Walkthrough

This pull request consolidates the Celery worker infrastructure by removing legacy v1 worker implementations (worker, worker-logging, worker-file-processing, worker-file-processing-callback), eliminating the FileExecutionTasks orchestration layer, and refactoring task invocations to use unified celery_app.send_task patterns. It updates the deployment matrix and environment configuration accordingly.
…' of github-muhammad:Zipstack/unstract into feat/UN-3197-MISC_remove_celery_file_processing_workers
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/workflow_manager/workflow_v2/workflow_helper.py`:
- Around line 244-275: The failure handler _handle_execution_failure currently
updates only the WorkflowExecution and logs errors, which can leave the pipeline
run stuck if chord enqueue exceptions bypass the outer run_workflow path; import
and call PipelineUtils.update_pipeline_status (or the equivalent pipeline status
updater) from inside _handle_execution_failure using the pipeline/pipeline_run
identifier available on workflow_execution (e.g.,
workflow_execution.pipeline_run.id or workflow_execution.workflow.pipeline.id)
to mark the pipeline run as failed/errored and include the error message, or
alternatively re-raise the exception after cleanup so the outer run_workflow
path updates pipeline status; ensure you reference
PipelineUtils.update_pipeline_status and _handle_execution_failure in the
change.
@muhammad-ali-e what about changes to the
Remove legacy worker, worker-logging, worker-file-processing, and worker-file-processing-callback services from docker-compose. Promote v2 unified workers from opt-in profile to default. Clean up related flags, env vars, docs, and override configs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…' of github-muhammad:Zipstack/unstract into feat/UN-3197-MISC_remove_celery_file_processing_workers
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docker/sample.env (1)
10-18: ⚠️ Potential issue | 🟡 Minor — Rename `WORKER_FILE_PROCESSING_NEW_AUTOSCALE` to `WORKER_FILE_PROCESSING_AUTOSCALE`.

Line 15 inconsistently includes a `_NEW` suffix that doesn't appear in any other autoscale variable. All other autoscale variables follow the pattern `WORKER_{TYPE}_AUTOSCALE`, and the file processing worker's other variables (e.g., `WORKER_FILE_PROCESSING_CONCURRENCY` on line 85) don't use this suffix either. Remove the `_NEW` suffix to align with the naming convention.
Move `get_plugin` import to top-level and rename WORKER_FILE_PROCESSING_NEW_AUTOSCALE to WORKER_FILE_PROCESSING_AUTOSCALE. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🧹 Nitpick comments (3)
backend/workflow_manager/workflow_v2/workflow_helper.py (2)
209-232: Hoist `is_api` out of the loop — it's computed redundantly and creates a fragile cross-scope reference.

`is_api` is assigned inside the `for batch in batches:` loop (lines 210–212) but then referenced after the loop at line 230 (inside the `try` block). `source.endpoint.connection_type` is constant across all iterations, so re-evaluating it on every pass is wasteful. More importantly, if `batches` is ever empty (e.g., if the early-return guard at line 165 is removed, or `get_file_batches` is changed), `is_api` will be undefined when line 230 executes, raising a `NameError`.

♻️ Proposed fix: hoist `is_api` before the loop
```diff
+        # Determine the appropriate queue based on connection type
+        is_api = (
+            source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API
+        )
+        file_processing_queue = (
+            FileProcessingQueue.API_FILE_PROCESSING
+            if is_api
+            else FileProcessingQueue.FILE_PROCESSING
+        )
+
         for batch in batches:
             # Convert all UUIDs to strings in batch_data
             file_data = FileData(...)
             batch_data = FileBatchData(files=batch, file_data=file_data)
-            # Determine the appropriate queue based on connection type
-            is_api = (
-                source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API
-            )
-            file_processing_queue = (
-                FileProcessingQueue.API_FILE_PROCESSING
-                if is_api
-                else FileProcessingQueue.FILE_PROCESSING
-            )
             # Send each batch to the dedicated file_processing queue
             batch_tasks.append(
                 celery_app.signature(
                     "process_file_batch",
                     args=[batch_data.to_dict()],
                     queue=file_processing_queue,
                 )
             )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/workflow_helper.py` around lines 209 - 232, The variable is_api is computed inside the for batch in batches: loop (using source.endpoint.connection_type) but is later referenced after the loop when selecting FileProcessingQueue.* for file_processing_callback_queue, which can raise NameError if batches is empty; move the is_api computation (is_api = source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API) to before the loop so it is evaluated once outside the loop and then use that single value when building batch_tasks (celery_app.signature "process_file_batch") and when selecting FileProcessingQueue.API_FILE_PROCESSING_CALLBACK vs FILE_PROCESSING_CALLBACK; ensure no other logic depends on per-batch evaluation.
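The hazard can be reproduced in isolation. A minimal standalone sketch (hypothetical stand-ins for the real `source.endpoint` objects and queue constants; only the `is_api` logic mirrors the review's diff) shows why hoisting makes the empty-`batches` case safe:

```python
API = "API"  # stand-in for WorkflowEndpoint.ConnectionType.API


def build_batch_tasks(batches: list, connection_type: str) -> list:
    # Hoisted out of the loop: evaluated exactly once, so an empty
    # `batches` list can no longer leave the queue choice undefined
    # (the NameError the review warns about).
    is_api = connection_type == API
    file_processing_queue = "api_file_processing" if is_api else "file_processing"
    return [
        ("process_file_batch", batch, file_processing_queue) for batch in batches
    ]


# Empty input no longer risks a NameError on a post-loop reference:
assert build_batch_tasks([], API) == []
assert build_batch_tasks([["f1"]], "FILESYSTEM")[0][2] == "file_processing"
```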
256-272: Address static analysis warnings in `_handle_execution_failure`. Four Ruff findings in the new method:

- Lines 256 & 272: `str(error)` inside f-strings should use the `!s` conversion flag (RUF010).
- Line 268: Bare `except Exception` (BLE001) — catch a more specific exception type if possible, or at minimum acknowledge the broad catch.
- Line 269: `logger.error` inside an `except` block should be `logger.exception` so the traceback is captured automatically (TRY400).

♻️ Proposed fix
```diff
         workflow_execution.update_execution(
             status=ExecutionStatus.ERROR,
-            error=f"Error while processing files: {str(error)}",
+            error=f"Error while processing files: {error!s}",
         )
         organization_id = workflow_execution.workflow.organization.organization_id
         subscription_usage_plugin = get_plugin("subscription_usage")
         if subscription_usage_plugin:
             try:
                 service = subscription_usage_plugin["service_class"]()
                 service.handle_workflow_execution_failure(
                     organization_id=organization_id,
                     execution_id=str(workflow_execution.id),
                 )
-            except Exception as e:
-                logger.error(f"Error in subscription usage plugin failure handler: {e}")
+            except Exception as e:  # noqa: BLE001
+                logger.exception(f"Error in subscription usage plugin failure handler: {e}")
         logger.error(
-            f"Execution {workflow_execution.id} failed: {str(error)}", exc_info=True
+            f"Execution {workflow_execution.id} failed: {error!s}", exc_info=True
         )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/workflow_helper.py` around lines 256 - 272, In _handle_execution_failure update the f-strings that currently use str(error) to use the !s conversion (e.g., f"...{error!s}...") for both the "Error while processing files" and "Execution {workflow_execution.id} failed" logs; in the subscription_usage_plugin block replace the bare "except Exception as e" with a more explicit catch if possible or at minimum keep "except Exception as e" but change the inner logger call to logger.exception(...) so the traceback is recorded when service.handle_workflow_execution_failure raises (reference workflow_execution, subscription_usage_plugin, service.handle_workflow_execution_failure, and logger.error/logger.exception).

docker/sample.env (1)
10-11: Clarify the comment — `WORKER_*_AUTOSCALE` and `*_AUTOSCALE` are distinct variable names.

The phrase "matches hierarchical configuration below" is slightly misleading: the top block (lines 12–18) uses `WORKER_*_AUTOSCALE`-prefixed variables, while the hierarchical section (lines 77–100) uses unprefixed `*_AUTOSCALE` names (e.g., `WORKER_GENERAL_AUTOSCALE=6,2` vs `GENERAL_AUTOSCALE=6,2`). They share the same values, but they're separate env vars consumed by different layers (Docker Compose vs Celery config hierarchy). A comment like `# Docker Compose–level autoscale limits` would be less ambiguous.

✏️ Suggested comment clarification
```diff
-#
-# Worker autoscaling (matches hierarchical configuration below)
+#
+# Docker Compose–level autoscale limits (separate from per-worker Celery AUTOSCALE vars below)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker/sample.env` around lines 10 - 11, The comment "matches hierarchical configuration below" is ambiguous because Docker Compose uses prefixed environment variables (WORKER_*_AUTOSCALE) while the Celery/hierarchical config uses unprefixed variables (*_AUTOSCALE) even though they hold the same values; update the comment to explicitly differentiate these two sets (e.g., "Docker Compose–level autoscale limits" for WORKER_*_AUTOSCALE and note that separate unprefixed *_AUTOSCALE vars are used by the Celery/hierarchy), so readers know they are distinct env var names consumed by different layers (refer to WORKER_*_AUTOSCALE and *_AUTOSCALE).
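Both Ruff findings from the `_handle_execution_failure` comment above (RUF010 and TRY400) can be demonstrated with a stdlib-only snippet; the logger name and messages here are illustrative, not the project's:

```python
import logging

logger = logging.getLogger("ruff_findings_demo")
logging.basicConfig(level=logging.ERROR)

error = ValueError("bad batch")

# RUF010: inside an f-string, `{error!s}` renders identically to
# `{str(error)}`, minus the redundant str() call.
assert f"failed: {error!s}" == f"failed: {str(error)}" == "failed: bad batch"

# TRY400: inside an except block, logger.exception() logs at ERROR level
# and appends the active traceback automatically; logger.error() omits
# the traceback unless exc_info=True is passed explicitly.
try:
    raise RuntimeError("plugin failure")
except Exception:
    logger.exception("Error in subscription usage plugin failure handler")
```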
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@backend/workflow_manager/workflow_v2/workflow_helper.py`:
- Around line 245-246: The exception handler currently only calls
cls._handle_execution_failure(workflow_execution, e) but does not update/persist
the pipeline/workflow_execution status for chord enqueue failures; modify the
except block so after (or inside) cls._handle_execution_failure it also sets the
workflow_execution status to the failed state (e.g., call
workflow_execution.update_status('FAILED') or workflow_execution.mark_failed())
and persists/saves the change so the pipeline status reflects the failure;
ensure you reference the existing workflow_execution object and keep the call to
cls._handle_execution_failure to preserve existing failure handling logic.
…ing_workers Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Test Results

Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
…tadata Resolve conflict: accept deletion of file_execution_tasks.py (dead code removed in #1777 after workers v2 migration). The API metadata enrichment change from that file is no longer needed as workers v2 handles destination processing differently. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>



What
- Removed `file_execution_tasks.py` (all remaining methods were dead code after task removal)
- Removed the `FileExecutionTasks` import and usage from `workflow_helper.py`
- Moved the `FileProcessingQueue` constants to `utils/constants.py`
- Removed the legacy workers (`worker`, `worker-logging`, `worker-file-processing`, `worker-file-processing-callback`) from docker-compose
- Promoted the v2 unified workers (`workers-v2` profile) to default services
- Removed the `--workers-v2` flag from `run-platform.sh`

Why
- With the Celery tasks removed (`process_file_batch`, `process_batch_callback`), all 18 remaining methods in `FileExecutionTasks` had zero external callers — pure dead code
- The `backend/workers/` package (file_processing, file_processing_callback) is fully superseded

How
- Removed the `backend/workers/` package entirely (file_processing + file_processing_callback workers)
- Removed `backend/workflow_manager/workflow_v2/file_execution_tasks.py` (1200+ lines of dead code)
- Cleaned up references in `celery_task.py` and `celery_service.py`
- Moved the `FileProcessingQueue` constants to `backend/utils/constants.py`
- Refactored `workflow_helper.py` to remove all references to the deleted tasks
- Updated `run-platform.sh` and the CI workflow to remove the old worker services
- Updated `pyproject.toml`
- Removed the legacy services (`worker`, `worker-logging`, `worker-file-processing`, `worker-file-processing-callback`) from `docker/docker-compose.yaml`
- Removed `profiles: [workers-v2]` from the 8 v2 worker services, making them start by default
- Updated the `celery-flower` `depends_on` to reference only remaining services
- Removed the `--workers-v2` flag and conditional profile logic from `run-platform.sh`
- Removed legacy `WORKER_*_AUTOSCALE` env vars from `docker/sample.env`
- Updated `docker/sample.compose.override.yaml`
- Updated `docker/README.md`

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
- No references to `FileExecutionTasks` or `file_execution_tasks` remain in the codebase
- The `FileProcessingQueue` constants are preserved in `utils/constants.py` for continued use by `workflow_helper.py`
- The v2 workers now start by default, so deployments no longer need the `workers-v2` profile

Database Migrations
Env Config
- Old worker env vars (`CELERY_FILE_PROCESSING_*`, `CELERY_FILE_PROCESSING_CALLBACK_*`) are no longer needed but their presence is harmless
- `WORKER_*_AUTOSCALE` variables for the old workers are removed from `sample.env`

Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
- Tested without the `--workers-v2` flag
N/A
Checklist
I have read and understood the Contribution Guidelines.