Skip to content

Commit

Permalink
Merge 8f13b04 into ad1f1dd
Browse files Browse the repository at this point in the history
  • Loading branch information
kt474 committed Mar 7, 2022
2 parents ad1f1dd + 8f13b04 commit 994437f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 27 deletions.
51 changes: 38 additions & 13 deletions qiskit_ibm_runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def __init__(
self._params = params or {}
self._creation_date = creation_date
self._program_id = program_id
self._status = JobStatus.INITIALIZING
self._state = {"status": JobStatus.INITIALIZING}
self._reason: Optional[str] = None
self._error_message = None # type: Optional[str]
self._result_decoder = result_decoder
self._image = image
Expand Down Expand Up @@ -174,7 +175,7 @@ def result(
_decoder = decoder or self._result_decoder
if self._results is None or (_decoder != self._result_decoder):
self.wait_for_final_state(timeout=timeout)
if self._status == JobStatus.ERROR:
if self._state["status"] == JobStatus.ERROR:
raise RuntimeJobFailureError(
f"Unable to retrieve job result. " f"{self.error_message()}"
)
Expand All @@ -198,7 +199,7 @@ def cancel(self) -> None:
) from None
raise IBMRuntimeError(f"Failed to cancel job: {ex}") from None
self.cancel_result_streaming()
self._status = JobStatus.CANCELLED
self._state["status"] = JobStatus.CANCELLED

def status(self) -> JobStatus:
"""Return the status of the job.
Expand All @@ -207,7 +208,7 @@ def status(self) -> JobStatus:
Status of this job.
"""
self._set_status_and_error_message()
return self._status
return self._state["status"]

def error_message(self) -> Optional[str]:
"""Returns the reason if the job failed.
Expand All @@ -226,7 +227,7 @@ def wait_for_final_state(self, timeout: Optional[float] = None) -> None:
Args:
timeout: Seconds to wait for the job. If ``None``, wait indefinitely.
"""
if self._status not in JOB_FINAL_STATES and not self._is_streaming():
if self._state["status"] not in JOB_FINAL_STATES and not self._is_streaming():
self._ws_client_future = self._executor.submit(self._start_websocket_client)
if self._is_streaming():
self._ws_client_future.result(timeout)
Expand All @@ -250,7 +251,7 @@ def stream_results(
RuntimeInvalidStateError: If a callback function is already streaming results or
if the job already finished.
"""
if self._status in JOB_FINAL_STATES:
if self._state["status"] in JOB_FINAL_STATES:
raise RuntimeInvalidStateError("Job already finished.")
if self._is_streaming():
raise RuntimeInvalidStateError(
Expand Down Expand Up @@ -293,7 +294,7 @@ def logs(self) -> str:

def _set_status_and_error_message(self) -> None:
"""Fetch and set status and error message."""
if self._status not in JOB_FINAL_STATES:
if self._state["status"] not in JOB_FINAL_STATES:
response = self._api_client.job_get(job_id=self.job_id)
self._set_status(response)
self._set_error_message(response)
Expand All @@ -308,24 +309,48 @@ def _set_status(self, job_response: Dict) -> None:
IBMError: If an unknown status is returned from the server.
"""
try:
self._status = API_TO_JOB_STATUS[job_response["status"].upper()]
if "reason" in job_response["state"]:
self._reason = job_response["state"]["reason"]
status = job_response["state"]["status"]
self._state["status"] = self._get_api_msg(
status, API_TO_JOB_STATUS, self._reason
)
except KeyError:
raise IBMError(f"Unknown status: {job_response['status']}")
raise IBMError(f"Unknown status: {status}")

def _set_error_message(self, job_response: Dict) -> None:
"""Set error message if the job failed.
Args:
job_response: Job response from runtime API.
"""
if self._status == JobStatus.ERROR:
if self._state["status"] == JobStatus.ERROR:
job_result_raw = self._api_client.job_results(job_id=self.job_id)
self._error_message = API_TO_JOB_ERROR_MESSAGE[
job_response["status"].upper()
].format(self.job_id, job_result_raw)
self._error_message = self._get_api_msg(
job_response["state"]["status"], API_TO_JOB_ERROR_MESSAGE, self._reason
).format(self.job_id, job_result_raw)
else:
self._error_message = None

def _get_api_msg(
self,
status: str,
msg_type: Dict,
reason: str = None,
) -> str:
"""Return the job status or error message.
Args:
status: Job status returned from the API.
msg_type: Dictionary to use to convert API message.
reason: Job status reason returned from the API.
Returns: A job status or errror message.
"""
if reason:
status = status + " - " + reason
return msg_type[status.upper()]

def _is_streaming(self) -> bool:
"""Return whether job results are being streamed.
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def result_callback(job_id, result):
called_back_count = 0
job = self._run_program(service, interim_results="foobar")
job.wait_for_final_state()
job._status = JobStatus.RUNNING # Allow stream_results()
job._state["status"] = JobStatus.RUNNING # Allow stream_results()
job.stream_results(result_callback)
time.sleep(2)
# Callback is expected twice because both interim and final results are returned
Expand Down
26 changes: 13 additions & 13 deletions test/unit/mock/fake_runtime_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(
):
"""Initialize a fake job."""
self._job_id = job_id
self._status = final_status or "QUEUED"
self._state = {"status": final_status or "QUEUED"}
self._program_id = program_id
self._hub = hub
self._group = group
Expand All @@ -137,9 +137,9 @@ def _auto_progress(self):
"""Automatically update job status."""
for status in self._job_progress:
time.sleep(0.5)
self._status = status
self._state["status"] = status

if self._status == "COMPLETED":
if self._state["status"] == "COMPLETED":
self._result = json.dumps("foo")

def to_dict(self):
Expand All @@ -150,7 +150,7 @@ def to_dict(self):
"group": self._group,
"project": self._project,
"backend": self._backend_name,
"status": self._status,
"state": self._state,
"params": [self._params],
"program": {"id": self._program_id},
"image": self._image,
Expand All @@ -166,7 +166,7 @@ def interim_results(self):

def status(self):
"""Return job status."""
return self._status
return self._state["status"]


class FailedRuntimeJob(BaseFakeRuntimeJob):
Expand All @@ -178,7 +178,7 @@ def _auto_progress(self):
"""Automatically update job status."""
super()._auto_progress()

if self._status == "FAILED":
if self._state["status"] == "FAILED":
self._result = "Kaboom!"


Expand All @@ -191,7 +191,7 @@ def _auto_progress(self):
"""Automatically update job status."""
super()._auto_progress()

if self._status == "CANCELLED - RAN TOO LONG":
if self._state["status"] == "CANCELLED - RAN TOO LONG":
self._result = "Kaboom!"


Expand All @@ -214,7 +214,7 @@ def to_dict(self):
"""Convert to dictionary format."""
data = super().to_dict()
if self._cancelled:
data["status"] = "CANCELLED"
data["state"]["status"] = "CANCELLED"
return data


Expand All @@ -227,7 +227,7 @@ def _auto_progress(self):
"""Automatically update job status."""
super()._auto_progress()

if self._status == "COMPLETED":
if self._state["status"] == "COMPLETED":
self._result = json.dumps(self.custom_result, cls=RuntimeEncoder)


Expand All @@ -239,11 +239,11 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

def _auto_progress(self):
self._status = "RUNNING"
self._state["status"] = "RUNNING"
time.sleep(self._runtime)
self._status = "COMPLETED"
self._state["status"] = "COMPLETED"

if self._status == "COMPLETED":
if self._state["status"] == "COMPLETED":
self._result = json.dumps("foo")


Expand Down Expand Up @@ -412,7 +412,7 @@ def jobs_get(
count = len(self._jobs)
if pending is not None:
job_status_list = pending_statuses if pending else returned_statuses
jobs = [job for job in jobs if job._status in job_status_list]
jobs = [job for job in jobs if job._state["status"] in job_status_list]
count = len(jobs)
if program_id:
jobs = [job for job in jobs if job._program_id == program_id]
Expand Down

0 comments on commit 994437f

Please sign in to comment.