@marcorudolphflex marcorudolphflex commented Oct 15, 2025

Refers to #2893.

Previous State
Batch.run() waits for every job to finish (Batch.monitor()) before calling Batch.download(), so even tasks that completed early don’t start transferring until the whole batch is done. This is a noticeable delay for large batches with many short simulations.

Problem
Users want each simulation’s results to begin downloading immediately after that simulation hits success, rather than waiting for the entire batch to complete.

Solution
Monitor-triggered downloads: Extend the Batch.monitor() loop to detect when a job transitions to success and immediately queue job.download() on a shared executor. This behavior is enabled by the new kwarg download_on_success.
Batch.run() then skips the download step in Batch.load() through the new kwarg skip_download, which defaults to False so that other callers keep the existing behavior.
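
The monitor-triggered download idea can be sketched in isolation as follows. This is a minimal illustration and not the code in tidy3d/web/api/container.py: `FakeJob`, `monitor_with_downloads`, the poll interval, and the `.hdf5` file naming are hypothetical stand-ins, and the sketch assumes every job eventually reaches success (real code also has to handle error end states).

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeJob:
    """Hypothetical stand-in for a job: 'running' for a few polls, then 'success'."""

    def __init__(self, name, polls_until_done):
        self.name = name
        self._polls_left = polls_until_done
        self.downloaded_to = None

    @property
    def status(self):
        if self._polls_left > 0:
            self._polls_left -= 1
            return "running"
        return "success"

    def download(self, path):
        # A real job would transfer a file here; the sketch only records the target.
        self.downloaded_to = path
        return path


def monitor_with_downloads(jobs, path_dir, poll_interval=0.01):
    """Poll all jobs and submit each download as soon as its job reports success."""
    started = set()  # names whose download has already been queued
    futures = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        while len(started) < len(jobs):
            for name, job in jobs.items():
                if name not in started and job.status == "success":
                    started.add(name)
                    futures[name] = executor.submit(job.download, f"{path_dir}/{name}.hdf5")
            time.sleep(poll_interval)
    # Leaving the `with` block waits for all queued downloads to finish.
    return {name: fut.result() for name, fut in futures.items()}


jobs = {"fast": FakeJob("fast", 1), "slow": FakeJob("slow", 5)}
paths = monitor_with_downloads(jobs, "out")
```

The `started` set plays the same role as `downloads_started` in the PR: it keeps a job that stays in success across several polls from being queued twice.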

Greptile Overview

Updated On: 2025-10-15 14:32:52 UTC

Greptile Summary

This PR implements per-simulation downloads during batch runs to improve user experience by downloading completed simulations immediately rather than waiting for the entire batch to finish. The change extends the Batch.monitor() method with a download_on_success parameter that creates a ThreadPoolExecutor to handle concurrent downloads when individual jobs reach "success" status. The Batch.run() method is updated to use monitor(download_on_success=True) followed by load(skip_download=True) to avoid double-downloading. This addresses a performance bottleneck where users experienced unnecessary delays for large batches with many short simulations that completed at different times.
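
How the two new parameters fit together in run() can be illustrated with a toy class that records calls instead of contacting a server. `ToyBatch` and its recorded tuples are hypothetical; only the parameter names download_on_success and skip_download, and their defaults, come from this PR.

```python
class ToyBatch:
    """Hypothetical stand-in that records calls instead of contacting a server."""

    def __init__(self):
        self.calls = []

    def monitor(self, download_on_success=False):
        self.calls.append(("monitor", download_on_success))
        if download_on_success:
            # In the PR, downloads are queued from inside the monitor loop.
            self.calls.append(("download", "during monitor"))

    def download(self, path_dir):
        self.calls.append(("download", "after monitor"))

    def load(self, path_dir, skip_download=False):
        self.calls.append(("load", path_dir))
        # skip_download defaults to False so existing callers still download here.
        if not skip_download:
            self.download(path_dir)
        return {"path_dir": path_dir}

    def run(self, path_dir):
        self.monitor(download_on_success=True)
        return self.load(path_dir, skip_download=True)


batch = ToyBatch()
result = batch.run("out")
run_downloads = [c for c in batch.calls if c[0] == "download"]

# A caller that uses load() directly keeps the old behavior.
legacy = ToyBatch()
legacy.load("out")
legacy_downloads = [c for c in legacy.calls if c[0] == "download"]
```

In this sketch each path performs exactly one download: run() downloads during monitoring, while a direct load() call downloads afterwards.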

Changed Files
| Filename | Score | Overview |
|---|---|---|
| CHANGELOG.md | 5/5 | Added changelog entry documenting the new per-simulation download feature |
| tests/test_web/test_webapi.py | 5/5 | Added comprehensive tests for per-simulation downloads with proper mocking and timing verification |
| tidy3d/web/api/container.py | 4/5 | Implemented core feature with new parameters for monitor/load methods and download scheduling logic |

Confidence score: 4/5

  • This PR is safe to merge with minimal risk as it maintains backward compatibility through default parameters and proper error handling
  • Score reflects well-implemented feature with comprehensive testing, though the complex threading logic in container.py requires attention to ensure proper resource cleanup
  • Pay close attention to the download executor cleanup logic in the finally block and concurrent access patterns in the monitor method
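
The cleanup concern raised above can be illustrated with a generic pattern: shut the executor down in a finally block and inspect each future afterwards. This is a sketch under assumptions, not the PR's implementation; `run_downloads`, `ok`, and `broken` are hypothetical names, and only the standard-library `ThreadPoolExecutor` API is real.

```python
from concurrent.futures import ThreadPoolExecutor


def run_downloads(tasks, max_workers=2):
    """Submit callables, always shut the executor down, and collect per-task errors."""
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = {}
    try:
        for name, fn in tasks.items():
            futures[name] = executor.submit(fn)
    finally:
        # wait=True blocks until every submitted callable has finished, so no
        # worker thread outlives this function even if submission fails midway.
        executor.shutdown(wait=True)

    results, errors = {}, {}
    for name, fut in futures.items():
        exc = fut.exception()  # returns immediately: all futures are done by now
        if exc is None:
            results[name] = fut.result()
        else:
            errors[name] = exc
    return results, errors


def ok():
    return "done"


def broken():
    raise OSError("disk full")


results, errors = run_downloads({"a": ok, "b": broken})
```

Collecting exceptions per future, rather than letting the first failure propagate, keeps one failed download from hiding the outcome of the others.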

Sequence Diagram

sequenceDiagram
    participant User
    participant Batch as "Batch"
    participant Job as "Job"
    participant WebAPI as "webapi"
    participant Executor as "ThreadPoolExecutor"
    participant Progress as "Progress Bar"
    
    User->>+Batch: "run(path_dir)"
    Batch->>+Batch: "_check_path_dir(path_dir)"
    Batch-->>-Batch: "create dir if needed"
    Batch->>+Batch: "upload()"
    
    Note over Batch,Job: "Upload Phase"
    Batch->>+Executor: "ThreadPoolExecutor(max_workers)"
    loop "For each job"
        Batch->>+Job: "job.upload()"
        Job->>+WebAPI: "web.upload(**kwargs)"
        WebAPI-->>-Job: "task_id"
        Job-->>-Batch: "uploaded"
    end
    Batch->>+Progress: "create progress bar"
    Progress-->>-Batch: "show upload progress"
    Executor-->>-Batch: "all uploads complete"
    
    Batch->>+Batch: "to_file(batch_path)"
    Batch-->>-Batch: "save batch.hdf5 with task_ids"
    
    Note over Batch,Job: "Start Phase"
    Batch->>+Batch: "start(priority)"
    loop "For each job"
        Batch->>+Job: "job.start(priority)"
        Job->>+WebAPI: "web.start(task_id, priority)"
        WebAPI-->>-Job: "started"
        Job-->>-Batch: "started"
    end
    Batch-->>-Batch: "all jobs started"
    
    Note over Batch,Job: "Monitor Phase with Downloads"
    Batch->>+Batch: "monitor(download_on_success=True)"
    Batch->>+Executor: "download_executor = ThreadPoolExecutor()"
    
    loop "Until all jobs complete"
        loop "For each job"
            Batch->>+Job: "job.status"
            Job->>+WebAPI: "web.get_info(task_id)"
            WebAPI-->>-Job: "status info"
            Job-->>-Batch: "status"
            
            alt "status == 'success'"
                Batch->>+Executor: "submit download job"
                Executor->>+Job: "job.download(path)"
                Job->>+WebAPI: "web.download(task_id, path)"
                WebAPI-->>-Job: "download complete"
                Job-->>-Executor: "file saved"
                Executor-->>-Batch: "download future"
            end
        end
        Batch->>+Progress: "update progress bars"
        Progress-->>-Batch: "display updated status"
    end
    
    Executor-->>-Batch: "all downloads complete"
    Batch-->>-Batch: "monitoring complete"
    
    Note over Batch,User: "Load Phase"
    Batch->>+Batch: "load(path_dir, skip_download=True)"
    Batch->>+Batch: "create BatchData object"
    Batch-->>-Batch: "BatchData with task_paths"
    Batch-->>-User: "BatchData"

@greptile-apps greptile-apps bot left a comment

3 files reviewed, 2 comments

@marcorudolphflex marcorudolphflex force-pushed the FXC-3689-per-simulation-downloads-during-batch-runs branch from 2177a21 to b5e27b7 Compare October 15, 2025 14:41

github-actions bot commented Oct 15, 2025

Diff Coverage

Diff: origin/develop...HEAD, staged and unstaged changes

  • tidy3d/web/api/container.py (65.7%): Missing lines 953-955,973,984-985,1039-1040,1043,1048,1051,1053,1056-1058,1060-1063,1065-1066,1068-1070,1072-1073,1083-1086,1088-1089,1108,1281
  • tidy3d/web/api/tidy3d_stub.py (85.7%): Missing lines 69

Summary

  • Total: 106 lines
  • Missing: 35 lines
  • Coverage: 66%

tidy3d/web/api/container.py

  949         def _should_download(job) -> bool:
  950             status = job.status
  951             if not web._is_modeler_batch(job.task_id):
  952                 return status == "success"
! 953             if status == "success":
! 954                 return True
! 955             return status == "run_success" and getattr(job, "postprocess_status", None) == "success"
  956 
  957         def schedule_download(job) -> None:
  958             if download_executor is None or not _should_download(job):
  959                 return

  969                         f"File '{job_path_str}' already exists. Skipping download "
  970                         "(set `replace_existing=True` to overwrite)."
  971                     )
  972                     return
! 973                 log.info(f"File '{job_path_str}' already exists. Overwriting.")
  974 
  975             downloads_started.add(task_id)
  976             download_futures[task_id] = download_executor.submit(job.download, job_path_str)

  980             status = job.status
  981             if not web._is_modeler_batch(job.task_id):
  982                 return status not in END_STATES
  983             if status == "run_success":
! 984                 return job.postprocess_status not in END_STATES
! 985             return status not in END_STATES
  986 
  987         def pbar_description(
  988             task_name: str, status: str, max_name_length: int, status_width: int
  989         ) -> str:

  1035                             desc, total=COMPLETED_PERCENT, completed=completed
  1036                         )
  1037 
  1038                     while any(check_continue_condition(job) for job in self.jobs.values()):
! 1039                         for task_name, job in self.jobs.items():
! 1040                             status = job.status
  1041 
  1042                             # auto-start postprocess for modeler jobs when run finishes
! 1043                             if (
  1044                                 web._is_modeler_batch(job.task_id)
  1045                                 and status == "run_success"
  1046                                 and job.task_id not in postprocess_started_tasks
  1047                             ):
! 1048                                 job.postprocess_start(
  1049                                     worker_group=postprocess_worker_group, verbose=True
  1050                                 )
! 1051                                 postprocess_started_tasks.add(job.task_id)
  1052 
! 1053                             schedule_download(job)
  1054 
  1055                             # choose display status & percent
! 1056                             if status != "run_success":
! 1057                                 display_status = status
! 1058                                 pct = STATE_PROGRESS_PERCENTAGE.get(status, 0)
  1059                             else:
! 1060                                 post_st = getattr(job, "postprocess_status", None)
! 1061                                 if post_st in END_STATES:
! 1062                                     display_status = post_st
! 1063                                     pct = STATE_PROGRESS_PERCENTAGE.get(post_st, 0)
  1064                                 else:
! 1065                                     display_status = "postprocess"
! 1066                                     pct = STATE_PROGRESS_PERCENTAGE.get("postprocess", 0)
  1067 
! 1068                             pbar = pbar_tasks[task_name]
! 1069                             desc = pbar_description(task_name, display_status, max_name_length, 0)
! 1070                             progress.update(pbar, description=desc, completed=pct)
  1071 
! 1072                         progress.refresh()
! 1073                         time.sleep(BATCH_MONITOR_PROGRESS_REFRESH_TIME)
  1074 
  1075                     # final render to terminal state for all bars
  1076                     for task_name, job in self.jobs.items():
  1077                         schedule_download(job)

  1079                         if status != "run_success":
  1080                             display_status = status
  1081                             pct = STATE_PROGRESS_PERCENTAGE.get(status, COMPLETED_PERCENT)
  1082                         else:
! 1083                             post_st = getattr(job, "postprocess_status", None)
! 1084                             if post_st in END_STATES:
! 1085                                 display_status = post_st
! 1086                                 pct = STATE_PROGRESS_PERCENTAGE.get(post_st, COMPLETED_PERCENT)
  1087                             else:
! 1088                                 display_status = "postprocess"
! 1089                                 pct = STATE_PROGRESS_PERCENTAGE.get(
  1090                                     "postprocess", COMPLETED_PERCENT
  1091                                 )
  1092 
  1093                         pbar = pbar_tasks[task_name]

  1104                             web._is_modeler_batch(job.task_id)
  1105                             and job.status == "run_success"
  1106                             and job.task_id not in postprocess_started_tasks
  1107                         ):
! 1108                             job.postprocess_start(
  1109                                 worker_group=postprocess_worker_group, verbose=False
  1110                             )
  1111                             postprocess_started_tasks.add(job.task_id)

  1277             if isinstance(job.simulation, ModeSolver):
  1278                 job_data = data[task_name]
  1279                 job.simulation._patch_data(data=job_data)
  1280         if not skip_download:
! 1281             self.download(path_dir=path_dir, replace_existing=replace_existing)
  1282 
  1283         return data
  1284 
  1285     def delete(self) -> None:
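
Several of the uncovered lines above (953-955 and 984-985) are the modeler-batch branches of the two status predicates. One way to exercise such branches is to treat them as pure functions of the status values. The sketch below mirrors the branch structure shown in the excerpt, with the job flattened into plain arguments; the function names, the argument layout, and the contents of `END_STATES` are assumptions for illustration and may differ from tidy3d.

```python
# Assumed terminal states for illustration; the real END_STATES in tidy3d may differ.
END_STATES = ("success", "error")


def should_download(status, is_modeler_batch, postprocess_status=None):
    """Mirror of the `_should_download` branches at lines 949-955."""
    if not is_modeler_batch:
        return status == "success"
    if status == "success":
        return True
    return status == "run_success" and postprocess_status == "success"


def should_keep_polling(status, is_modeler_batch, postprocess_status=None):
    """Mirror of the continue-condition branches at lines 980-985."""
    if not is_modeler_batch:
        return status not in END_STATES
    if status == "run_success":
        return postprocess_status not in END_STATES
    return status not in END_STATES


# Modeler batch: the run has finished but postprocessing has not been reported yet.
pending = (
    should_download("run_success", True, None),
    should_keep_polling("run_success", True, None),
)
# Modeler batch: postprocessing succeeded, so download and stop polling.
finished = (
    should_download("run_success", True, "success"),
    should_keep_polling("run_success", True, "success"),
)
```

Under these assumptions, `pending` means "keep polling, do not download yet" and `finished` means "download, stop polling", which are the two modeler-batch transitions the coverage report flags as untested.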

tidy3d/web/api/tidy3d_stub.py

  65 def task_type_name_of(simulation: WorkflowType) -> str:
  66     for cls, ttype in TYPE_MAP.items():
  67         if isinstance(simulation, cls):
  68             return ttype.name
! 69     raise TypeError(f"Could not find task type for: {type(simulation).__name__}")
  70 
  71 
  72 class Tidy3dStub(BaseModel, TaskStub):
  73     simulation: WorkflowType = pd.Field(discriminator="type")

@marcorudolphflex marcorudolphflex force-pushed the FXC-3689-per-simulation-downloads-during-batch-runs branch from b5e27b7 to c2e34b5 Compare October 17, 2025 10:15

@yaugenst-flex yaugenst-flex left a comment

Thanks @marcorudolphflex, looks great!

@marcorudolphflex marcorudolphflex changed the title from "feat(tidy3d): Per-Simulation Downloads During Batch Runs" to "feat(tidy3d): FXC-3689 Per-Simulation Downloads During Batch Runs" Oct 17, 2025
@marcorudolphflex marcorudolphflex force-pushed the FXC-3689-per-simulation-downloads-during-batch-runs branch from c2e34b5 to 5ec4203 Compare October 17, 2025 14:18
@marcorudolphflex marcorudolphflex added this pull request to the merge queue Oct 17, 2025
Merged via the queue into develop with commit 8d478df Oct 17, 2025
25 checks passed
@marcorudolphflex marcorudolphflex deleted the FXC-3689-per-simulation-downloads-during-batch-runs branch October 17, 2025 15:35

4 participants