In [None]:
from qiskit import QuantumCircuit, transpile, assemble
from qiskit.quantum_info import Statevector, DensityMatrix, Pauli, Operator, random_statevector, partial_trace, random_clifford
from qiskit_aer import AerSimulator, Aer
import numpy as np
from scipy.linalg import sqrtm
import cvxpy as cp
import itertools
from IPython.display import clear_output
from winsound import Beep


# 0. General Definitions

In [None]:
import random

def alias_setup(probabilities):
    n = len(probabilities)
    prob = [0] * n
    alias = [0] * n

    # Normalize the probabilities
    scaled_prob = [p * n for p in probabilities]

    # Create two lists to track small and large probabilities
    small = []
    large = []

    for i, sp in enumerate(scaled_prob):
        if sp < 1.0:
            small.append(i)
        else:
            large.append(i)

    # Process the small and large lists
    while small and large:
        small_index = small.pop()
        large_index = large.pop()

        prob[small_index] = scaled_prob[small_index]
        alias[small_index] = large_index

        scaled_prob[large_index] = (scaled_prob[large_index] + scaled_prob[small_index]) - 1.0

        if scaled_prob[large_index] < 1.0:
            small.append(large_index)
        else:
            large.append(large_index)

    # Fill remaining probabilities
    for i in large:
        prob[i] = 1.0
    for i in small:
        prob[i] = 1.0

    return prob, alias

def alias_sample(prob, alias):
    n = len(prob)
    i = random.randint(0, n - 1)
    r = random.random()
    if r < prob[i]:
        return i
    else:
        return alias[i]

# Example usage
probabilities = [0.1, 0.3, 0.4, 0.2]
prob, alias = alias_setup(probabilities)

# Sample from the distribution
sampled_index = alias_sample(prob, alias)
print(sampled_index)



In [None]:
def tensor_prod(*tensors):
    if len(tensors) == 2:
        return np.kron(tensors[0], tensors[1])
    else:
        return np.kron(tensors[0], tensor_prod(*tensors[1:]))
    
def hermitian(matrix):
    return np.allclose(matrix, matrix.conj().T)

def trace_one(matrix):
    return np.isclose(np.trace(matrix), 1)

def positive_semi_definite(matrix, tol=1e-8):
    return np.all(np.linalg.eigvals(matrix) + tol >= 0)

def is_legal(matrix):
    return hermitian(matrix) and trace_one(matrix) and positive_semi_definite(matrix)

def int_to_bin_list(n, length):
    bin_list = np.zeros(length)
    bin_list[n] = 1
    return bin_list

def expand_to_tensor_product(array):
    index = np.argmax(array)
    n = int(np.log2(len(array)))
    binary_string = format(index, f'0{n}b')
    tensor_product = []
    for bit in binary_string:
        if bit == '0':
            tensor_product.append(np.array([1, 0]))
        else:
            tensor_product.append(np.array([0, 1]))
    return tensor_product

def single_sample(prob_list):
    assert np.isclose(sum(prob_list), 1), "probability does not sum up to 1"
    # rd = np.random.random()
    # inf, sup = 0, 0
    # for i, e in enumerate(prob_list):
    #     sup += e
    #     if inf <= rd <= sup:
    #         return i
    #     else:
    #         inf = sup
    # raise ValueError("random value does not meet any interval")
    return alias_sample(*alias_setup(prob_list))

def split_and_calculate_mean(values, group_size):
    groups = [values[i:i + group_size] for i in range(0, len(values), group_size)]
    means = [np.sum(group, axis=0) / len(group) for group in groups]
    return means


# 1. Classical Shadow

