# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2018.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Utility functions for working with Results."""
from typing import Sequence, Union, Optional, Dict, List
from collections import Counter
from copy import deepcopy
import numpy as np
from qiskit.exceptions import QiskitError
from qiskit.result.result import Result
from qiskit.result.counts import Counts
from qiskit.result.distributions.probability import ProbDistribution
from qiskit.result.distributions.quasi import QuasiDistribution
from qiskit.result.postprocess import _bin_to_hex
from qiskit._accelerate import results as results_rs # pylint: disable=no-name-in-module


def marginal_counts(
result: Union[dict, Result],
indices: Optional[List[int]] = None,
inplace: bool = False,
format_marginal: bool = False,
marginalize_memory: Optional[bool] = True,
) -> Union[Dict[str, int], Result]:
"""Marginalize counts from an experiment over some indices of interest.
Args:
result: result to be marginalized
(a Result object or a dict(str, int) of counts).
indices: The bit positions of interest
to marginalize over. If ``None`` (default), do not marginalize at all.
inplace: Default: False. Operates on the original Result
argument if True, leading to loss of original Job Result.
It has no effect if ``result`` is a dict.
format_marginal: Default: False. If True, takes the output of
marginalize and formats it with placeholders between cregs and
for non-indices.
marginalize_memory: If True, then also marginalize the memory field (if present).
If False, remove the memory field from the result.
If None, leave the memory field as is.
Returns:
Result or dict(str, int): A Result object or a dictionary with
the observed counts, marginalized to only account for frequency
of observations of bits of interest.
Raises:
QiskitError: in case of invalid indices to marginalize over.
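
    Example:
        A minimal, illustrative sketch with a hypothetical dict of counts over
        two 2-bit registers (the values are made up)::

            from qiskit.result import marginal_counts

            counts = {"00 10": 30, "01 11": 70}
            # keep only clbits 0 and 2, one from each register
            print(marginal_counts(counts, indices=[0, 2]))
            # {'00': 30, '11': 70}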
"""
if isinstance(result, Result):
if not inplace:
result = deepcopy(result)
for i, experiment_result in enumerate(result.results):
counts = result.get_counts(i)
new_counts = _marginalize(counts, indices)
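            # ExperimentResult stores counts keyed by hex strings, so convert the
            # marginalized bitstring keys back to hex before writing them back.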
new_counts_hex = {}
for k, v in new_counts.items():
new_counts_hex[_bin_to_hex(k)] = v
experiment_result.data.counts = new_counts_hex
if indices is not None:
experiment_result.header.memory_slots = len(indices)
csize = getattr(experiment_result.header, "creg_sizes", None)
if csize is not None:
experiment_result.header.creg_sizes = _adjust_creg_sizes(csize, indices)
if getattr(experiment_result.data, "memory", None) is not None and indices is not None:
if marginalize_memory is False:
delattr(experiment_result.data, "memory")
elif marginalize_memory is None:
pass # leave as is
else:
sorted_indices = sorted(
indices, reverse=True
) # same convention as for the counts
experiment_result.data.memory = results_rs.marginal_memory(
experiment_result.data.memory, sorted_indices, return_hex=True
)
return result
else:
marg_counts = _marginalize(result, indices)
if format_marginal and indices is not None:
marg_counts = _format_marginal(result, marg_counts, indices)
return marg_counts


def _adjust_creg_sizes(creg_sizes, indices):
"""Helper to reduce creg_sizes to match indices"""
# Zero out creg_sizes list
new_creg_sizes = [[creg[0], 0] for creg in creg_sizes]
indices_sort = sorted(indices)
# Get creg num values and then convert to the cumulative last index per creg.
# e.g. [2, 1, 3] => [1, 2, 5]
creg_nums = [x for _, x in creg_sizes]
creg_limits = [sum(creg_nums[0:x:1]) - 1 for x in range(0, len(creg_nums) + 1)][1:]
# Now iterate over indices and find which creg that index is in.
# When found increment the creg size
creg_idx = 0
for ind in indices_sort:
for idx in range(creg_idx, len(creg_limits)):
if ind <= creg_limits[idx]:
creg_idx = idx
new_creg_sizes[idx][1] += 1
break
# Throw away any cregs with 0 size
new_creg_sizes = [creg for creg in new_creg_sizes if creg[1] != 0]
return new_creg_sizes


def marginal_memory(
memory: Union[List[str], np.ndarray],
indices: Optional[List[int]] = None,
int_return: bool = False,
hex_return: bool = False,
avg_data: bool = False,
parallel_threshold: int = 1000,
) -> Union[List[str], np.ndarray]:
"""Marginalize shot memory
This function is multithreaded and will launch a thread pool with threads equal to the number
of CPUs by default. You can tune the number of threads with the ``RAYON_NUM_THREADS``
environment variable. For example, setting ``RAYON_NUM_THREADS=4`` would limit the thread pool
to 4 threads.
Args:
memory: The input memory list, this is either a list of hexadecimal strings to be marginalized
representing measure level 2 memory or a numpy array representing level 0 measurement
memory (single or avg) or level 1 measurement memory (single or avg).
indices: The bit positions of interest to marginalize over. If
``None`` (default), do not marginalize at all.
int_return: If set to ``True`` the output will be a list of integers.
By default the return type is a bit string. This and ``hex_return``
are mutually exclusive and can not be specified at the same time. This option only has an
effect with memory level 2.
hex_return: If set to ``True`` the output will be a list of hexadecimal
strings. By default the return type is a bit string. This and
``int_return`` are mutually exclusive and can not be specified
at the same time. This option only has an effect with memory level 2.
avg_data: If a 2 dimensional numpy array is passed in for ``memory`` this can be set to
``True`` to indicate it's a avg level 0 data instead of level 1
single data.
parallel_threshold: The number of elements in ``memory`` to start running in multiple
threads. If ``len(memory)`` is >= this value, the function will run in multiple
threads. By default this is set to 1000.
Returns:
marginal_memory: The list of marginalized memory
Raises:
ValueError: if both ``int_return`` and ``hex_return`` are set to ``True``
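
    Example:
        A small sketch with hypothetical level 2 (classified) shot memory::

            from qiskit.result import marginal_memory

            memory = ["0x3", "0x2", "0x3"]  # three shots over two clbits
            # keep only clbit 0 of each shot
            print(marginal_memory(memory, indices=[0]))
            # ['1', '0', '1']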
"""
if int_return and hex_return:
raise ValueError("Either int_return or hex_return can be specified but not both")
if isinstance(memory, np.ndarray):
if int_return:
raise ValueError("int_return option only works with memory list input")
if hex_return:
raise ValueError("hex_return option only works with memory list input")
if indices is None:
return memory.copy()
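        # Dispatch on the array's rank, mirroring the Rust kernels called below:
        #   1D            -> averaged level 1 memory (one value per memory slot)
        #   2D + avg_data -> averaged level 0 memory; otherwise single-shot level 1 memory
        #   3D            -> single-shot level 0 memory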
if memory.ndim == 1:
return results_rs.marginal_measure_level_1_avg(memory, indices)
if memory.ndim == 2:
if avg_data:
return results_rs.marginal_measure_level_0_avg(memory, indices)
else:
return results_rs.marginal_measure_level_1(memory, indices)
if memory.ndim == 3:
return results_rs.marginal_measure_level_0(memory, indices)
raise ValueError("Invalid input memory array")
return results_rs.marginal_memory(
memory,
indices,
return_int=int_return,
return_hex=hex_return,
parallel_threshold=parallel_threshold,
)


def marginal_distribution(
counts: dict,
indices: Optional[Sequence[int]] = None,
format_marginal: bool = False,
) -> Dict[str, int]:
"""Marginalize counts from an experiment over some indices of interest.
Unlike :func:`~.marginal_counts` this function respects the order of
the input ``indices``. If the input ``indices`` list is specified then the order
the bit indices are specified will be the output order of the bitstrings
in the marginalized output.
Args:
counts: result to be marginalized
indices: The bit positions of interest
to marginalize over. If ``None`` (default), do not marginalize at all.
format_marginal: Default: False. If True, takes the output of
marginalize and formats it with placeholders between cregs and
for non-indices.
Returns:
dict(str, int): A marginalized dictionary
Raises:
QiskitError: If any value in ``indices`` is invalid or the ``counts`` dict
is invalid.
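
    Example:
        A minimal sketch with a hypothetical probability dictionary::

            from qiskit.result import marginal_distribution

            probs = {"00": 0.25, "01": 0.75}
            # marginalize onto clbit 0 only
            print(marginal_distribution(probs, indices=[0]))
            # gives {'0': 0.25, '1': 0.75} (key order may vary)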
"""
num_clbits = len(max(counts.keys()).replace(" ", ""))
if indices is not None and (len(indices) == 0 or not set(indices).issubset(range(num_clbits))):
raise QiskitError(f"indices must be in range [0, {num_clbits - 1}].")
if isinstance(counts, Counts):
res = results_rs.marginal_counts(counts, indices)
elif isinstance(counts, (ProbDistribution, QuasiDistribution)):
res = results_rs.marginal_distribution(counts, indices)
else:
first_value = next(iter(counts.values()))
if isinstance(first_value, (int, np.integer)):
res = results_rs.marginal_counts(counts, indices)
elif isinstance(first_value, (float, np.floating)):
res = results_rs.marginal_distribution(counts, indices)
else:
raise QiskitError("Values of counts must be an int or float")
if format_marginal and indices is not None:
return _format_marginal(counts, res, indices)
return res


def _marginalize(counts, indices=None):
"""Get the marginal counts for the given set of indices"""
num_clbits = len(next(iter(counts)).replace(" ", ""))
# Check if we do not need to marginalize and if so, trim
# whitespace and '_' and return
if (indices is None) or set(range(num_clbits)) == set(indices):
ret = {}
for key, val in counts.items():
key = _remove_space_underscore(key)
ret[key] = val
return ret
if not indices or not set(indices).issubset(set(range(num_clbits))):
raise QiskitError(f"indices must be in range [0, {num_clbits - 1}].")
# Sort the indices to keep in descending order
# Since bitstrings have qubit-0 as least significant bit
indices = sorted(indices, reverse=True)
# Build the return list
new_counts = Counter()
for key, val in counts.items():
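        # key[-idx - 1] is the character for clbit ``idx`` (clbit 0 is the rightmost character)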
new_key = "".join([_remove_space_underscore(key)[-idx - 1] for idx in indices])
new_counts[new_key] += val
return dict(new_counts)


def _format_marginal(counts, marg_counts, indices):
"""Take the output of marginalize and add placeholders for
multiple cregs and non-indices."""
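    # e.g. counts template "00 10", marginal key "01" for indices [0, 2] -> "_0 _1"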
format_counts = {}
counts_template = next(iter(counts))
counts_len = len(counts_template.replace(" ", ""))
indices_rev = sorted(indices, reverse=True)
for count in marg_counts:
index_dict = dict(zip(indices_rev, count))
count_bits = "".join(
[index_dict[index] if index in index_dict else "_" for index in range(counts_len)]
)[::-1]
for index, bit in enumerate(counts_template):
if bit == " ":
count_bits = count_bits[:index] + " " + count_bits[index:]
format_counts[count_bits] = marg_counts[count]
return format_counts


def _remove_space_underscore(bitstring):
"""Removes all spaces and underscores from bitstring"""
return bitstring.replace(" ", "").replace("_", "")