Merged
27 changes: 25 additions & 2 deletions labelbox/schema/project.py
@@ -1454,12 +1454,35 @@ def _wait_until_data_rows_are_processed(
""" Wait until all the specified data rows are processed"""
start_time = datetime.now()
Contributor:

Since start_time is set at the beginning of this function, I think _poll_data_row_processing_status may not wait the full wait_processing_max_seconds per chunk.

Contributor (author):

My understanding was that wait_processing_max_seconds applies to the entire _wait_until_data_rows_are_processed call, not to the individual chunks, so this preserves the original behavior.

Contributor:

Ah, gotcha, that makes sense.
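The shared-deadline behavior settled in this thread can be sketched outside the SDK. This is a minimal, hypothetical reproduction (poll_chunk and wait_all are illustrative names, not the labelbox API): because every chunk receives the same start_time, the elapsed-time check measures time since the overall call began, so the budget bounds the whole call rather than each chunk.

```python
from datetime import datetime


def poll_chunk(chunk, start_time, wait_max_seconds):
    # Elapsed time is measured against the *shared* start_time, so the
    # remaining budget shrinks as earlier chunks consume it.
    elapsed = (datetime.now() - start_time).total_seconds()
    if elapsed >= wait_max_seconds:
        raise TimeoutError("maximum wait time exceeded for the whole call")
    # ... poll processing status for `chunk` here ...


def wait_all(ids, wait_max_seconds=300.0, chunk_size=2):
    start_time = datetime.now()  # captured once, shared by every chunk
    for i in range(0, len(ids), chunk_size):
        poll_chunk(ids[i:i + chunk_size], start_time, wait_max_seconds)
```

With a generous budget all chunks are polled; with an exhausted budget the first chunk raises immediately, mirroring the ProcessingWaitTimeout in the diff.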


max_data_rows_per_poll = 100_000
if data_row_ids is not None:
for i in range(0, len(data_row_ids), max_data_rows_per_poll):
Contributor:

Nice use of the range step for chunking.

chunk = data_row_ids[i:i + max_data_rows_per_poll]
self._poll_data_row_processing_status(
chunk, [], start_time, wait_processing_max_seconds,
sleep_interval)

if global_keys is not None:
for i in range(0, len(global_keys), max_data_rows_per_poll):
chunk = global_keys[i:i + max_data_rows_per_poll]
self._poll_data_row_processing_status(
[], chunk, start_time, wait_processing_max_seconds,
sleep_interval)

def _poll_data_row_processing_status(
self,
data_row_ids: List[str],
global_keys: List[str],
start_time: datetime,
wait_processing_max_seconds: int = _wait_processing_max_seconds,
sleep_interval=30):

while True:
if (datetime.now() -
start_time).total_seconds() >= wait_processing_max_seconds:
raise ProcessingWaitTimeout(
    """Maximum wait time exceeded while waiting for data rows to be processed.
    Try creating a batch a bit later""")

all_good = self.__check_data_rows_have_been_processed(
data_row_ids, global_keys)
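The chunking pattern praised in the review can be isolated as a standalone sketch (chunks is a hypothetical helper, not part of the SDK): a range() with a step yields slice boundaries at most max_per_poll apart, and Python slicing safely truncates the final, possibly shorter, chunk.

```python
def chunks(items, max_per_poll):
    # Step the start index by the chunk size; the slice handles the tail.
    for i in range(0, len(items), max_per_poll):
        yield items[i:i + max_per_poll]


ids = [f"row-{n}" for n in range(7)]
sizes = [len(c) for c in chunks(ids, 3)]
# sizes == [3, 3, 1]
```

An empty input simply produces no chunks, so the calling loop in _wait_until_data_rows_are_processed does no polling in that case.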