# NLOS with HHL

## 1. NLOS Test case generation

### Layout of testcase generation

1. Set SV at origin $(0, 0, 0)$
1. Based on $r_{min}$ and $r_{max}$, generate random HV position. `numpy.random.rand()` is used. 
1. Randomly create total of `num_points` points, each point's distance being farther away than $d_{min}$ but closer to each points than $d_{max}$.
1. Compute necessary quantities($\phi, \, \psi,\, \text{etc...}$)

In [146]:
import numpy as np

In [156]:
def dist(a: np.ndarray, b: np.ndarray):
    if a.shape() != b.shape():
        raise ValueError("inconsistent point shape")
    return np.sqrt(np.sum(a * b))

In [157]:
class NLOStestcase:
    """
    One randomized test case (NLOS geometry). Does not enforce NLOS by itself,
    just generates geometry.
    """
    def __init__(self: "NLOStestcase", sample_point_cnt: int, HV_r_min: float, HV_r_max: float, 
                 sample_r_min: float, sample_r_max: float, HV_orientation: float | None = None,
                 HV: tuple[float, float, float] | None = None, sample_points: tuple | list | None = None):
        
        if HV_r_min < 0 or HV_r_max < 0 or HV_r_min > HV_r_max:
            raise ValueError("HV radius restriction inappropriate")
        if HV_orientation is not None and (HV_orientation < 0 or HV_orientation > 2 * np.pi):
            raise ValueError("Inappropriate HV orientation")
        if sample_r_min < 0 or sample_r_max < 0 or sample_r_max < sample_r_min:
            raise ValueError("sample point distance restriction inappropriate")
        if sample_point_cnt <= 0:
            raise ValueError(f"Too small sample_point_cnt: {sample_point_cnt}")
        if (sample_points is not None and len(sample_points) != sample_point_cnt):
            raise ValueError("Sample point count/actual list mismatch")

        self.SV = np.zeros(3)

        self.sample_r_min = sample_r_min
        self.sample_r_max = sample_r_max
        
        if HV is None:
            self.HV = self._sample_point_in_shell(HV_r_min, HV_r_max)
            print(f"HV position set randomly: {self.HV}")
        else:
            self.HV = HV
            print(f"HV position set to given parameter: {self.HV}")
        
        if HV_orientation is None:
            self.HV_orientation = np.random.random() * 2 * np.pi
        else:
            self.HV_orientation = HV_orientation

        # if dist between HV and SV is greater than sample_r_max * 2, throw
        if dist(self.HV, self.SV) > sample_r_max * 2.5:
            raise ValueError("sample_r_max is too small")
        
        # set the reflection sample points
        self.sample_point_cnt = sample_point_cnt
        if sample_points is None:
            points = set()
            while len(points) < self.sample_point_cnt:
                point = self._sample_point_in_shell(sample_r_min, sample_r_max)
                if sample_r_min < dist(point, HV) < sample_r_max:
                    points.add(point)
            self.sample_points = list(sample_points)
        else:
            self.sample_points = sample_points

        # convert each point to NLOS-used spherical coordinates
        self.sample_spherical_points = []
        for x in self.sample_points:
            self.sample_spherical_points.append(np.ndarray(self._to_NLOS_coordinates(x)))


    def _random_direction_3d(self) -> np.ndarray:
        """
        Sample a random direction uniformly on the unit sphere S^2.
        """
        v = np.random.normal(size=3)
        norm = np.linalg.norm(v)
        if norm < 1e-12:
            # extremely unlikely; resample if it happens
            return self._random_direction_3d()
        return v / norm
    
    def _sample_point_in_shell(self, r_min: float, r_max: float) -> np.ndarray:
        """
        Sample a random 3D point whose distance from the origin is in [r_min, r_max].
        Direction is uniform; radius is uniform in [r_min, r_max] (not volume-uniform).
        """
        direction = self._random_direction_3d()
        r = r_min + (r_max - r_min) * np.random.rand()
        return r * direction
    
    def _cartesian_to_spherical(self, point: np.ndarray):
        r = dist(np.array([0, 0, 0]), point)
        rho = np.sqrt(np.sum(point[:2] ** 2))
        phi = np.arctan2(point[1], point[0])
        psi = np.arctan2(point[2], rho)
        return r, phi, psi
    
    def _to_NLOS_coordinates(self, point: np.ndarray):
        """returns d, v, theta, vartheta, phi, psi"""
        v, theta, var_theta = self._cartesian_to_spherical(point)
        v_p, phi_p, psi = self._cartesian_to_spherical(point - self.HV)
        
        phi = phi_p - self.HV_orientation
        if phi < 0: 
            phi += 2 * np.pi

        return (v + v_p, v, theta, var_theta, phi, psi)

    def get_A(self, omega):
        pass

    def get_b(self, omega):
        pass


