Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/deepquantum/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from . import channel
from . import circuit
from . import communication
from . import cutting
from . import distributed
from . import gate
from . import layer
from . import operation
from . import optimizer
from . import qmath
from . import qpd
from . import state
from . import utils

Expand Down
179 changes: 129 additions & 50 deletions src/deepquantum/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from copy import copy, deepcopy
from collections import defaultdict
from collections.abc import Sequence, Hashable
from itertools import product
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING

Expand All @@ -15,15 +17,17 @@
from .adjoint import AdjointExpectation
from .channel import BitFlip, PhaseFlip, Depolarizing, Pauli, AmplitudeDamping, PhaseDamping
from .channel import GeneralizedAmplitudeDamping
from .cutting import transform_cut2move, partition_problem
from .distributed import measure_dist
from .gate import ParametricSingleGate
from .gate import U3Gate, PhaseShift, PauliX, PauliY, PauliZ, Hadamard, SGate, SDaggerGate, TGate, TDaggerGate
from .gate import Rx, Ry, Rz, ProjectionJ, CNOT, Swap, Rxx, Ryy, Rzz, Rxy, ReconfigurableBeamSplitter, Toffoli, Fredkin
from .gate import CombinedSingleGate, UAnyGate, LatentGate, HamiltonianGate, Barrier
from .gate import CombinedSingleGate, UAnyGate, LatentGate, HamiltonianGate, Reset, Barrier, WireCut, Move
from .layer import Observable, U3Layer, XLayer, YLayer, ZLayer, HLayer, RxLayer, RyLayer, RzLayer, CnotLayer, CnotRing
from .operation import Operation, Gate, Layer, Channel
from .operation import Operation, Gate, Layer, Channel, MeasureQPD
from .qmath import amplitude_encoding, measure, expectation, sample_sc_mcmc, sample2expval
from .qmath import slice_state_vector, inner_product_mps, get_prob_mps
from .qpd import SingleGateQPD
from .state import QubitState, MatrixProductState, DistributedQubitState

