Skip to content

Commit

Permalink
Merge branch 'production'
Browse files Browse the repository at this point in the history
  • Loading branch information
idocx committed May 14, 2024
2 parents 69fdfc3 + 2b9fa08 commit dea9329
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions alab_management/resource_manager/resource_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
] # the raw request sent by task process


class RequestCanceledError(Exception):
"""Request Canceled Error."""


class CombinedTimeoutError(TimeoutError, concurrent.futures.TimeoutError):
"""
Combined TimeoutError.
Expand Down Expand Up @@ -240,7 +244,7 @@ def request_resources(
_id: ObjectId = cast(ObjectId, result.inserted_id)
self._waiting[_id] = {"f": f, "device_str_to_request": device_str_to_request}
try:
result = f.result(timeout=timeout)
result = self.get_concurrent_result(f, timeout=timeout)
except concurrent.futures.TimeoutError as e:
# if the request is not fulfilled, cancel it to make sure the resources are released
request = self._request_collection.find_one_and_update({
Expand All @@ -256,8 +260,7 @@ def request_resources(
f"Request {result.inserted_id} timed out after {timeout} seconds."
) from e
else: # if the request is fulfilled, return the result normally, wrong timeout
result = f.result(timeout=None)

result = self.get_concurrent_result(f)
return {
**self._post_process_requested_resource(
devices=result["devices"],
Expand All @@ -267,6 +270,22 @@ def request_resources(
"request_id": result["request_id"],
}

@staticmethod
def get_concurrent_result(f: Future, timeout: float | None = None):
"""
Get the result of a future with a timeout.
If the request is canceled, we will catch a RequestCanceledError and hang the program.
The hanged program will be killed by the abort exception in the task actor, which will
be handled in the task actor to clean up the lab.
"""
try:
return f.result(timeout=timeout)
except RequestCanceledError:
# if there is an abort signal, we will just hang the program
while True:
# abort signal here. It should be handled in the task actor
time.sleep(1)

def release_resources(self, request_id: ObjectId):
"""Release a request by request_id."""
# For the requests that were CANCELED or ERROR, but have assigned resources, release them
Expand Down Expand Up @@ -412,11 +431,7 @@ def _handle_canceled_request(self, request_id: ObjectId):

# for the canceled request, we will return an empty result
# and wait for the abort to be handled by the task actor
f.set_result({
"devices": {},
"sample_positions": {},
"request_id": request_id,
})
f.set_exception(RequestCanceledError("Abort signal received."))

@staticmethod
def _post_process_requested_resource(
Expand Down

0 comments on commit dea9329

Please sign in to comment.