## 2. NLOS Preparation

### Method for obtaining $\omega$

$\omega$ is required to determine $\textbf{A}(\omega)$ and $\textbf{B}(\omega)$.

$\omega$ is determined by the equation $\text{null}(\textbf{A}(\omega)^\text{T})^\text{T} \textbf{B}(\omega)=0$.

Current implementation: Linear search

## 3. HHL Implementation

In [None]:
import math
import numpy as np
import qiskit
from scipy.linalg import expm
from qiskit.circuit.library import StatePreparation, ExactReciprocalGate, phase_estimation
from qiskit.quantum_info import Statevector, Operator
from qiskit_aer import AerSimulator
from qiskit.compiler import transpile
from qiskit.result import Result
from qiskit.circuit import IfElseOp, QuantumCircuit, QuantumRegister, ClassicalRegister, Gate
from qiskit.circuit.library import Initialize , UnitaryGate , QFTGate, ExactReciprocalGate, UCRYGate

from qiskit.quantum_info.operators.predicates import is_unitary_matrix

In [None]:
def progress_callback(**kwargs):
    pass_name = kwargs['pass_'].name()
    count = kwargs['count']
    # The DAG circuit is an internal representation that can be inspected
    dag = kwargs['dag'] 
    print(f"Pass {count}: {pass_name}, Operations: {dag.count_ops()}")


def hhl_fidelity(x_classic, probs_hhl):
    # Classical solution
    x = x_classic / np.linalg.norm(x_classic)

    # Rebuild complex vector from probabilities (up to global phase)
    # HHL gives only magnitudes; pick sign so inner product is maximized
    # Just treat as positive real for fidelity purposes

    probs_classical = np.abs(x) ** 2
    probs_classical = probs_classical / probs_classical.sum()

    # Normalize HHL probs just in case
    probs_hhl = probs_hhl / probs_hhl.sum()

    # Probability "fidelity" (Bhattacharyya coefficient)
    fidelity = float(np.sum(np.sqrt(probs_classical * probs_hhl)))
    # Numerical safety for arccos
    fidelity_clamped = max(min(fidelity, 1.0), -1.0)
    angle = float(np.rad2deg(np.arccos(fidelity_clamped)))

    return fidelity, angle

In [None]:
class CustomExactReciprocalGate(Gate):

    def __init__(
        self, num_state_qubits: int, scaling: float, neg_vals: bool = False, kappa: float | None = None, cutoff_ratio: float = 2.0, label: str = "1/x"
    ) -> None:
        super().__init__("CustomExactReciprocal", num_state_qubits + 1, [], label=label)

        self.scaling = scaling
        self.neg_vals = neg_vals
        self.kappa = kappa
        self.cutoff_ratio = cutoff_ratio
    
    def _f_lambda(self, lam: float) -> float:
        def f_original(lam):
            if math.isclose(self.scaling / lam, 1.0, abs_tol=1e-05):
                return 1.0
            elif self.scaling / lam < 1.0:
                return self.scaling / lam
            return 0.0

        if self.kappa is None:
            return f_original(lam)
        
        # original HHL paper for f(lambda)
        # if 1/kap_p < lambda < 1/kap: transition area
        # if lambda < 1/kap_p: too small lambda, don't rotate
        # if lamdba > 1/kap: good lambda, behave like original HHL
        kap = self.kappa
        kap_p = kap * self.cutoff_ratio

        # good area
        if lam >= 1 / kap:
            return f_original(lam)

        # bad area: not rotate at all
        if lam < 1 / kap_p:
            return 0.0
        
        # transition area
        t = (lam - 1 / kap_p) / (1 / kap - 1 / kap_p)
        return f_original(1 / kap) * math.sin(0.5 * math.pi * t)



    def _define(self):
        num_state_qubits = self.num_qubits - 1
        qr_state = QuantumRegister(num_state_qubits, "state")
        qr_flag = QuantumRegister(1, "flag")
        circuit = QuantumCircuit(qr_state, qr_flag)

        angles = [0.0]
        nl = 2 ** (num_state_qubits - 1) if self.neg_vals else 2**num_state_qubits

        # Angles to rotate by scaling / x, where x = i / nl
        for i in range(1, nl):
            lam = i / nl
            f = self._f_lambda(lam)
            if math.isclose(f, 1.0, abs_tol = 1e-5):
                angles.append(np.pi)
            elif f <= 0.0:
                angles.append(0.0)
            else:
                angles.append(2 * math.asin(f))

        circuit.append(UCRYGate(angles), [qr_flag[0]] + qr_state[: len(qr_state) - self.neg_vals])

        if self.neg_vals:
            circuit.append(
                UCRYGate([-theta for theta in angles]).control(),
                [qr_state[-1]] + [qr_flag[0]] + qr_state[:-1],
            )
            angles_neg = [0.0]
            for i in range(1, nl):
                lam_neg = -(1.0 - i / nl)
                lam = abs(lam_neg)
                f = self._f_lambda(lam)

                if math.isclose(f, 1.0, abs_tol = 1e-5):
                    angles_neg.append(-np.pi)
                elif f <= 0.0:
                    angles_neg.append(0.0)
                else:
                    angles_neg.append(-2 * math.asin(f))

            circuit.append(
                UCRYGate(angles_neg).control(), [qr_state[-1]] + [qr_flag[0]] + qr_state[:-1]
            )

        self.definition = circuit