if TYPE_CHECKING:
Expand Down Expand Up @@ -73,6 +77,7 @@ def __init__(
self.state = None
self.ndata = 0
self.depth = np.array([0] * nqubit)
self._cut_lst = [] # [(index of cutting, wire of cutting), ...]
self.wires_measure = []
self.wires_condition = []
# MBQC
Expand Down Expand Up @@ -107,12 +112,15 @@ def __add__(self, rhs: 'QubitCircuit') -> 'QubitCircuit':
assert self.nqubit == rhs.nqubit
cir = QubitCircuit(nqubit=self.nqubit, init_state=self.init_state, name=self.name, den_mat=self.den_mat,
reupload=self.reupload, mps=self.mps, chi=self.chi)
shift = len(self.operators)
cir.operators = self.operators + rhs.operators
cir.encoders = self.encoders + rhs.encoders
cir.observables = rhs.observables
cir.npara = self.npara + rhs.npara
cir.ndata = self.ndata + rhs.ndata
cir.depth = self.depth + rhs.depth
for idx, wire in rhs._cut_lst:
cir._cut_lst.append((idx + shift, wire))
cir.wires_measure = rhs.wires_measure
cir.wires_condition += rhs.wires_condition
cir.wires_condition = list(set(cir.wires_condition))
Expand Down Expand Up @@ -246,7 +254,7 @@ def init_encoder(self) -> None: # deal with the problem of state_dict() with vma
for op in self.encoders:
op.init_para()

def reset(self, init_state: Any = 'zeros') -> None:
def reset_circuit(self, init_state: Any = 'zeros') -> None:
"""Reset the ``QubitCircuit`` according to ``init_state``."""
self.set_init_state(init_state)
self.operators = nn.Sequential()
Expand All @@ -256,6 +264,7 @@ def reset(self, init_state: Any = 'zeros') -> None:
self.npara = 0
self.ndata = 0
self.depth = np.array([0] * self.nqubit)
self._cut_lst = []
self.wires_measure = []
self.wires_condition = []

Expand Down Expand Up @@ -492,15 +501,15 @@ def inverse(self, encode: bool = False) -> 'QubitCircuit':
op_inv = op
else:
op_inv = op.inverse()
cir.operators.append(op_inv)
cir.add(op_inv)
if encode and op in self.encoders:
cir.encoders.append(op_inv)
cir.depth = self.depth
cir.npara = self.npara
cir.wires_condition = self.wires_condition
if encode:
cir.npara = self.npara
cir.ndata = self.ndata
else:
cir.npara = self.npara + self.ndata
cir.ndata = 0
return cir

Expand Down Expand Up @@ -640,6 +649,88 @@ def _update_pattern(self, pattern: 'Pattern', gate: Gate, node_next: int, encode
self.wire2node_dict[wire] = node
return pattern

def transform_cut2move(self) -> 'QubitCircuit':
"""Transform ``WireCut`` to ``Move`` and expand the observables accordingly."""
operators = deepcopy(self.operators)
if len(self.observables) == 0:
observables = None
else:
observables = deepcopy(self.observables)
operators, observables = transform_cut2move(operators, self._cut_lst, observables, False)
cir = QubitCircuit(operators[0].nqubit, den_mat=self.den_mat, reupload=self.reupload,
mps=self.mps, chi=self.chi, shots=self.shots)
for op in operators:
cir.add(op)
if observables is not None:
cir.observables = observables
return cir

def get_subexperiments(self, qubit_labels: Optional[Sequence[Hashable]] = None) -> Tuple[Dict, List[float]]:
"""Generate cutting subexperiments and their associated coefficients."""
operators = deepcopy(self.operators)
if len(self.observables) == 0:
observables = None
else:
observables = deepcopy(self.observables)
operators, observables = transform_cut2move(operators, self._cut_lst, observables, True)
label2sub_dict, label2obs_dict = partition_problem(operators, qubit_labels, observables)
label2qpd_dict = defaultdict(list) # {label: [idx, ...]}
gate_label_lst = []
gate_coeff_lst = []
nbasis_lst = []
for label, sub_ops in label2sub_dict.items():
for i, op in enumerate(sub_ops):
if isinstance(op, SingleGateQPD):
label2qpd_dict[label].append(i)
if op.label is not None and op.label not in gate_label_lst:
gate_label_lst.append(op.label)
gate_coeff_lst.append(op.coeffs)
nbasis_lst.append(len(op.bases))
indices = sorted(range(len(gate_label_lst)), key=lambda i: gate_label_lst[i])
gate_label_lst_sorted = [gate_label_lst[i] for i in indices]
gate_coeff_lst_sorted = [gate_coeff_lst[i] for i in indices]
nbasis_lst_sorted = [nbasis_lst[i] for i in indices]
ranges = [range(0, nbasis) for nbasis in nbasis_lst_sorted]
subexperiments = defaultdict(list)
coefficients = []
for combination in product(*ranges):
for label, sub_ops in label2sub_dict.items():
nqubit = sub_ops[0].nqubit
cir = QubitCircuit(nqubit, den_mat=self.den_mat, reupload=self.reupload, mps=self.mps, chi=self.chi,
shots=self.shots)
if observables is not None:
obs = nn.ModuleList()
for ob in label2obs_dict[label]:
new_ob = Observable(ob.nqubit, [], den_mat=self.den_mat)
new_ob.wires = ob.wires
new_ob.basis = ob.basis
new_ob.gates.extend(ob.gates)
obs.append(new_ob)
for op in sub_ops:
if isinstance(op, SingleGateQPD):
for i, idx in enumerate(combination):
if op.label == gate_label_lst_sorted[i]:
for ops in op.bases[idx]:
for gate in ops:
cir.add(gate)
if observables is not None and len(op.bases[idx][0]) > 0:
if isinstance(op.bases[idx][0][-1], MeasureQPD):
pauliz = PauliZ(nqubit, op.wires, den_mat=op.den_mat, tsr_mode=True)
for new_ob in obs:
new_ob.wires = new_ob.wires + [op.wires]
new_ob.basis = new_ob.basis + 'z'
new_ob.gates.append(pauliz)
else:
cir.add(op)
if observables is not None:
cir.observables = obs
subexperiments[label].append(cir)
coeff = 1.
for i, idx in enumerate(combination):
coeff *= gate_coeff_lst_sorted[i][idx]
coefficients.append(coeff)
return subexperiments, coefficients

def draw(self, output: str = 'mpl', **kwargs):
"""Visualize the quantum circuit."""
qc = QuantumCircuit.from_qasm_str(self._qasm())
Expand Down Expand Up @@ -686,29 +777,36 @@ def add(
op.controls = controls
if isinstance(op, QubitCircuit):
assert self.nqubit == op.nqubit
shift = len(self.operators)
self.operators += op.operators
self.encoders += op.encoders
self.observables = op.observables
self.npara += op.npara
self.ndata += op.ndata
self.depth += op.depth
for idx, wire in op._cut_lst:
self._cut_lst.append((idx + shift, wire))
self.wires_measure = op.wires_measure
self.wires_condition += op.wires_condition
self.wires_condition = list(set(self.wires_condition))
else:
op.tsr_mode = True
self.operators.append(op)
if isinstance(op, Gate):
if isinstance(op, WireCut):
self._cut_lst.append((len(self.operators), op.wires[0]))
self.operators.append(op)
for i in op.wires + op.controls:
self.depth[i] += 1
if op.condition:
self.wires_condition += op.controls
self.wires_condition = list(set(self.wires_condition))
elif isinstance(op, Layer):
self.operators.extend(op.gates)
for wire in op.wires:
for i in wire:
self.depth[i] += 1
# elif isinstance(op, Channel):
elif isinstance(op, Channel):
self.operators.append(op)
# for i in op.wires:
# self.depth[i] += 1
if encode:
Expand Down Expand Up @@ -1202,12 +1300,7 @@ def cnot_ring(self, minmax: Optional[List[int]] = None, step: int = 1, reverse:
cxr = CnotRing(nqubit=self.nqubit, minmax=minmax, step=step, reverse=reverse, den_mat=self.den_mat)
self.add(cxr)

def bit_flip(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def bit_flip(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a bit-flip channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1216,12 +1309,7 @@ def bit_flip(
bf = BitFlip(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(bf, encode=encode)

def phase_flip(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def phase_flip(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a phase-flip channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1230,12 +1318,7 @@ def phase_flip(
pf = PhaseFlip(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(pf, encode=encode)

def depolarizing(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def depolarizing(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a depolarizing channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1244,12 +1327,7 @@ def depolarizing(
dp = Depolarizing(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(dp, encode=encode)

def pauli(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def pauli(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a Pauli channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1258,12 +1336,7 @@ def pauli(
p = Pauli(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(p, encode=encode)

def amp_damp(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def amp_damp(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add an amplitude-damping channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1272,12 +1345,7 @@ def amp_damp(
ad = AmplitudeDamping(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(ad, encode=encode)

def phase_damp(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def phase_damp(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a phase-damping channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1286,12 +1354,7 @@ def phase_damp(
pd = PhaseDamping(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(pd, encode=encode)

def gen_amp_damp(
self,
wires: int,
inputs: Any = None,
encode: bool = False
) -> None:
def gen_amp_damp(self, wires: int, inputs: Any = None, encode: bool = False) -> None:
"""Add a generalized amplitude-damping channel."""
assert self.den_mat
requires_grad = not encode
Expand All @@ -1300,11 +1363,27 @@ def gen_amp_damp(
gad = GeneralizedAmplitudeDamping(inputs=inputs, nqubit=self.nqubit, wires=wires, requires_grad=requires_grad)
self.add(gad, encode=encode)

def reset(self, wires: Union[int, List[int], None] = None, postselect: Optional[int] = 0) -> None:
"""Add a reset operation."""
assert not self.den_mat and not self.mps, 'Currently NOT supported'
rs = Reset(nqubit=self.nqubit, wires=wires, postselect=postselect)
self.add(rs)

def barrier(self, wires: Union[int, List[int], None] = None) -> None:
"""Add a barrier."""
br = Barrier(nqubit=self.nqubit, wires=wires)
self.add(br)

def cut(self, wires: Union[int, List[int]]) -> None:
"""Add a wire-cut operation."""
wc = WireCut(nqubit=self.nqubit, wires=wires)
self.add(wc)

def move(self, wire1: int, wire2: int, postselect: Optional[int] = 0) -> None:
"""Add a move operation."""
mv = Move(nqubit=self.nqubit, wires=[wire1, wire2], postselect=postselect)
self.add(mv)


class DistributedQubitCircuit(QubitCircuit):
"""Quantum circuit for a distributed state vector.
Expand Down
Loading