-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
job.py
147 lines (117 loc) · 4.93 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Job abstract interface."""
from __future__ import annotations
import time
from abc import ABC, abstractmethod
from typing import Callable, Optional, TYPE_CHECKING
from qiskit.exceptions import QiskitError
from qiskit.providers.backend import Backend
from qiskit.providers.exceptions import JobTimeoutError
from qiskit.providers.jobstatus import JOB_FINAL_STATES, JobStatus
if TYPE_CHECKING:
from qiskit.result import Result
class Job:
"""Base common type for all versioned Job abstract classes.
Note this class should not be inherited from directly, it is intended
to be used for type checking. When implementing a provider you should use
the versioned abstract classes as the parent class and not this class
directly.
"""
version = 0
class JobV1(Job, ABC):
"""Class to handle jobs
This first version of the Backend abstract class is written to be mostly
backwards compatible with the legacy providers interface. This was done to ease
the transition for users and provider maintainers to the new versioned providers. Expect,
future versions of this abstract class to change the data model and
interface.
"""
version = 1
_async = True
def __init__(self, backend: Optional[Backend], job_id: str, **kwargs) -> None:
"""Initializes the asynchronous job.
Args:
backend: the backend used to run the job.
job_id: a unique id in the context of the backend used to run
the job.
kwargs: Any key value metadata to associate with this job.
"""
self._job_id = job_id
self._backend = backend
self.metadata = kwargs
def job_id(self) -> str:
"""Return a unique id identifying the job."""
return self._job_id
def backend(self) -> Backend:
"""Return the backend where this job was executed."""
if self._backend is None:
raise QiskitError("The job does not have any backend.")
return self._backend
def done(self) -> bool:
"""Return whether the job has successfully run."""
return self.status() == JobStatus.DONE
def running(self) -> bool:
"""Return whether the job is actively running."""
return self.status() == JobStatus.RUNNING
def cancelled(self) -> bool:
"""Return whether the job has been cancelled."""
return self.status() == JobStatus.CANCELLED
def in_final_state(self) -> bool:
"""Return whether the job is in a final job state such as ``DONE`` or ``ERROR``."""
return self.status() in JOB_FINAL_STATES
def wait_for_final_state(
self, timeout: Optional[float] = None, wait: float = 5, callback: Optional[Callable] = None
) -> None:
"""Poll the job status until it progresses to a final state such as ``DONE`` or ``ERROR``.
Args:
timeout: Seconds to wait for the job. If ``None``, wait indefinitely.
wait: Seconds between queries.
callback: Callback function invoked after each query.
The following positional arguments are provided to the callback function:
* job_id: Job ID
* job_status: Status of the job from the last query
* job: This BaseJob instance
Note: different subclass might provide different arguments to
the callback function.
Raises:
JobTimeoutError: If the job does not reach a final state before the
specified timeout.
"""
if not self._async:
return
start_time = time.time()
status = self.status()
while status not in JOB_FINAL_STATES:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise JobTimeoutError(f"Timeout while waiting for job {self.job_id()}.")
if callback:
callback(self.job_id(), status, self)
time.sleep(wait)
status = self.status()
return
@abstractmethod
def submit(self):
"""Submit the job to the backend for execution."""
pass
@abstractmethod
def result(self) -> Result:
"""Return the results of the job."""
pass
def cancel(self):
"""Attempt to cancel the job."""
raise NotImplementedError
@abstractmethod
def status(self) -> JobStatus:
"""Return the status of the job, among the values of ``JobStatus``."""
pass