In [None]:
class HHL:
    """class that implements HHL algorithm"""

    def __init__(self, A: np.typing.NDArray, b: np.typing.NDArray, state_dim: int = -1, tol=1e-05, lambda_min_eff=1e-01, force_cutoff=None):
        """A: operator matrix, b: matrix, n: number of qubits
        force_cutoff: 
            - True - Always use cutoff based on lambda_min_eff
            - None - use cutoff if eigval is  on lambda_min_eff
            - False - do not use cutoff at all times.
        """
        assert (len(A.shape) == 2 and len(b.shape) == 1), "Inconsistent dimension of A or b"
        assert (A.shape[0] == b.shape[0]), "shape of A does not match b"
        
        self.A = A.copy()
        self.b = b.copy()
        self.lambda_min_eff = lambda_min_eff

        if (self.A.shape[0] != self.A.shape[1] or not np.allclose(self.A, self.A.conj().T, atol=tol)):
            if (self.A.shape[0] != self.A.shape[1]):
                print("A is not a square matrix")
            else:
                print("A is not Hermitian")
            self.A = np.block([[np.zeros((self.A.shape[0], self.A.shape[0])), self.A],
                               [self.A.conj().T, np.zeros((self.A.shape[1], self.A.shape[1]))]])
            print("A is evaluated as [[0, A], [AT, 0]]")
            print("-----------------------------------")
            self.b = np.block([b, np.zeros(A.shape[1])])
        
        # pad A to be 2^n
        self.n_b = 0
        while (self.A.shape[0] > (1 << self.n_b)):
            self.n_b += 1
        
        if (self.A.shape[0] != (1 << self.n_b)):
            print("A is not power of 2, pad to make A {0} * {0}".format((1 << self.n_b)))
            print("-----------------------------------")
            pad = (1 << self.n_b) - self.A.shape[0]

            self.A = np.pad(self.A, ((0, pad), (0, pad)), "constant", constant_values=0)
            self.b = np.pad(self.b, (0, (1 << self.n_b) - self.b.shape[0]), "constant", constant_values=0)

        self.tol = tol
        self.dim = (1 << self.n_b)

        if (not math.isclose(np.linalg.norm(self.b), 1, abs_tol=tol)):
            self.b = self.b.copy() / np.linalg.norm(self.b)
            print(f"b is not normalized, normalized to b = {self.b} during init")
            print("-----------------------------------")
        
        eigvals = np.linalg.eigvalsh(self.A)

        if force_cutoff or (force_cutoff is None and np.abs(eigvals).min() < lambda_min_eff):
            if not any(np.abs(eigvals) > lambda_min_eff):
                raise ValueError("lambda_min_eff is too big, all eigvals are smaller")
            
            lambda_min = np.abs(eigvals)[np.abs(eigvals) > lambda_min_eff].min()
            self.cutoff = True
            print(f"chosen mode: {force_cutoff}\nCutoff enabled: lambda_min_eff - {lambda_min_eff}, min(eigvals) - {np.abs(eigvals).min()}")

        else:
            lambda_min = np.abs(eigvals).min()
            self.cutoff = False
            print(f"chosen mode: {force_cutoff}\nCutoff disabled")
        print("-----------------------------------")

        # C for making C/Î» < 1
        self.C = lambda_min * 0.9

        self.kappa = np.abs(eigvals).max() / lambda_min # characteristic number of matrix A
        # define t slightly smaller -> then if eigval is positive, < 0.45 and negative, > 0.55
        # if just pi / |lambda_max| -> 0.5 eigval errors
        self.t = np.pi / np.abs(eigvals).max() / 1.5 # calculate t

        self.phase_min_eff = lambda_min * self.t / (2 * np.pi)

        if (state_dim > 0):
            self.state_dim = state_dim
        else:
            if (self.kappa < 100):
                self.state_dim = int(np.log2(self.kappa)) + 6
            else:
                self.state_dim = 12

        self.U = expm(1j * self.t * self.A) # U = e^(iAt), then convert to unitarygate
    
    def _create_register(self):
        """call this to initially create registers, only once"""
        b_reg = QuantumRegister(self.n_b, 'sys')
        c_reg = QuantumRegister(self.state_dim, 'phase')
        aux_reg = QuantumRegister(1, 'aux')
        sign_reg = QuantumRegister(1, 'sign')
        classic_aux_reg = ClassicalRegister(1, 'classic_aux')
        result_reg = ClassicalRegister(self.n_b, 'classic_sys')
        return b_reg, c_reg, aux_reg, sign_reg, classic_aux_reg, result_reg


    def _prepare_state(self, circ: QuantumCircuit, b_reg: QuantumRegister):
        """add gate to mount b on registers"""
        # assert math.isclose(np.linalg.norm(self.b), 1, abs_tol=self.tol) # check if b is normalized
        init_gate = StatePreparation(Statevector(self.b))
        circ.append(init_gate, b_reg) # add gate for initalizing b
        circ.barrier() # separation from rest of the process

    def _assign_neg_sign(self, circ, c_reg, sign_reg):
        phase_le = list(reversed(c_reg)) # since qpe gives us big-endian, should reverse/little-endian expected in ExactReciprocalGate
        # if the most-significant bit(msb) is 1 -> phase > 1
        # therefore sign must be 1
        # if msb = 1 -> set sign to 1(controlled-not on sign_reg)
        circ.cx(phase_le[-1], sign_reg[0])
        

    def _controlled_rotation(self, circ: QuantumCircuit, c_reg: QuantumRegister, aux_reg: QuantumRegister, sign_reg: QuantumRegister):
        """add controlled rotation block to circuit"""
        # neg_vals = True: eigvals may be negative
        # if negative, ExactReciprocalGate expects sign qubit to be 1
        # and the phase to be (1 - phase)
        # switch to negative if sign = 1 in _assign_neg_sign

        # todo: choose the right kappa
        if self.cutoff:
            rot_gate = CustomExactReciprocalGate(
                num_state_qubits=self.state_dim + 1,
                scaling=self.C * self.t / (2 * np.pi),
                neg_vals=True,
                kappa=1 / self.phase_min_eff,
                cutoff_ratio=2.0
            )
        else:
            # rot_gate = CustomExactReciprocalGate(
            #     num_state_qubits=self.state_dim + 1,
            #     scaling=self.C * self.t / (2 * np.pi),
            #     neg_vals=True,
            #     kappa=None,
            # )
            rot_gate = ExactReciprocalGate(
                num_state_qubits=self.state_dim + 1, 
                scaling=self.C * self.t / (2 * np.pi), 
                neg_vals=True
            )

        circ.append(rot_gate, [*sign_reg, *reversed(c_reg), *aux_reg])
    

    def _qpe(self, circ: QuantumCircuit, b_reg, c_reg, inverse=False):
        """add QPE/IQPE gate to the circuit"""
        qpe_circ = phase_estimation(self.state_dim, Operator(self.U).to_instruction())
        if (inverse):
            qpe_circ = qpe_circ.inverse()
        circ.append(qpe_circ.to_gate(), [*c_reg, *b_reg]) # order may change


    def _measure(self, circ: QuantumCircuit, aux_reg: QuantumRegister, b_reg: QuantumRegister, classic_aux_reg: ClassicalRegister, result_reg: ClassicalRegister):
        """measure the quantum qubits and store in classic bits"""
        circ.measure(aux_reg, classic_aux_reg)
        circ.measure(b_reg, result_reg)

    def transpile(self, simulator=AerSimulator, show_progress=False):
        """function to transpile circuit. required to be executed before run"""
        b_reg, c_reg, aux_reg, sign_reg, classic_aux_reg, result_reg = self._create_register()


        circ = QuantumCircuit(aux_reg, c_reg, b_reg, sign_reg, result_reg, classic_aux_reg)
        self._prepare_state(circ, b_reg)
        self._qpe(circ, b_reg, c_reg, inverse=False)
        circ.barrier()
        # check if phase is negative
        self._assign_neg_sign(circ, c_reg, sign_reg)
        circ.barrier()
        self._controlled_rotation(circ, c_reg, aux_reg, sign_reg)
        circ.barrier()
        self._assign_neg_sign(circ, c_reg, sign_reg)
        circ.barrier()
        self._qpe(circ, b_reg, c_reg, inverse=True)
        circ.barrier()
        self._measure(circ, aux_reg,  b_reg, classic_aux_reg, result_reg)

        self.backend = simulator()
        if (show_progress):
            return transpile(circ, backend=self.backend, callback=progress_callback), circ
        return transpile(circ, backend=self.backend), circ
    
    def run(self, circ, shots=1):
        try:
            return self.backend.run(circ, shots=shots).result()
        except AttributeError:
            raise AttributeError("Circuit is not Transpiled before running")
    
    def transpile_and_run(self, shots=1):
        transpiled_circ = self.transpile()
        return self.run(transpiled_circ, shots=shots).result()
    
    def analyze(self, result: Result, show_counts=False) -> tuple[float, np.ndarray]:
        count = result.get_counts()

        if (isinstance(count, dict)):
            if show_counts:
                print(count)
            success = {res: cnt for res, cnt in count.items() if res.split()[0] == '1'}
            # failure = {res: cnt for res, cnt in count.items() if res.split()[0] == '0'}
            tot = sum(success.values())
            success_rate = tot / sum(count.values())
            
            x = np.zeros((1 << self.n_b), dtype=float)
            for res, cnt in success.items():
                ind = int(res.split()[1], 2)
                x[ind] = cnt / tot
            return success_rate, x
        else:
            print(count)
            raise TypeError("result.get_counts() is not type dict[str, int]")