In [None]:
class QuantumState():
    def __init__(self, num_qubits:int, num_shots:int|list, batch_size:int|list, pauli_observables:list, purity:bool, veri:bool, meas:str):
        assert meas in ['Clifford', 'Pauli', 'direct'], 'unrecognized measurement pattern, use either Clifford, Pauli, or direct'
        self._num_qubits = num_qubits
        self._observables = pauli_observables
        self._compute_purity = purity
        self._batch_size = batch_size
        self._num_shots = num_shots
        self._veri = veri
        self._meas = meas
        self._dm = None
        self._entangled = None
        
    @property
    def dm(self):
        return self._dm
    
    @dm.setter
    def dm(self, new_dm):
        if not (self._veri or is_legal(new_dm)):
            raise ValueError("density matrix is not physical")
        else:
            self._dm = new_dm
    
    def set_dm(self):
        raise NotImplementedError("without information to construct density matrix")
    
    def random_evolve(self):
        if self._meas == 'Clifford':
            self._U = random_clifford(self._num_qubits).to_matrix()
            self._dm = self._U @ self.dm @ np.conj(self._U).T
        elif self._meas == 'Pauli':
            self._U = [random_clifford(1).to_matrix() for _ in range(self._num_qubits)]
            self._dm = tensor_prod(*self._U) @ self.dm @ np.conj(tensor_prod(*self._U)).T
    
    def single_shot_measure(self):
        prob_list = [self._dm[i, i] for i in range(2 ** self._num_qubits)]
        single_shot_state = int_to_bin_list(single_sample(prob_list), 2 ** self._num_qubits)
        del self._dm
        if self._meas == 'Clifford':
            self._state = single_shot_state
        elif self._meas == 'Pauli':
            self._state = expand_to_tensor_product(single_shot_state)
    
    def reconstruct_dm(self):
        dim = 2 ** self._num_qubits
        if self._meas == 'Clifford':
            return (dim + 1) * (np.conj(self._U).T @ np.outer(self._state, self._state) @ self._U) - np.eye(dim)
        elif self._meas == 'Pauli':
            return tensor_prod(*[3 * (np.conj(single_U).T @ np.outer(single_state, single_state) @ single_U) - np.eye(2) 
                                 for single_U, single_state in zip(self._U, self._state)])

    # def classical_shadow(self):
    #     shadows = {obs: [] for obs in self._observables}
    #     temp_shadows = {obs: [] for obs in self._observables}
    #     dm_copy = self._dm
    #     for _ in range(self._num_shots // self._batch_size):
    #         for _ in range(self._batch_size):
    #             self._dm = dm_copy
    #             self.random_evolve()
    #             self.single_shot_measure()
    #             rdm = self.reconstruct_dm()
    #             for k, v in temp_shadows.items():
    #                 v.append(np.trace(Pauli(k).to_matrix() @ rdm))
    #         for k, v in shadows.items():
    #             v.append(np.mean(temp_shadows[k]))
    #         temp_shadows = {obs: [] for obs in self._observables}
    #     del temp_shadows
    #     return {k: np.median(v) for k, v in shadows.items()}
    
    def classical_shadow(self):
        assert self._meas in ['Clifford', 'Pauli'], 'only Clifford and Pauli pattern have classical_shadow method'
        if not self._compute_purity:
            shadows = {obs: [] for obs in self._observables}
            dm_copy = self._dm
            for _ in range(self._num_shots // self._batch_size):
                snapshots = []
                for _ in range(self._batch_size):
                    self._dm = dm_copy
                    self.random_evolve()
                    self.single_shot_measure()
                    snapshots.append(self.reconstruct_dm())
                mean = np.mean(np.stack(snapshots), axis=0)
                for k, v in shadows.items():
                    v.append(np.trace(Pauli(k).to_matrix() @ mean).real)
            return {k: np.median(v) for k, v in shadows.items()}
        else:
            dm_copy = self._dm
            snapshots = []
            max_shots = np.max(self._num_shots)
            for _ in range(max_shots):
                self._dm = dm_copy
                self.random_evolve()
                self.single_shot_measure()
                snapshots.append(self.reconstruct_dm())
            purities = []   
            for num_shots in self._num_shots:
                temp_snapshots = snapshots[:num_shots]
                mean = np.mean(np.stack(temp_snapshots), axis=0)
                purities.append(np.trace(mean @ mean).real)
            return purities
        
    def quantum_state_tomography_for_purity(self):
        max_shots = np.max(self._num_shots)
        max_repetition = max_shots // (4 ** self._num_qubits)
        all_observables = [''.join(obsv) for obsv in list(itertools.product(*['IXYZ' for _ in range(self._num_qubits)]))]
        
        def pm1_sample(expct, num_samples):
            prob_p1 = (1 + expct) / 2
            return [+1 if np.random.random() < prob_p1 else -1 for _ in range(num_samples)]
        
        all_samples = dict()
        for obsv in all_observables:
            all_samples[obsv] = pm1_sample(np.trace(self._dm @ Pauli(obsv).to_matrix()).real, max_repetition)
        purities = []
        for num_shots in self._num_shots:
            repetition = num_shots // (4 ** self._num_qubits)
            temp_samples = {k: v[:repetition] for k, v in all_samples.items()}
            estm_dm = np.sum(np.stack([np.mean(new_v) * Pauli(k).to_matrix() for k, new_v in temp_samples.items()]), axis=0) / (2 ** self._num_qubits)
            purities.append(np.trace(estm_dm @ estm_dm).real)
        return purities
    
    def classical_shadow_multi_batches(self):
        assert self._meas in ['Clifford', 'Pauli'], 'only Clifford and Pauli pattern have classical_shadow method'
        assert isinstance(self._batch_size, list) or isinstance(self._batch_size, np.ndarray), 'there must be more than one batch size'
        if not self._compute_purity:
            all_shadows = [{obs: [] for obs in self._observables} for _ in self._batch_size]
            dm_copy = self._dm
            snapshots = []
            for _ in range(self._num_shots):
                self._dm = dm_copy
                self.random_evolve()
                self.single_shot_measure()
                snapshots.append(self.reconstruct_dm())
            for index, size in enumerate(self._batch_size):
                snapshots = split_and_calculate_mean(snapshots, size)
                for k in self._observables:
                    samples = [np.trace(snapshot @ Pauli(k).to_matrix()).real for snapshot in snapshots]
                    all_shadows[index][k] = np.median(samples)
            return all_shadows
    
    def direct_sample(self):
        assert self._meas == 'direct', 'only direct pattern have classical_shadow method'
        all_samples = {obs: [] for obs in self._observables}
        repetition = self._num_shots // len(self._observables)
        for k in all_samples.keys():
            expct = np.trace(Pauli(k).to_matrix() @ self._dm).real
            prob_p1 = (1 + expct) / 2
            prob_m1 = (1 - expct) / 2
            all_samples[k] = [+1 if np.random.random() < prob_p1 else -1 for _ in range(repetition)]
            all_samples[k] = np.mean(all_samples[k])
        return all_samples

In [None]:
def generate_prob_lst(num_states):
    prob_lst = np.array([np.random.random() for _ in range(num_states)])
    prob_lst /= np.sum(prob_lst)
    return prob_lst

def generate_dm(num_qubits, num_states, state_lst=None, prob_lst=None, prime_prob=None):
    assert (prob_lst is None) or (prime_prob is None), 'cannot set prob_lst and prime_prob together'
    if state_lst is None:
        state_lst = [random_statevector(2**num_qubits) for _ in range(num_states)]
    if prime_prob is not None:
        prob_lst = np.array([prime_prob] + (generate_prob_lst(num_states - 1) * (1 - prime_prob)).tolist())
    elif prob_lst is None:
        prob_lst = generate_prob_lst(num_states)
    density_matrix = sum([DensityMatrix(state_lst[i]).data * prob_lst[i] for i in range(num_states)])
    return density_matrix

In [None]:
import pandas as pd

state = generate_dm(4, 8, prime_prob=.8)

observables = np.random.choice([''.join(obsv) for obsv in list(itertools.product(*['IXYZ' for _ in range(4)]))], size=256, replace=False)
qstate_c = QuantumState(4, 50000, [50, 250, 1000, 5000], observables, False, 'Clifford')
qstate_p = QuantumState(4, 50000, [50, 250, 1000, 5000], observables, False, 'Pauli')
qstate_d = QuantumState(4, 50000, 0, observables, False, 'direct')
qstate_c.dm, qstate_p.dm, qstate_d.dm = state, state, state

dict_th = {k: np.trace(Pauli(k).to_matrix() @ state).real for k in observables}
lst_dict_c = qstate_c.classical_shadow_multi_batches()
lst_dict_p = qstate_p.classical_shadow_multi_batches()
dict_d = qstate_d.direct_sample()
df = pd.DataFrame([dict_th, *lst_dict_c, *lst_dict_p, dict_d])
df.index = ['theoretical', 'Clifford-50', 'Clifford-250', 'Clifford-1000', 'Clifford-5000'
            , 'Pauli-50', 'Pauli-250', 'Pauli-1000', 'Pauli-5000', 'direct']
df.to_json('cs2.json', orient='records', lines=True)


In [None]:
df.index = ['theoretical', 'Clifford-50', 'Clifford-250', 'Clifford-1000', 'Clifford-5000'
            , 'Pauli-50', 'Pauli-250', 'Pauli-1000', 'Pauli-5000', 'direct']
df.to_json('cs1.json', orient='records', lines=True)


In [None]:
import matplotlib.pyplot as plt

selected_cols = np.random.choice(observables, size=64, replace=False)
selected_indices = ['theoretical', 'Clifford-5000', 'Pauli-5000', 'direct'] 
df_selected = df.loc[selected_indices, selected_cols]

df_transposed = df_selected.T
df_sorted = df_transposed.sort_values(by='theoretical')
# df_sorted = df_sorted.T


# Set up the figure size
plt.figure(figsize=(18, 10))  # Adjusted to a more typical size

# Plot each series with lines and scatter points
for index in df_sorted.columns:
    if index == 'theoretical':
        plt.plot(df_sorted.index, df_sorted[index], label=index, color='black')
    else:
        plt.scatter(df_sorted.index, df_sorted[index], label=index)

# Add title and labels
plt.title('Classical Shadow v.s. Direct Sampling')
plt.xlabel('Observables')
plt.ylabel('Expectation')
plt.xticks(rotation=45)

# Add legend
plt.legend()

# Adjust layout and save the figure
plt.tight_layout()
plt.grid()
plt.savefig('cs01.png', dpi=600)
plt.show()

In [None]:
th_values = df.loc['theoretical'].values
c_esti_values = df.loc['Clifford-5000'].values
p_esti_values = df.loc['Pauli-5000'].values
d_esti_values = df.loc['direct'].values
filtered_indices = np.abs(th_values) > .1
th_values = th_values[filtered_indices]
c_esti_values = c_esti_values[filtered_indices]
p_esti_values = p_esti_values[filtered_indices]
d_esti_values = d_esti_values[filtered_indices]

print(f"Clifford: {np.mean(np.sort(np.abs((c_esti_values - th_values) / th_values))[:240])}")
print(f"Pauli: {np.mean(np.sort(np.abs((p_esti_values - th_values) / th_values))[:240])}")
print(f"direct: {np.mean(np.sort(np.abs((d_esti_values - th_values) / th_values))[:240])}")

In [None]:
state = generate_dm(5, 4, prime_prob=.8)
observables = np.random.choice([''.join(obsv) for obsv in list(itertools.product(*['IXYZ' for _ in range(5)]))], size=8, replace=False)
qstate_d = QuantumState(5, 10000, 50, observables, False, 'Clifford')
qstate_d.dm = state
qstate_d.classical_shadow()

In [None]:
import pandas as pd

state = generate_dm(4, 8, prime_prob=.8)
shots = list(np.linspace(100, 1000, 10)) + list(np.linspace(2000, 10000, 9)) + list(np.linspace(20000, 100000, 9))
shots = [int(shot) for shot in shots]

# observables = np.random.choice([''.join(obsv) for obsv in list(itertools.product(*['IXYZ' for _ in range(4)]))], size=256, replace=False)
qstate_c = QuantumState(4, shots, [], [], True, False, 'Clifford')
qstate_p = QuantumState(4, shots, [], [], True, False, 'Pauli')
qstate_d = QuantumState(4, shots, [], [], True, False, 'direct')
qstate_c.dm, qstate_p.dm, qstate_d.dm = state, state, state
lst_c = qstate_c.classical_shadow()
lst_p = qstate_p.classical_shadow()
lst_d = qstate_d.quantum_state_tomography_for_purity()
true_purity = np.trace(state @ state).real


In [None]:
shots = list(np.linspace(100, 1000, 10)) + list(np.linspace(2000, 10000, 9)) + list(np.linspace(20000, 100000, 9))
shots = [int(shot) for shot in shots]
qstate_d = QuantumState(4, shots, [], [], True, False, 'direct')
qstate_d.dm = state
lst_d = qstate_d.quantum_state_tomography_for_purity()

In [None]:
# Set up the figure size
# plt.figure(figsize=(18, ))  # Adjusted to a more typical size

# Plot each series with lines and scatter points
plt.plot(shots[10:], [true_purity] * len(shots[10:]), linestyle='--', label='theoretical')
plt.plot(shots[10:], lst_c[10:], label='Clifford')
plt.scatter(shots[10:], lst_c[10:], color='orange')
plt.plot(shots[10:], lst_p[10:], label='Pauli')
plt.scatter(shots[10:], lst_p[10:], color='green')
plt.plot(shots[10:], lst_d[10:], label='direct')
plt.scatter(shots[10:], lst_d[10:], color='red')


# Add title and labels
plt.title('Classical Shadow v.s. Direct Sampling')
plt.xscale('log')
plt.xlabel('Num. of Shots')
plt.ylabel('Purity Predicted')

# Add legend
plt.legend()

# Adjust layout and save the figure
plt.tight_layout()
plt.grid()
plt.savefig('cs02-1.png', dpi=800)
plt.show()

# 2. Compressed Sensing

In [None]:
rho = generate_dm(5, 32, prime_prob=.95)
purity = np.trace(rho @ rho).real
purity

In [None]:
def generate_random_Pauli_strings(num_strings, num_qubits, pattern='balanced'):
    assert pattern in ['balanced', 'pro_I', 'pro_XYZ', 'uv_pair'], 'please choose pattern from: balanced, pro_I, pro_XYZ, uv_pair'
    generated_strings = []
    characters = ['X', 'Y', 'Z', 'I']
    assert 0 < num_strings <= 4 ** num_qubits - 1, 'too much or too few strings to generate'
    if pattern == 'balanced':
        for _ in range(num_strings):
            while True:
                random_string = ''.join(np.random.choice(characters) for _ in range(num_qubits))
                if random_string != 'I' * num_qubits and random_string not in generated_strings:
                    generated_strings.append(random_string)
                    break
        return generated_strings
    if pattern == 'uv_pair':
        uv_map = {'00':'I', '01':'Z', '10':'X', '11':'Y'}
        whole, remain = num_strings // 2 ** (num_qubits), num_strings % 2 ** (num_qubits)
        if whole > 0:
            v_lst = [format(i, f'0{num_qubits}b') for i in range(2 ** num_qubits)]
            u_lst = np.random.choice([format(i, f'0{num_qubits}b') for i in range(1, 2 ** num_qubits)], whole, replace=False).tolist()
            for u, v in list(itertools.product(u_lst, v_lst)):
                generated_strings.append(''.join([uv_map[u_char + v_char] for u_char, v_char in zip(u, v)]))
        if remain > 0:
            u_lst = ['0' * num_qubits]
            v_lst = np.random.choice([format(i, f'0{num_qubits}b') for i in range(1, 2 ** num_qubits)], remain, replace=False).tolist()
            for u, v in list(itertools.product(u_lst, v_lst)):
                generated_strings.append(''.join([uv_map[u_char + v_char] for u_char, v_char in zip(u, v)]))
        return generated_strings
    all_strings = generate_random_Pauli_strings(4 ** num_qubits - 1, num_qubits, pattern='balanced')
    grouped = dict()
    for i in range(num_qubits):
        grouped[i] = []
    for string in all_strings:
        grouped[string.count('I')].append(string)
    if pattern == 'pro_I':
        i = num_qubits - 1
        while len(generated_strings) < num_strings:
            if num_strings - len(generated_strings) >= len(grouped[i]):
                generated_strings += grouped[i]
            else:
                generated_strings += np.random.choice(grouped[i], num_strings - len(generated_strings), replace=True).tolist()
            i -= 1
        return generated_strings
    if pattern == 'pro_XYZ':
        i = 0
        while len(generated_strings) < num_strings:
            if num_strings - len(generated_strings) >= len(grouped[i]):
                generated_strings += grouped[i]
            else:
                generated_strings += np.random.choice(grouped[i], num_strings - len(generated_strings), replace=True).tolist()
            i += 1
        return generated_strings
        

In [None]:
def get_fidelity(dm1, dm2, tol=1e-5):
    # assert is_legal(dm1) and is_legal(dm2), 'inputs are not legal density matrices'
    if not is_legal(dm1) and is_legal(dm2):
        print("Warning: inputs are not legal density matrices")
    try: 
        fidelity = (np.trace(sqrtm(sqrtm(dm1) @ dm2 @ sqrtm(dm1)))) ** 2
    except ValueError:
        print('fidelity cannot be computed for given inputs')
    assert np.abs(np.imag(fidelity)) < tol, 'fidelity is not real within tol'
    return fidelity.real

In [None]:
def optimize(dim, obsv, expct, tol=1e-5):
    sigma = cp.Variable((dim, dim), complex=True)
    objective = cp.Minimize(cp.abs(5 * cp.norm(sigma, 'nuc') + 0 * cp.norm(sigma, 'fro') ** 2))
    constraints = [cp.trace(sigma) == 1]
    for o, e in zip(obsv, expct):
        constraints.append(cp.abs(cp.trace(sigma @ Pauli(o).to_matrix()) - e) <= tol)
    problem = cp.Problem(objective, constraints)
    problem.solve()
    print(problem.status)
    return sigma.value

def estimate_Pauli_expectations(dm, obsv, num_samples, simulation=False):
    num_samples = int(num_samples)
    if simulation: # simulate the process of sampling
        exp = np.real(np.trace(dm @ Pauli(obsv).to_matrix()))
        prob_p1 = (1 + exp) / 2
        prob_m1 = 1 - prob_p1
        samples = np.random.choice([+1, -1], size=num_samples, p=[prob_p1, prob_m1])
        return np.mean(samples)
    else: # use the approximate distribution instead
        exp = np.real(np.trace(dm @ Pauli(obsv).to_matrix()))
        num_samples_root = num_samples ** .5
        std_dev = (1 - exp ** 2) ** .5 / num_samples_root
        return np.random.normal(exp, std_dev)
    
num_qubits = 5
d = 2 ** num_qubits
nums_observables = [100, 300, 500, 700]
fidelities = dict()
for num_observables in nums_observables:
    print(num_observables)
    fidelities[num_observables] = []
    for i in range(4):
        print('    ' + str(i))
        observables = generate_random_Pauli_strings(300, 5, 'balanced')
        expectations = [estimate_Pauli_expectations(rho, obsv, 10e6) for obsv in observables]
        sigma = optimize(d, observables, expectations)
        fidelities[num_observables].append(get_fidelity(sigma, rho))

In [None]:
def optimize(dim, obsv, expct, tol=1e-5):
    sigma = cp.Variable((dim, dim), complex=True)
    objective = cp.Minimize(cp.abs(5 * cp.norm(sigma, 'nuc') + 0 * cp.norm(sigma, 'fro') ** 2))
    constraints = [cp.trace(sigma) == 1]
    for o, e in zip(obsv, expct):
        constraints.append(cp.abs(cp.trace(sigma @ Pauli(o).to_matrix()) - e) <= tol)
    problem = cp.Problem(objective, constraints)
    problem.solve()
    print(problem.status)
    return sigma.value

def estimate_Pauli_expectations(dm, obsv, num_samples, simulation=False):
    num_samples = int(num_samples)
    if simulation: # simulate the process of sampling
        exp = np.real(np.trace(dm @ Pauli(obsv).to_matrix()))
        prob_p1 = (1 + exp) / 2
        prob_m1 = 1 - prob_p1
        samples = np.random.choice([+1, -1], size=num_samples, p=[prob_p1, prob_m1])
        return np.mean(samples)
    else: # use the approximate distribution instead
        exp = np.real(np.trace(dm @ Pauli(obsv).to_matrix()))
        num_samples_root = num_samples ** .5
        std_dev = (1 - exp ** 2) ** .5 / num_samples_root
        return np.random.normal(exp, std_dev)
    
num_qubits = 5
d = 2 ** num_qubits
nums_observables = [30, 50, 70, 90]
fidelities = dict()
for num_observables in nums_observables:
    print(num_observables)
    fidelities[num_observables] = []
    for i in range(4):
        print('    ' + str(i))
        observables = generate_random_Pauli_strings(300, 5, 'balanced')
        expectations = [estimate_Pauli_expectations(rho, obsv, 10e6) for obsv in observables]
        sigma = optimize(d, observables, expectations)
        fidelities[num_observables].append(get_fidelity(sigma, rho))