dwave_micro_client.py
"""
An implementation of the REST API exposed by D-Wave Solver API (SAPI) servers.
This API lets you submit an Ising model and receive samples from a distribution over the model
as defined by the solver you have selected.
- The SAPI servers provide authentication, queuing, and scheduling services, and
provide a network interface to the solvers.
- A solver is a resource that can sample from a discrete quadratic model.
- This package implements the REST interface these servers provide.
An example using the client:
.. code-block:: python
:linenos:
import dwave_micro_client
import random
# Connect using explicit connection information
conn = dwave_micro_client.Connection('https://sapi-url', 'token-string')
# Load a solver by name
solver = conn.get_solver('test-solver')
# Build a random Ising model on +1, -1. Build it to exactly fit the graph the solver provides
linear = {index: random.choice([-1, 1]) for index in solver.nodes}
quad = {key: random.choice([-1, 1]) for key in solver.undirected_edges}
# Send the problem for sampling, include a solver specific parameter 'num_reads'
results = solver.sample_ising(linear, quad, num_reads=100)
# Print out the first sample
print(results.samples[0])
Rough workflow within the SAPI server:
1. Submitted problems enter an input queue. Each user has an input queue per solver.
2. Drawing from all input queues for a solver, problems are scheduled.
3. Results from the server are cached for retrieval by the client.
By default all sampling requests will be processed asynchronously. Reading results from
any future object is a blocking operation.
.. code-block:: python
:linenos:
# We can submit several sample requests without blocking
# (In this specific case we could accomplish the same thing by increasing 'num_reads')
futures = [solver.sample_ising(linear, quad, num_reads=100) for _ in range(10)]
# We can check if a set of samples are ready without blocking
print(futures[0].done())
# We can wait on a single future
futures[0].wait()
# Or we can wait on several futures
dwave_micro_client.Future.wait_multiple(futures)
"""
# TODOS:
# - More testing for sample_qubo
from __future__ import division, absolute_import
import json
import threading
import base64
import struct
import time
import sys
import os
import posixpath
import types
import logging
import requests
import collections
import datetime
import six
import six.moves.queue as queue
import six.moves
range = six.moves.range
# Get the logger using the recommended name
log = logging.getLogger(__name__)
# log.setLevel(logging.DEBUG)
# log.addHandler(logging.StreamHandler(sys.stdout))
# Use numpy if available for fast decoding
try:
import numpy as np
_numpy = True
except ImportError:
# If numpy isn't available we can do the encoding slower in native python
_numpy = False
class SolverFailureError(Exception):
"""An exception raised when there is a remote failure calling a solver."""
pass
class SolverAuthenticationError(Exception):
"""An exception raised when there is an authentication error."""
def __init__(self):
super(SolverAuthenticationError, self).__init__("Token not accepted for that action.")
class CanceledFutureError(Exception):
"""An exception raised when code tries to read from a canceled future."""
def __init__(self):
super(CanceledFutureError, self).__init__("An error occurred reading results from a canceled request")
class Connection:
"""
Connect to a SAPI server to expose the solvers that the server advertises.
Args:
url (str): URL of the SAPI server.
token (str): Authentication token from the SAPI server.
proxies (dict): Mapping from the connection scheme (http[s]) to the proxy server address.
permissive_ssl (boolean; false by default): Disables SSL verification.
"""
# The status flags that a problem can have
STATUS_IN_PROGRESS = 'IN_PROGRESS'
STATUS_PENDING = 'PENDING'
STATUS_COMPLETE = 'COMPLETED'
STATUS_FAILED = 'FAILED'
STATUS_CANCELLED = 'CANCELLED'
# Cases when multiple status flags qualify
ANY_STATUS_ONGOING = [STATUS_IN_PROGRESS, STATUS_PENDING]
ANY_STATUS_NO_RESULT = [STATUS_FAILED, STATUS_CANCELLED]
# Number of problems to include in a status query
_STATUS_QUERY_SIZE = 100
# Number of worker threads for each problem processing task
_SUBMISSION_THREAD_COUNT = 5
_CANCEL_THREAD_COUNT = 1
_POLL_THREAD_COUNT = 2
_LOAD_THREAD_COUNT = 5
def __init__(self, url=None, token=None, proxies=None, permissive_ssl=False):
"""To setup the connection a pipeline of queues/workers is costructed.
There are five interations with the server the connection manages:
1. Downloading solver information.
2. Submitting problem data.
3. Polling problem status.
4. Downloading problem results.
5. Canceling problems
Loading solver information is done syncronously. The other four tasks are
performed by asyncronous workers. For 2, 3, and 5 the workers gather
togeather tasks into in batches.
"""
# Use configuration from parameters passed, if parts are
# missing, try the configuration function
self.default_solver = None
if token is None:
url, token, proxies, self.default_solver = load_configuration(url)
log.debug("Creating a connection to SAPI server: %s", url)
self.base_url = url
self.token = token
# Create a :mod:`requests` session. `requests` will manage our url parsing, https, etc.
self.session = requests.Session()
self.session.headers.update({'X-Auth-Token': self.token})
self.session.proxies = proxies
if permissive_ssl:
self.session.verify = False
# Build the problem submission queue, start its workers
self._submission_queue = queue.Queue()
self._submission_workers = []
for _ in range(self._SUBMISSION_THREAD_COUNT):
worker = threading.Thread(target=self._do_submit_problems)
worker.daemon = True
worker.start()
self._submission_workers.append(worker)
# Build the cancel problem queue, start its workers
self._cancel_queue = queue.Queue()
self._cancel_workers = []
for _ in range(self._CANCEL_THREAD_COUNT):
worker = threading.Thread(target=self._do_cancel_problems)
worker.daemon = True
worker.start()
self._cancel_workers.append(worker)
# Build the problem status polling queue, start its workers
self._poll_queue = queue.Queue()
self._poll_workers = []
for _ in range(self._POLL_THREAD_COUNT):
worker = threading.Thread(target=self._do_poll_problems)
worker.daemon = True
worker.start()
self._poll_workers.append(worker)
# Build the result loading queue, start its workers
self._load_queue = queue.Queue()
self._load_workers = []
for _ in range(self._LOAD_THREAD_COUNT):
worker = threading.Thread(target=self._do_load_results)
worker.daemon = True
worker.start()
self._load_workers.append(worker)
# Prepare an empty set of solvers
self.solvers = {}
self._solvers_lock = threading.RLock()
self._all_solvers_ready = False
# Set the parameters for requests; disable SSL verification if needed
self._request_parameters = {}
if permissive_ssl:
self._request_parameters['verify'] = False
def close(self):
"""Perform a clean shutdown.
Wait for all the currently scheduled work to finish, kill the workers,
and close the connection pool. Assumes no one is submitting more work
while the connection is closing.
"""
# Finish all the work that requires the connection
log.debug("Joining submission queue")
self._submission_queue.join()
log.debug("Joining cancel queue")
self._cancel_queue.join()
log.debug("Joining poll queue")
self._poll_queue.join()
log.debug("Joining load queue")
self._load_queue.join()
# The worker threads are daemon threads now blocked on their empty
# queues; Python provides no way to kill a thread, so they are simply
# left to exit with the process.
# Close the connection pool
self.session.close()
def __enter__(self):
"""Let connections be used in with blocks."""
return self
def __exit__(self, *args):
"""At the end of a with block perform a clean shutdown of the connection."""
self.close()
return False
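The `__enter__`/`__exit__` pair above is the standard context-manager protocol. A minimal sketch of the same pattern, using a hypothetical `DemoConnection` stand-in that does no networking, shows why a `with` block guarantees the clean shutdown runs:

```python
# Sketch of the context-manager pattern used by Connection: __enter__
# returns the object itself, __exit__ performs the clean shutdown.
# DemoConnection is a hypothetical stand-in, not part of this module.
class DemoConnection:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False  # do not suppress exceptions raised in the block

with DemoConnection() as conn:
    assert not conn.closed  # still open inside the block
assert conn.closed          # close() ran on block exit
```

Returning `False` from `__exit__` means exceptions raised inside the block still propagate after `close()` has run.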
def solver_names(self):
"""List all the solvers this connection can provide, and load the data about the solvers.
To get all solver data: ``GET /solvers/remote/``
Returns:
list of str
"""
with self._solvers_lock:
if self._all_solvers_ready:
return self.solvers.keys()
log.debug("Requesting list of all solver data.")
response = self.session.get(posixpath.join(self.base_url, 'solvers/remote/'))
if response.status_code == 401:
raise SolverAuthenticationError()
response.raise_for_status()
log.debug("Received list of all solver data.")
data = response.json()
for solver in data:
log.debug("Found solver: %s", solver['id'])
self.solvers[solver['id']] = Solver(self, solver)
self._all_solvers_ready = True
return self.solvers.keys()
def get_solver(self, name=None):
"""Load the configuration for a single solver.
To get specific solver data: ``GET /solvers/remote/{solver_name}/``
Args:
name (str): Id of the requested solver. None will return the default solver.
Returns:
:obj:`Solver`
"""
log.debug("Looking for solver: %s", name)
if name is None:
if self.default_solver is not None:
name = self.default_solver
else:
raise ValueError("No name or default name provided when loading solver.")
with self._solvers_lock:
if name not in self.solvers:
if self._all_solvers_ready:
raise KeyError(name)
response = self.session.get(posixpath.join(self.base_url, 'solvers/remote/{}/'.format(name)))
if response.status_code == 401:
raise SolverAuthenticationError()
if response.status_code == 404:
raise KeyError("No solver with the name {} was available".format(name))
response.raise_for_status()
data = json.loads(response.text)
self.solvers[data['id']] = Solver(self, data)
return self.solvers[name]
def _submit(self, body, future):
"""Enqueue a problem for submission to the server.
This method is thread safe.
"""
self._submission_queue.put(self._submit.Message(body, future))
_submit.Message = collections.namedtuple('Message', ['body', 'future'])
def _do_submit_problems(self):
"""Pull problems from the submission queue and submit them.
Note:
This method is always run inside of a daemon thread.
"""
try:
while True:
# Pull as many problems as we can, block on the first one,
# but once we have one problem, switch to non-blocking then
# submit without blocking again.
ready_problems = [self._submission_queue.get()]
while True:
try:
ready_problems.append(self._submission_queue.get_nowait())
except queue.Empty:
break
# Submit the problems
log.debug("submitting {} problems".format(len(ready_problems)))
body = '[' + ','.join(mess.body for mess in ready_problems) + ']'
try:
response = self.session.post(posixpath.join(self.base_url, 'problems/'), body)
if response.status_code == 401:
raise SolverAuthenticationError()
response.raise_for_status()
message = response.json()
log.debug("Finished submitting {} problems".format(len(ready_problems)))
except BaseException as exception:
if not isinstance(exception, SolverAuthenticationError):
exception = IOError(exception)
for mess in ready_problems:
mess.future._set_error(exception, sys.exc_info())
self._submission_queue.task_done()
continue
# Pass on the information
for submission, res in zip(ready_problems, message):
self._handle_problem_status(res, submission.future, False)
self._submission_queue.task_done()
# this is equivalent to a yield to scheduler in other threading libraries
time.sleep(0)
except BaseException as err:
log.exception(err)
def _handle_problem_status(self, message, future, in_poll):
"""Handle the results of a problem submission or results request.
This method checks the status of the problem and puts it in the correct queue.
Args:
message (dict): Update message from the SAPI server wrt. this problem.
future `Future`: future corresponding to the problem
in_poll (bool): Flag set to true if the problem is in the poll loop already.
Returns:
true if the problem has been processed out of the status poll loop
Note:
This method is always run inside of a daemon thread.
"""
try:
status = message['status']
log.debug("Status: %s %s", message['id'], status)
# The future may not have the ID set yet
with future._single_cancel_lock:
# This handles the case where cancel has been called on a future
# before that future received the problem id
if future._cancel_requested:
if not future._cancel_sent and status == self.STATUS_PENDING:
# The problem has been canceled but the status says it's still in queue,
# so try to cancel it
self._cancel(message['id'], future)
# If a cancel request could meaningfully be sent it has been now
future._cancel_sent = True
# Set the id field in the future
future.id = message['id']
future.remote_status = status
# Record the timestamps once, the first time the server reports them.
# The timestamp format here is an assumption (ISO 8601 with microseconds).
if future.time_received is None and message.get('submitted_on') is not None:
future.time_received = datetime.datetime.strptime(message['submitted_on'], '%Y-%m-%dT%H:%M:%S.%fZ')
if future.time_solved is None and message.get('solved_on') is not None:
future.time_solved = datetime.datetime.strptime(message['solved_on'], '%Y-%m-%dT%H:%M:%S.%fZ')
if status == self.STATUS_COMPLETE:
# If the message is complete, forward it to the future object
if 'answer' in message:
future._set_message(message)
# If the problem is complete, but we don't have the result data
# put the problem in the queue for loading results.
else:
self._load(future)
elif status in self.ANY_STATUS_ONGOING:
# If the response is pending add it to the queue.
if not in_poll:
self._poll(future)
return False
elif status == self.STATUS_CANCELLED:
# If canceled return error
future._set_error(CanceledFutureError())
else:
# Return an error to the future object
future._set_error(SolverFailureError(message.get('error_message', 'An unknown error has occurred.')))
except Exception as error:
# If there were any unhandled errors we need to release the
# lock in the future, otherwise deadlock occurs.
future._set_error(error, sys.exc_info())
return True
def _cancel(self, id_, future):
"""Enqueue a problem to be canceled.
This method is thread safe.
"""
self._cancel_queue.put((id_, future))
def _do_cancel_problems(self):
"""Pull ids from the cancel queue and submit them.
Note:
This method is always run inside of a daemon thread.
"""
try:
while True:
# Pull as many problems as we can, block when none are available.
item_list = [self._cancel_queue.get()]
while True:
try:
item_list.append(self._cancel_queue.get_nowait())
except queue.Empty:
break
# Submit the problems, attach the ids as a json list in the
# body of the delete query.
try:
body = [item[0] for item in item_list]
self.session.delete(posixpath.join(self.base_url, 'problems/'), json=body)
except Exception as err:
for _, future in item_list:
if future is not None:
future._set_error(err, sys.exc_info())
# Mark all the ids as processed regardless of success or failure.
[self._cancel_queue.task_done() for _ in item_list]
# this is equivalent to a yield to scheduler in other threading libraries
time.sleep(0)
except Exception as err:
log.exception(err)
def _poll(self, future):
"""Enqueue a problem to poll the server for status.
This method is threadsafe.
"""
self._poll_queue.put(future)
def _do_poll_problems(self):
"""Poll the server for the status of a set of problems.
Note:
This method is always run inside of a daemon thread.
"""
try:
# Maintain an active group of queries
futures = {}
active_queries = set()
# Add a query to the active queries
def add(ftr):
if ftr.id not in futures and not ftr.done():
active_queries.add(ftr.id)
futures[ftr.id] = ftr
else:
self._poll_queue.task_done()
# Remove a query from the active set
def remove(id_):
del futures[id_]
active_queries.remove(id_)
self._poll_queue.task_done()
while True:
try:
# If we have no active queries, wait on the status queue
while len(active_queries) == 0:
add(self._poll_queue.get())
# Once there are active queries try to fill up the set and move on
while len(active_queries) < self._STATUS_QUERY_SIZE:
add(self._poll_queue.get_nowait())
except queue.Empty:
pass
# Build a query string with block of ids
log.debug("Query on futures: %s", ', '.join(active_queries))
query_string = 'problems/?id=' + ','.join(active_queries)
try:
response = self.session.get(posixpath.join(self.base_url, query_string))
if response.status_code == 401:
raise SolverAuthenticationError()
response.raise_for_status()
message = response.json()
except BaseException as exception:
if not isinstance(exception, SolverAuthenticationError):
exception = IOError(exception)
for id_ in list(active_queries):
futures[id_]._set_error(exception, sys.exc_info())
remove(id_)
continue
# If problems are removed from the polling by _handle_problem_status
# remove them from the active set
for single_message in message:
if self._handle_problem_status(single_message, futures[single_message['id']], True):
remove(single_message['id'])
# Remove the finished queries
for id_ in list(active_queries):
if futures[id_].done():
remove(id_)
# this is equivalent to a yield to scheduler in other threading libraries
time.sleep(0)
except Exception as err:
log.exception(err)
def _load(self, future):
"""Enqueue a problem to download results from the server.
Args:
future (`Future`): future object corresponding to the query.
This method is threadsafe.
"""
self._load_queue.put(future)
def _do_load_results(self):
"""Submit a query asking for the results for a particular problem.
To request the results of a problem: ``GET /problems/{problem_id}/``
Note:
This method is always run inside of a daemon thread.
"""
try:
while True:
# Select a problem
future = self._load_queue.get()
log.debug("Query for results: %s", future.id)
# Submit the query
query_string = 'problems/{}/'.format(future.id)
try:
response = self.session.get(posixpath.join(self.base_url, query_string))
if response.status_code == 401:
raise SolverAuthenticationError()
response.raise_for_status()
message = response.json()
except BaseException as exception:
if not isinstance(exception, SolverAuthenticationError):
exception = IOError(exception)
future._set_error(exception, sys.exc_info())
continue
# Dispatch the results, mark the task complete
self._handle_problem_status(message, future, False)
self._load_queue.task_done()
# this is equivalent to a yield to scheduler in other threading libraries
time.sleep(0)
except Exception as err:
log.error('Load result error: ' + str(err))
class Solver:
"""
A solver enables sampling from an Ising model.
Get solver objects by calling get_solver(name) on a connection object.
The solver has responsibility for:
- Encoding submitted problems
- Checking the submitted parameters
- Adding problems to the Connection's submission queue
Args:
connection (`Connection`): Connection through which the solver is accessed.
data: Data from the server describing this solver.
"""
# Special flag to notify the system a solver needs access to special hardware
_PARAMETER_ENABLE_HARDWARE = 'use_hardware'
def __init__(self, connection, data):
self.connection = connection
self.id = data['id']
self.data = data
#: When True, the solution data will be returned as numpy matrices (default: False)
self.return_matrix = False
# The exact sequence of nodes/edges is used in encoding problems and must be preserved
self._encoding_qubits = data['properties']['qubits']
self._encoding_couplers = [tuple(edge) for edge in data['properties']['couplers']]
#: The nodes in this solver's graph: set(int)
self.nodes = self.variables = set(self._encoding_qubits)
#: The edges in this solver's graph, every edge will be present as (a, b) and (b, a): set(tuple(int, int))
self.edges = self.couplers = set(tuple(edge) for edge in self._encoding_couplers) | \
set((edge[1], edge[0]) for edge in self._encoding_couplers)
#: The edges in this solver's graph, each edge will only be represented once: set(tuple(int, int))
self.undirected_edges = {edge for edge in self.edges if edge[0] < edge[1]}
#: Properties of this solver the server presents: dict
self.properties = data['properties']
#: The set of extra parameters this solver will accept in sample_ising or sample_qubo: dict
self.parameters = self.properties['parameters']
# Create a set of default parameters for the queries
self._params = {}
# As a heuristic to guess if this is a hardware sampler check if
# the 'annealing_time_range' property is set.
if 'annealing_time_range' in data['properties']:
self._params[self._PARAMETER_ENABLE_HARDWARE] = True
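The edge-set construction in the constructor above can be traced on a tiny hypothetical coupler list: `edges` holds both orientations of every coupler, while `undirected_edges` keeps one canonical `(low, high)` copy of each.

```python
# Sketch of how Solver derives its edge sets from server coupler data.
# The coupler list here is hypothetical example data.
couplers = [[0, 1], [1, 2]]
encoding_couplers = [tuple(edge) for edge in couplers]

# Both orientations, mirroring self.edges in the constructor above
edges = set(encoding_couplers) | {(b, a) for (a, b) in encoding_couplers}
# One canonical copy per coupler, mirroring self.undirected_edges
undirected_edges = {edge for edge in edges if edge[0] < edge[1]}

assert edges == {(0, 1), (1, 0), (1, 2), (2, 1)}
assert undirected_edges == {(0, 1), (1, 2)}
```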
def sample_ising(self, linear, quadratic, **params):
"""Draw samples from the provided Ising model.
To submit a problem: ``POST /problems/``
Args:
linear (list/dict): Linear terms of the model (h).
quadratic (dict of (int, int):float): Quadratic terms of the model (J).
**params: Parameters for the sampling method, specified per solver.
Returns:
:obj:`Future`
"""
# Our linear and quadratic objective terms are already separated in an
# ising model so we can just directly call `_sample`.
return self._sample('ising', linear, quadratic, params)
def sample_qubo(self, qubo, **params):
"""Draw samples from the provided QUBO.
To submit a problem: ``POST /problems/``
Args:
qubo (dict of (int, int):float): Terms of the model.
**params: Parameters for the sampling method, specified per solver.
Returns:
:obj:`Future`
"""
# In a QUBO the linear and quadratic terms in the objective are mixed into
# a matrix. For the sake of encoding, we will separate them before calling `_sample`
linear = {i1: v for (i1, i2), v in _uniform_iterator(qubo) if i1 == i2}
quadratic = {(i1, i2): v for (i1, i2), v in _uniform_iterator(qubo) if i1 != i2}
return self._sample('qubo', linear, quadratic, params)
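The diagonal/off-diagonal split performed by `sample_qubo` can be sketched on a small hypothetical QUBO, using plain `dict.items()` in place of `_uniform_iterator`: entries with `i == j` become linear terms, the rest stay quadratic.

```python
# Hypothetical QUBO: two diagonal entries and one off-diagonal entry
qubo = {(0, 0): -1.0, (1, 1): 0.5, (0, 1): 2.0}

# Same split as in sample_qubo above
linear = {i1: v for (i1, i2), v in qubo.items() if i1 == i2}
quadratic = {(i1, i2): v for (i1, i2), v in qubo.items() if i1 != i2}

assert linear == {0: -1.0, 1: 0.5}
assert quadratic == {(0, 1): 2.0}
```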
def _sample(self, type_, linear, quadratic, params, reuse_future=None):
"""Internal method for both sample_ising and sample_qubo.
Args:
linear (list/dict): Linear terms of the model.
quadratic (dict of (int, int):float): Quadratic terms of the model.
**params: Parameters for the sampling method, specified per solver.
Returns:
:obj: `Future`
"""
# Check the problem
if not self.check_problem(linear, quadratic):
raise ValueError("Problem graph incompatible with solver.")
# Mix the new parameters with the default parameters
combined_params = dict(self._params)
combined_params.update(params)
# Check the parameters before submitting
for key in combined_params:
if key not in self.parameters and key != self._PARAMETER_ENABLE_HARDWARE:
raise KeyError("{} is not a parameter of this solver.".format(key))
# Encode the problem, use the newer format
data = self._base64_format(self, linear, quadratic)
# data = self._text_format(solver, lin, quad)
body = json.dumps({
'solver': self.id,
'data': data,
'type': type_,
'params': combined_params
})
# Construct where we will put the result when we finish, submit the query
if reuse_future is not None:
future = reuse_future
future.__init__(self, None, self.return_matrix, (type_, linear, quadratic, params))
else:
future = Future(self, None, self.return_matrix, (type_, linear, quadratic, params))
log.debug("Submitting new problem to: %s", self.id)
self.connection._submit(body, future)
return future
def check_problem(self, linear, quadratic):
"""Test if an Ising model matches the graph provided by the solver.
Args:
linear (list/dict): Linear terms of the model (h).
quadratic (dict of (int, int):float): Quadratic terms of the model (J).
Returns:
boolean
"""
for key, value in _uniform_iterator(linear):
if value != 0 and key not in self.nodes:
return False
for key, value in _uniform_iterator(quadratic):
if value != 0 and tuple(key) not in self.edges:
return False
return True
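A worked example of the check above, on a hypothetical three-node graph: nonzero linear terms must sit on solver nodes, nonzero quadratic terms on solver edges, and zero-valued terms are ignored entirely.

```python
# Hypothetical solver graph (edges stored in both orientations,
# as Solver.edges is above)
nodes = {0, 1, 2}
edges = {(0, 1), (1, 0), (1, 2), (2, 1)}

def check_problem(linear, quadratic):
    """Same logic as Solver.check_problem, against the sets above."""
    for key, value in linear.items():
        if value != 0 and key not in nodes:
            return False
    for key, value in quadratic.items():
        if value != 0 and tuple(key) not in edges:
            return False
    return True

assert check_problem({0: 1, 1: -1}, {(0, 1): 1})
assert not check_problem({5: 1}, {})        # node 5 not in graph
assert not check_problem({}, {(0, 2): 1})   # edge (0, 2) not in graph
assert check_problem({5: 0}, {(0, 2): 0})   # zero terms are ignored
```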
def retrieve_problem(self, id_):
"""Resume polling for a problem previously submitted.
Args:
id_: Identification of the query.
Returns:
:obj: `Future`
"""
future = Future(self, id_, self.return_matrix, None)
self.connection._poll(future)
return future
def _text_format(self, solver, lin, quad):
"""Perform the legacy problem encoding.
Deprecated encoding method; included only for reference.
Args:
solver: solver requested.
lin: linear terms of the model.
quad: Quadratic terms of the model.
Returns:
data: text formatted problem
"""
data = ''
counter = 0
for index, value in _uniform_iterator(lin):
if value != 0:
data = data + '{} {} {}\n'.format(index, index, value)
counter += 1
for (index1, index2), value in six.iteritems(quad):
if value != 0:
data = data + '{} {} {}\n'.format(index1, index2, value)
counter += 1
data = '{} {}\n'.format(max(solver.nodes) + 1, counter) + data
return data
def _base64_format(self, solver, lin, quad):
"""Encode the problem for submission to a given solver.
Args:
solver: solver requested.
lin: linear terms of the model.
quad: Quadratic terms of the model.
Returns:
encoded submission dictionary
"""
# Encode linear terms. The coefficients of the linear terms of the objective
# are encoded as an array of little endian 64 bit doubles.
# This array is then base64 encoded into a string safe for json.
# The order of the terms is determined by the _encoding_qubits property
# specified by the server.
lin = [_uniform_get(lin, qubit, 0) for qubit in solver._encoding_qubits]
lin = base64.b64encode(struct.pack('<' + ('d' * len(lin)), *lin))
# Encode the coefficients of the quadratic terms of the objective
# in the same manner as the linear terms, in the order given by the
# _encoding_couplers property
quad = [quad.get(edge, 0) + quad.get((edge[1], edge[0]), 0)
for edge in solver._encoding_couplers]
quad = base64.b64encode(struct.pack('<' + ('d' * len(quad)), *quad))
# The name for this encoding is 'qp' and is explicitly included in the
# message for easier extension in the future.
return {
'format': 'qp',
'lin': lin.decode('utf-8'),
'quad': quad.decode('utf-8')
}
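The 'qp' encoding above can be sketched end to end on hypothetical qubit/coupler orderings: coefficients are packed as little-endian 64-bit doubles in the server-specified order, then base64-encoded, and either orientation of a coupler contributes to the same slot.

```python
import base64
import struct

# Hypothetical server-specified orderings
encoding_qubits = [0, 1, 2]
encoding_couplers = [(0, 1), (1, 2)]

lin = {0: -1.0, 2: 0.5}   # qubit 1 defaults to 0.0
quad = {(1, 0): 2.0}      # reversed orientation of coupler (0, 1)

lin_vals = [lin.get(q, 0) for q in encoding_qubits]
quad_vals = [quad.get(e, 0) + quad.get((e[1], e[0]), 0)
             for e in encoding_couplers]
data = {
    'format': 'qp',
    'lin': base64.b64encode(struct.pack('<3d', *lin_vals)).decode('utf-8'),
    'quad': base64.b64encode(struct.pack('<2d', *quad_vals)).decode('utf-8'),
}

# Decoding recovers the coefficient vectors in encoding order
assert struct.unpack('<3d', base64.b64decode(data['lin'])) == (-1.0, 0.0, 0.5)
assert struct.unpack('<2d', base64.b64decode(data['quad'])) == (2.0, 0.0)
```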
class Future:
"""An object for a pending SAPI call.
Waits for a request to complete and parses the message returned.
The future will block to resolve when any data value is accessed.
The method :meth:`done` can be used to query for resolution without blocking.
:meth:`wait` and :meth:`wait_multiple` can be used to block on a variable
number of jobs for a given amount of time.
Note:
Only constructed by :obj:`Solver` objects.
Args:
solver: The solver that is going to fulfil this future.
id_: Identification of the query we are waiting for. (May be None and filled in later.)
return_matrix: Request return values as numpy matrices.
"""
def __init__(self, solver, id_, return_matrix, submission_data):
self.solver = solver
# Store the query data in case the problem needs to be resubmitted
self._submission_data = submission_data
# Has the client tried to cancel this job
self._cancel_requested = False
self._cancel_sent = False
self._single_cancel_lock = threading.Lock() # Make sure we only call cancel once
# Should the results be decoded as python lists or numpy matrices
if return_matrix and not _numpy:
raise ValueError("Matrix result requested without numpy.")
self.return_matrix = return_matrix
#: The id the server will use to identify this problem, None until the id is actually known
self.id = id_
#: `datetime` corresponding to the time when the problem was accepted by the server (None before then)
self.time_received = None
#: `datetime` corresponding to the time when the problem was completed by the server (None before then)
self.time_solved = None
# Track how long it took us to parse the data
self.parse_time = None
# Data from the server before it is parsed
self._message = None
#: Status flag most recently returned by the server
self.remote_status = None
# Data from the server after it is parsed (either data or an error)
self._result = None
self.error = None
# Event(s) to signal when the results are ready
self._results_ready_event = threading.Event()
self._other_events = []
def _set_message(self, message):
"""Complete the future with a message from the server.
The message from the server may actually be an error.
Args:
message (dict): Data from the server from trying to complete query.
"""
self._message = message
self._signal_ready()
def _set_error(self, error, exc_info=None):
"""Complete the future with an error.
Args:
error: An error string or exception object.
exc_info: Stack trace info from sys module for reraising exceptions nicely.
"""
self.error = error
self._exc_info = exc_info
self._signal_ready()
def _signal_ready(self):
"""Signal all the events waiting on this future."""
self._results_ready_event.set()
[ev.set() for ev in self._other_events]
def _add_event(self, event):
"""Add an event to be signaled after this event completes."""
self._other_events.append(event)
if self.done():
event.set()
def _remove_event(self, event):
"""Remove a completion event from this future."""
self._other_events.remove(event)
@staticmethod
def wait_multiple(futures, min_done=None, timeout=float('inf')):
"""Wait for multiple Future objects to complete.
Python doesn't provide a multi-wait, but we can jury-rig something reasonably
efficient using an event object.
Args:
futures (list of Future): list of objects to wait on
min_done (int): Stop waiting when this many results are ready
timeout (float): Maximum number of seconds to wait
Returns:
boolean: True if the minimum number of results have been reached.
"""
if min_done is None:
min_done = len(futures)
# Track the exit conditions
finish = time.time() + timeout
done = 0
# Keep track of which futures haven't finished
remaining = list(futures)
# Insert our event into all the futures
event = threading.Event()
[f._add_event(event) for f in remaining]
# Check the exit conditions
while done < min_done and finish > time.time():
# Prepare to wait on any of the jobs finishing
event.clear()
# Check if any of the jobs have finished after the clear, just in
# case one finished and we erased its signal by calling clear above
finished_futures = {f for f in remaining if f.done()}
if len(finished_futures) > 0:
# If we did make a mistake resetting the event, undo that now
# so that we double-check the finished list before a wait blocks
event.set()
# Update our exit conditions
done += len(finished_futures)