### HHL Test Example

In [153]:
    # [1, 2, 0],
    # [2, 1, 3],
    # [0, 3, 2]
A = np.array([
    [1, 2, 0, 1],
    [2, 1, 3, -1],
    [0, 3, 2, 4],
    [1, -1, 4, 4],
], dtype=float)
b = np.array([2, 5, -10, 3], dtype=float)

alg = HHL(A, b, lambda_min_eff=1e-1, force_cutoff=None)

print(np.linalg.eigvalsh(alg.A))

b is not normalized, normalized to b = [ 0.17025131  0.42562827 -0.85125653  0.25537696] during init
-----------------------------------
chosen mode: None
Cutoff disabled
-----------------------------------
[-3.83476114  0.88196357  3.4106229   7.54217467]


In [154]:
circ, c = alg.transpile(show_progress=False)

In [None]:
# classic
try:
    x_classic = np.linalg.solve(alg.A, alg.b)
except np.linalg.LinAlgError as e:
    print(f"Failed to solve classically, {e.args}")
    x_classic = np.linalg.lstsq(alg.A, alg.b)[0] # first: singular value of A
except Exception as e:
    raise ValueError(f"Failed to solve: {e.args}")
print(x_classic)

for shots in [10**x for x in range(3, 7)]:
    print(f"shots: {shots}")
    result = alg.run(circ, shots=shots)

    p, x = alg.analyze(result, show_counts=False)
    print(f"success rate: {p}\n{x}")
    fidelity, deg = hhl_fidelity(x_classic, x)
    print(f"Fidelity: {fidelity}, angle: {deg}")
    print("Classical Norm: {0}, HHL-based norm: {1}".format(np.linalg.norm(x_classic), math.sqrt(p) / alg.C) * 2)



