Minimize Database Calls During Workflow Submission #701
Conversation
📝 Walkthrough
Replaces per-task DB inserts with a batched multi-row insert via a new `Task.batch_insert_to_db`; expands per-task payload fields. Also extends credential/bucket validation APIs to accept and propagate default buckets and credential caches with cached generic credential lookups.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/service/core/workflow/objects.py (1)
1044-1069: Extract this task-row assembly behind one shared helper.
This 15-field positional tuple now exists here and in `src/utils/job/jobs.py` lines 225-250, and both copies have to stay perfectly aligned with `Task.batch_insert_to_db(...)` / `Task.insert_to_db(...)`. One field-order drift will turn into a runtime failure or bad writes. A small typed row object/helper on `Task` would keep that contract in one place.
As per coding guidelines: "Add type annotations where they improve code clarity and catch errors in Python code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/service/core/workflow/objects.py` around lines 1044 - 1069, Create a single typed helper on the Task model (e.g., Task.build_db_row or a TaskRow dataclass) that encapsulates the 15-field DB row structure and its types, and update Task.batch_insert_to_db and Task.insert_to_db to accept/consume that helper so the field order lives in one place; replace the inline tuple assembly in objects.py (the task_entries tuple creation loop referencing task.Task.batch_insert_to_db, task_obj, task_obj_spec, kb_objects.construct_pod_name, common.convert_resource_value_str, json.dumps, etc.) and the duplicate assembly in src/utils/job/jobs.py to call the new helper instead; add type annotations to the helper and its return type to satisfy the coding guideline and ensure callers build lists of the typed rows rather than positional tuples.
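To make the suggestion concrete, here is one possible shape for that shared helper — a sketch only; `TaskRow` and `as_tuple` are illustrative names, and the field list is copied from the `TaskInsertRow` alias proposed later in this review, not verified against the codebase:

```python
from dataclasses import astuple, dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class TaskRow:
    """One task DB row; the field order here is the single source of
    truth for the columns batch_insert_to_db / insert_to_db write."""
    workflow_id: str
    name: str
    group_name: str
    task_db_key: str
    retry_id: int
    task_uuid: str
    status: str
    pod_name: str
    failure_message: Optional[str]
    gpu_count: float
    cpu_count: float
    disk_count: float
    memory_count: float
    exit_actions: str  # JSON-encoded
    lead: bool

    def as_tuple(self) -> Tuple:
        # Positional form expected by the INSERT statement.
        return astuple(self)
```

Both `objects.py` and `jobs.py` would then construct `TaskRow` instances, so a field added in the wrong position surfaces as a type error instead of a bad write.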
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/job/task.py`:
- Around line 1045-1061: Validate the batch_size argument at the start of
batch_insert_to_db and raise a clear exception for non-positive values: check
that batch_size is an int > 0 (or at least > 0) and raise ValueError with a
descriptive message if not; this prevents range() crashes on 0 and silent no-ops
for negatives and makes the helper fail fast and predictably.
In `@src/utils/job/workflow.py`:
- Around line 533-535: The code eagerly calls
connectors.UserProfile.fetch_from_db(...) and database.get_all_data_creds(...)
during workflow validation; change this to lazy, cached lookups that only run
when a task actually performs dataset I/O: add a small helper (e.g.
lazy_user_profile_lookup()) that checks whether any task needs dataset access
and, on first need, calls connectors.UserProfile.fetch_from_db(...) to populate
default_user_bucket and calls database.get_all_data_creds(user) and caches the
results for subsequent uses; ensure you do not call these functions during
compute-only validations (avoid side-effecting inserts by deferring
fetch_from_db until required) and replace the direct variable uses
(default_user_bucket, default_service_bucket, user_creds) with the lazy
accessor.
- Around line 534-536: The parameter type for default_service_bucket should be
made nullable to match dataset_config.default_bucket and the defensive handling
in _fetch_bucket_info(); update the function/method signature that currently
declares default_service_bucket: str to default_service_bucket: Optional[str]
(and any other occurrences in the same parameter group around the current
signature span) and import or reference typing.Optional as needed so static type
checking aligns with dataset_config.default_bucket and the connector behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 44847340-86f2-41ae-b14c-3f1f23dea924
📒 Files selected for processing (4)
- src/service/core/workflow/objects.py
- src/utils/job/jobs.py
- src/utils/job/task.py
- src/utils/job/workflow.py
♻️ Duplicate comments (2)
src/utils/job/workflow.py (1)
533-536: ⚠️ Potential issue | 🟡 Minor
Keep the data-profile lookups lazy.
These calls now run for every submission, even when the workflow has no dataset I/O. That adds DB work back to the submission hot path, and `UserProfile.fetch_from_db()` can also create the default profile row during validation. Cache these values on first actual data validation instead of upfront.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/workflow.py` around lines 533 - 536, The code eagerly fetches user/data profile values (default_user_bucket via UserProfile.fetch_from_db, default_service_bucket via dataset_config.default_bucket, and user_creds via database.get_all_data_creds) on every submission; defer these lookups and cache them on first use during actual data validation instead: remove the upfront calls that assign default_user_bucket, default_service_bucket, and user_creds, keep generic_cred_cache, and implement lazy accessors (or move the fetch logic) inside the data-validation path so the first time you need a bucket or creds you call UserProfile.fetch_from_db and database.get_all_data_creds, store results in generic_cred_cache (or local cached variables) and reuse thereafter; reference the symbols default_user_bucket, default_service_bucket, user_creds, generic_cred_cache, UserProfile.fetch_from_db, database.get_all_data_creds, and dataset_config when making the change.
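A minimal sketch of the lazy-caching pattern being asked for, assuming the two lookups can be wrapped as zero-argument callables (the real `fetch_from_db` / `get_all_data_creds` signatures may differ):

```python
from functools import lru_cache
from typing import Callable, Optional, Tuple


def make_lazy_lookups(
    fetch_profile: Callable[[], object],  # e.g. lambda: connectors.UserProfile.fetch_from_db(db, user)
    fetch_creds: Callable[[], dict],      # e.g. lambda: database.get_all_data_creds(user)
) -> Tuple[Callable[[], Optional[str]], Callable[[], dict]]:
    """Wrap the expensive profile/cred lookups so each runs at most
    once, and only when a task actually performs dataset I/O."""

    @lru_cache(maxsize=1)
    def default_user_bucket() -> Optional[str]:
        # Deferred on purpose: fetch_from_db can create the default
        # profile row, which must not happen during compute-only validation.
        return getattr(fetch_profile(), 'default_bucket', None)

    @lru_cache(maxsize=1)
    def user_creds() -> dict:
        return fetch_creds()

    return default_user_bucket, user_creds
```

Compute-only submissions then never touch these tables, while data-bearing workflows pay each lookup cost exactly once.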
src/utils/job/task.py (1)
1047-1063: ⚠️ Potential issue | 🟠 Major
Restore the intended default batch size and fail fast on invalid input.
Current submission callers use the default, so `batch_size=2` turns a 200-task submit into 100 `INSERT`s instead of the ~2 queries this change is aiming for. Also, silently rewriting non-positive values to `100` hides caller bugs; this helper should raise instead.
Suggested fix
```diff
 def batch_insert_to_db(
     database: connectors.PostgresConnector,
     task_entries: List[Tuple],
-    batch_size: int = 2,
-):
+    batch_size: int = 100,
+) -> None:
@@
-    if batch_size <= 0:
-        batch_size = 100
+    if batch_size <= 0:
+        raise ValueError('batch_size must be greater than 0')
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/task.py` around lines 1047 - 1063, The batch-insert helper currently sets batch_size default to 2 and silently rewrites non-positive values to 100; change the default batch_size back to the intended 100 and make invalid inputs fail fast by raising a ValueError when batch_size <= 0 instead of silently resetting it. Locate the batch-insert function that takes task_entries and batch_size (the function in src/utils/job/task.py around the docstring shown) and update the function signature default to batch_size: int = 100 and add an input validation that raises a ValueError with a clear message referencing batch_size when it's <= 0. Ensure callers relying on the default get the intended batching behavior.
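For context, this is the general shape of the chunked multi-row insert the helper is aiming for — a generic sketch against a DB-API cursor with `%s` placeholders, not the project's actual connector:

```python
from typing import List, Tuple


def batch_insert(cursor, table: str, columns: List[str],
                 rows: List[Tuple], batch_size: int = 100) -> None:
    """Insert rows in chunks, emitting one multi-row INSERT per chunk."""
    if batch_size <= 0:
        raise ValueError('batch_size must be greater than 0')
    if not rows:
        return
    # One "(%s, ..., %s)" placeholder group per row.
    row_placeholder = '(' + ', '.join(['%s'] * len(columns)) + ')'
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        sql = (
            f"INSERT INTO {table} ({', '.join(columns)}) "
            f"VALUES {', '.join([row_placeholder] * len(chunk))}"
        )
        # Flatten the chunk into one flat parameter list for the driver.
        params = [value for row in chunk for value in row]
        cursor.execute(sql, params)
```

With `batch_size=100`, a 200-task submission issues two statements instead of two hundred, which is the "~2 queries" the comment above refers to.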
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 5681c3b2-d953-4043-905b-0d9cb4eff2cb
📒 Files selected for processing (3)
- src/utils/job/jobs.py
- src/utils/job/task.py
- src/utils/job/workflow.py
🧹 Nitpick comments (1)
src/utils/job/task.py (1)
1047-1048: Consider a more specific type annotation for `task_entries`.
`List[Tuple]` is quite generic. Since callers must provide tuples with exactly 15 elements in a specific order (matching the INSERT columns), a more descriptive type would improve maintainability:
♻️ Suggested type alias
```python
# At module level or in a types module
TaskInsertRow = Tuple[
    str,            # workflow_id
    str,            # name
    str,            # group_name
    str,            # task_db_key
    int,            # retry_id
    str,            # task_uuid
    str,            # status
    str,            # pod_name
    Optional[str],  # failure_message
    float,          # gpu_count
    float,          # cpu_count
    float,          # disk_count
    float,          # memory_count
    str,            # exit_actions (JSON)
    bool,           # lead
]

# Then in the method signature:
def batch_insert_to_db(
    database: connectors.PostgresConnector,
    task_entries: List[TaskInsertRow],
    batch_size: int = 100,
):
```
Alternatively, a dataclass could enforce structure at construction time rather than relying on tuple ordering.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/task.py` around lines 1047 - 1048, The parameter task_entries in batch_insert_to_db is typed as List[Tuple], which is too generic given callers must supply 15-position tuples matching the INSERT columns; define a specific type alias (e.g., TaskInsertRow) or a dataclass representing the 15 fields (workflow_id, name, group_name, task_db_key, retry_id, task_uuid, status, pod_name, failure_message, gpu_count, cpu_count, disk_count, memory_count, exit_actions, lead) and change the signature to List[TaskInsertRow] (or List[YourDataclass]) and update any callers to construct that typed tuple/dataclass to improve clarity and maintainability.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f650e166-851c-4d27-a212-aefdbfa49b18
📒 Files selected for processing (1)
src/utils/job/task.py
Description
For big workflow submissions (e.g. hundreds of tasks), the service sometimes spends so long processing the request that it times out, returning a 504 to the user.
This PR tackles that by minimizing the number of DB calls made during submission:
Issue #411 #646
Checklist
Summary by CodeRabbit
Performance Improvements
Data/Telemetry