print(x_classic ** 2 / np.linalg.norm(x_classic) ** 2)

[ 0.57043972 -0.21330336 -0.15850984  0.02641831]
shots: 1000
success rate: 0.076
[0.81578947 0.13157895 0.05263158 0.        ]
Fidelity: 0.9985799349918278, angle: 3.053817999511264
Classical Norm: 0.6298595830750704, HHL-based norm: 0.3473070835197967
shots: 10000
success rate: 0.059
[0.79152542 0.1440678  0.05762712 0.00677966]
Fidelity: 0.9981522076840869, angle: 3.4836226460737283
Classical Norm: 0.6298595830750704, HHL-based norm: 0.306008049548378
shots: 100000
success rate: 0.06283
[0.81871717 0.11586822 0.06318638 0.00222823]
Fidelity: 0.9999842556274263, angle: 0.3215147333760821
Classical Norm: 0.6298595830750704, HHL-based norm: 0.31578418410667897
shots: 1000000
success rate: 0.062283
[0.81893936 0.11518392 0.06366103 0.00221569]
Fidelity: 0.9999861173955311, angle: 0.30190729554932977
Classical Norm: 0.6298595830750704, HHL-based norm: 0.3144065655244754
[0.82022317 0.11468534 0.06333226 0.00175923]


## 4. Apply HHL to NLOS