In [159]:
import numpy as np
class MIGCore:
    """
    Core functionality for covariance matrix generation and basic operations
    Implements Eq. (3) and (5) from the paper
    """
    
    def __init__(self):
        pass
    
    def generate_covariance_matrix(self, z: np.ndarray, smooth: bool = False) -> np.ndarray:
        """
        Generate covariance matrix from signal vector z as in Eq. (3) and (5)
        """
        n = len(z)
        

        r = self.compute_correlation_coefficients(z, smooth)
        
        R = self.build_toeplitz_matrix(r)
        s=self.get_symbol_from_corr(r)
        return R,s
    
    def compute_correlation_coefficients(self, z: np.ndarray,smoothed:bool=False) -> np.ndarray:
        """Compute correlation coefficients r = [r_0, r_1, ..., r_{n-1}]"""
        n = len(z)
        r = np.zeros(n, dtype=complex)
        
        for i in range(n):
            if i == 0:
                r[i] = np.mean(z * np.conj(z))
            else:
                sum_val = 0
                for j in range(0, n - i):
                    sum_val += z[j] * np.conj(z[j + i])
                r[i] = sum_val / n
                if smoothed:
                    r[i]=r[i]*np.sinc(i*n**(-0.2)/2)
        return r

    def get_symbol_from_corr(self,r:np.ndarray):
        n = len(r)
    
        # Method 1: Direct formula (clear)
        f = np.zeros(n, dtype=float)
        
        # j = 0,...,n-1
        j = np.arange(n)
        
        # Add r_0 term
        f += r[0].real  # r_0 is real
        
        # Add terms for k = 1,...,n-1
        for k in range(1, n):
            phase = 2 * np.pi * j * k / n
            # r_k * exp(-i*phase) + conj(r_k) * exp(i*phase)
            f += 2 * (r[k].real * np.cos(phase) + r[k].imag * np.sin(phase))
        
        return f
    
    def build_toeplitz_matrix(self, r: np.ndarray) -> np.ndarray:
        """
        Build Toeplitz HPD matrix from correlation coefficients as in Eq. (3)
        """
        n = len(r)
        R = np.zeros((n, n), dtype=complex)
        
        for i in range(n):
            for j in range(n):
                if i == j:
                    R[i, j] = r[0]
                elif i < j:
                    R[i, j] = np.conj(r[j - i])
                else:
                    R[i, j] = r[i - j]
        
        if not self.is_hpd(R):
            R += 1e-10 * np.eye(n)
            
        return R
    
    def is_hpd(self, R: np.ndarray) -> bool:
        """Check if matrix is Hermitian Positive Definite"""
        if not np.allclose(R, R.conj().T):
            return False
        try:
            np.linalg.cholesky(R)
            return True
        except np.linalg.LinAlgError:
            return False
    
    def generate_test_signal(self, n: int, signal_type: str = 'random') -> np.ndarray:
        """Generate test signal for validation"""
        if signal_type == 'random':
            return np.random.randn(n) + 1j * np.random.randn(n)
        elif signal_type == 'sinusoid':
            t = np.linspace(0, 1, n)
            return np.exp(1j * 2 * np.pi * 5 * t) + 0.1 * (np.random.randn(n) + 1j * np.random.randn(n))
        else:
            return np.random.randn(n) + 1j * np.random.randn(n)

In [135]:
core = MIGCore()
    
# Test 1: Basic signal to covariance matrix
z = np.array([1+1j, 2+0j, 1-1j, 0+1j])
R_s = core.generate_covariance_matrix(z,True)
R = core.generate_covariance_matrix(z,False)
print(f"Input signal: {z}")

print(f"Covariance matrix shape: {R.shape}")
print(f"Is HPD: {core.is_hpd(R)}")
print(f"Matrix is Hermitian: {np.allclose(R, R.conj().T)}")
print(f"Covariance matrix:\n{R}")
print(f"Smoothed Covariance matrix:\n{R_s}")

Input signal: [1.+1.j 2.+0.j 1.-1.j 0.+1.j]
Covariance matrix shape: (4, 4)
Is HPD: True
Matrix is Hermitian: True
Covariance matrix:
[[2.25+0.j   0.75-0.75j 0.  -0.j   0.25+0.25j]
 [0.75+0.75j 2.25+0.j   0.75-0.75j 0.  -0.j  ]
 [0.  +0.j   0.75+0.75j 2.25+0.j   0.75-0.75j]
 [0.25-0.25j 0.  +0.j   0.75+0.75j 2.25+0.j  ]]
Smoothed Covariance matrix:
[[ 2.25      +0.j          0.58499294-0.58499294j  0.        -0.j
  -0.02916466-0.02916466j]
 [ 0.58499294+0.58499294j  2.25      +0.j          0.58499294-0.58499294j
   0.        -0.j        ]
 [ 0.        +0.j          0.58499294+0.58499294j  2.25      +0.j
   0.58499294-0.58499294j]
 [-0.02916466+0.02916466j  0.        +0.j          0.58499294+0.58499294j
   2.25      +0.j        ]]


In [3]:
def generate_real_Toeplitz_covariance(n: int) -> np.ndarray:
    # generate a random Toeplitz covariance matrix r0...rn-1
    r = np.zeros(n, dtype=complex)
    r[0]=1
    for i in range(1, n):
        r[i] = np.random.uniform(0, 1/i)
        theta=np.random.uniform(0,2*np.pi)
        r[i] *= np.exp(1j * theta)
    return r



In [5]:
from numpy import ndarray
from scipy.linalg import sqrtm
def generate_samples(r:ndarray,num_samples:int):
    # T_r=core.build_toeplitz_matrix(r)
    T_t=sqrtm(r)
    n = len(r)
    z_std = (np.random.randn(n, num_samples) + 1j*np.random.randn(n, num_samples)) / np.sqrt(2)
    Z = T_t @ z_std
    return Z

In [6]:
r=generate_real_Toeplitz_covariance(1024)
R=core.build_toeplitz_matrix(r)
Z=generate_samples(R,100)
print(Z.shape)

(1024, 100)


In [49]:
corr=core.generate_covariance_matrix(Z[:,0])
corr

array([[ 1.34134841e+00+3.24670799e-19j, -1.98777236e-01-2.74053942e-01j,
        -1.07719205e-01+2.06339415e-03j, ...,
        -1.44942755e-03-1.11235310e-04j,  2.92247471e-04+1.24001883e-03j,
        -2.53417572e-04-1.49176006e-04j],
       [-1.98777236e-01+2.74053942e-01j,  1.34134841e+00+3.24670799e-19j,
        -1.98777236e-01-2.74053942e-01j, ...,
         1.06426187e-03-1.68055096e-04j, -1.44942755e-03-1.11235310e-04j,
         2.92247471e-04+1.24001883e-03j],
       [-1.07719205e-01-2.06339415e-03j, -1.98777236e-01+2.74053942e-01j,
         1.34134841e+00+3.24670799e-19j, ...,
         2.10442442e-03+4.37408842e-05j,  1.06426187e-03-1.68055096e-04j,
        -1.44942755e-03-1.11235310e-04j],
       ...,
       [-1.44942755e-03+1.11235310e-04j,  1.06426187e-03+1.68055096e-04j,
         2.10442442e-03-4.37408842e-05j, ...,
         1.34134841e+00+3.24670799e-19j, -1.98777236e-01-2.74053942e-01j,
        -1.07719205e-01+2.06339415e-03j],
       [ 2.92247471e-04-1.24001883e-03j, -1.

In [109]:
import scipy.linalg as la
def riemannian_distance(R1: np.ndarray, R2: np.ndarray) -> float:
        """
        Riemannian Distance (RD) as in Eq. (10): ‖log(R1⁻¹ R2)‖_F
        """
        try:
            R1_inv = np.linalg.inv(R1)
            M = R1_inv @ R2
            log_M = la.logm(M)
            return np.linalg.norm(log_M, 'fro')
        except (np.linalg.LinAlgError, la.LinAlgError):
            print("ERROR!")
            return np.linalg.norm(R1 - R2, 'fro')

In [8]:
def compute_errors(Z:ndarray,num_samples:int,r:ndarray):
    d=np.zeros(num_samples)
    d_s=np.zeros(num_samples)
    for i in range(num_samples):
        z=Z[:,i]
        corr=core.generate_covariance_matrix(z)
        corr_s=core.generate_covariance_matrix(z,True)
        d[i]=riemannian_distance(corr, r)
        d_s[i]=riemannian_distance(corr_s, r)
    return d, d_s

In [9]:
def line(n:int,num_samples:int):
    r=generate_real_Toeplitz_covariance(n)
    R=core.build_toeplitz_matrix(r)
    Z=generate_samples(R,num_samples)
    d,d_s=compute_errors(Z,num_samples,R)
    return d, d_s

In [72]:
d,d_s=line(1024,100)

In [73]:
d

array([71.62521079, 70.44544747, 72.21075573, 71.54437087, 72.87346224,
       71.75181184, 70.94650086, 72.90700205, 71.68162362, 72.13896729,
       69.96226906, 73.26725269, 71.86331772, 71.57920671, 71.13526265,
       71.96044179, 72.48146427, 72.38554005, 71.3024876 , 72.04194459,
       72.52454517, 72.11169106, 71.20709995, 72.25091813, 70.93662352,
       72.36419177, 72.34376344, 71.53327758, 72.06549699, 71.95083774,
       73.27745453, 72.01491274, 71.31433913, 72.67118634, 71.66398805,
       72.0042577 , 71.62005122, 71.93888794, 72.82742838, 73.34118602,
       71.67337676, 71.81832273, 72.46921444, 72.97728701, 72.74594054,
       72.0488022 , 72.26585015, 71.15384361, 71.44202641, 71.96041397,
       71.45420817, 70.45820198, 73.71042643, 71.74471896, 72.2865291 ,
       73.7530494 , 71.46169818, 71.75944479, 72.2036414 , 72.1213944 ,
       70.95424586, 73.19851895, 71.72092859, 72.41434657, 71.17098567,
       72.14949013, 74.12547337, 72.3632211 , 72.14490172, 71.03

In [74]:
d_s

array([59.3555291 , 59.15880733, 59.56636432, 59.0291884 , 59.41714169,
       59.65648267, 59.17093113, 59.46151861, 59.67520627, 59.67347282,
       59.37387476, 59.39795381, 59.10744768, 59.20903236, 59.38796961,
       59.55523366, 58.73146925, 59.60617464, 59.24020621, 59.82529708,
       59.89901851, 59.64740051, 59.22911571, 59.58409181, 59.45288274,
       59.25405935, 59.76904106, 59.49432556, 59.39797396, 60.05117013,
       59.3990291 , 59.3298066 , 59.34090805, 59.2594006 , 59.33648323,
       58.89882726, 59.95820802, 59.0267523 , 59.55455046, 60.15502798,
       59.33859814, 60.09819758, 59.57252323, 60.19668251, 59.65667981,
       59.31958269, 59.77922038, 59.106562  , 58.53774754, 59.95528178,
       59.54403365, 59.25543727, 60.05963733, 59.69039781, 58.91803738,
       59.69151536, 59.72013163, 58.96311212, 59.87045949, 59.73771101,
       59.89175865, 59.90378848, 58.86897738, 59.77997875, 58.7364238 ,
       60.02005061, 59.98098495, 59.2943084 , 59.56368184, 59.33

In [242]:
def circular_moving_average(signal, hw):
        """循环移动平均，hw为半宽"""
        n_sig = len(signal)
        smoothed = np.zeros_like(signal)
        
        for i in range(n_sig):
            # 循环索引：i-hw 到 i+hw
            indices = np.arange(i - hw, i + hw + 1) % n_sig
            smoothed[i] = np.mean(signal[indices])
        
        return smoothed
def riemannian_distance_optimized(x, y):
    n = len(x)
    
    # 1. 计算功率谱
    X = np.fft.fft(np.conjugate(x))
    Y = np.fft.fft(np.conjugate(y))
    Px = np.abs(X)**2 / n
    Py = np.abs(Y)**2 / n
    # Px=Px.ravel()
    # Py=Py.ravel()
    # print(Px,Py)
    half_width = int(np.round(n**0.8 / (4 * np.pi)))
    # print(half_width)
    Px_smooth = circular_moving_average(Px, half_width)
    Py_smooth = circular_moving_average(Py, half_width)
    # print(Px_smooth,Py_smooth)
    # 4. 取对数并计算距离
    eps = 1e-10
    Lx = np.log(np.maximum(Px_smooth, eps))
    Ly = np.log(np.maximum(Py_smooth, eps))
    
    D = np.sqrt(np.sum((Lx - Ly)**2))
    return D

In [246]:
def standard_distance():
    core=MIGCore()
    r1=generate_real_Toeplitz_covariance(1024)
    r2=generate_real_Toeplitz_covariance(1024)
    R1=core.build_toeplitz_matrix(r1)
    R2=core.build_toeplitz_matrix(r2)
    x=generate_samples(R1,1)
    y=generate_samples(R2,1)
    x=x.ravel()
    y=y.ravel()
    print("Start calculation")
    
    corr_x,s1=core.generate_covariance_matrix(x,True)
    corr_y,s2=core.generate_covariance_matrix(y,True)
    # print("symbol:",s1,s2)
    d=riemannian_distance(corr_x, corr_y)
    print(f"Riemannian distance: {d}")
    # lx=riemannian_distance_smooth_fixed_fast(x)
    # ly=riemannian_distance_smooth_fixed_fast(y)
    d_approx = riemannian_distance_optimized(x, y)
    print(f"Approximate distance: {d_approx}")
    return d, d_approx

In [None]:
d,d_a=standard_distance()

In [245]:
test_vector=[np.random.rand() + 1j*np.random.rand() for _ in range(1024)]
core=MIGCore()
corr=core.generate_covariance_matrix(test_vector,False)
print(corr)

(array([[0.65311972-9.84049100e-19j, 0.49011309-2.74514904e-05j,
        0.49086223+2.67071661e-03j, ..., 0.00174368+2.06840542e-04j,
        0.00068737+9.04157233e-04j, 0.00082861+3.66525181e-04j],
       [0.49011309+2.74514904e-05j, 0.65311972-9.84049100e-19j,
        0.49011309-2.74514904e-05j, ..., 0.00230159+9.06113199e-04j,
        0.00174368+2.06840542e-04j, 0.00068737+9.04157233e-04j],
       [0.49086223-2.67071661e-03j, 0.49011309+2.74514904e-05j,
        0.65311972-9.84049100e-19j, ..., 0.00218823+2.38921093e-04j,
        0.00230159+9.06113199e-04j, 0.00174368+2.06840542e-04j],
       ...,
       [0.00174368-2.06840542e-04j, 0.00230159-9.06113199e-04j,
        0.00218823-2.38921093e-04j, ..., 0.65311972-9.84049100e-19j,
        0.49011309-2.74514904e-05j, 0.49086223+2.67071661e-03j],
       [0.00068737-9.04157233e-04j, 0.00174368-2.06840542e-04j,
        0.00230159-9.06113199e-04j, ..., 0.49011309+2.74514904e-05j,
        0.65311972-9.84049100e-19j, 0.49011309-2.74514904e-05j

In [233]:
X=np.fft.fft(np.conjugate(test_vector))
X

array([ 5.30442165e+02-5.26526629e+02j,  1.77331998e+00+1.18508989e-01j,
       -2.36375383e-01+5.87010034e+00j, ...,
        9.58872771e-01+1.39495292e+00j, -2.32461623e+01+5.02265704e+00j,
       -4.83449978e+00-1.68311813e+00j], shape=(1024,))

In [234]:
np.abs(X)**2/len(X)

array([5.45507013e+02, 3.08467591e-03, 3.37050306e-02, ...,
       2.79817445e-03, 5.52354635e-01, 2.55910886e-02], shape=(1024,))

In [227]:
r1=generate_real_Toeplitz_covariance(1024)
r2=generate_real_Toeplitz_covariance(1024)
R1=core.build_toeplitz_matrix(r1)
R2=core.build_toeplitz_matrix(r2)
x=generate_samples(R1,1)
y=generate_samples(R2,1)

In [236]:
x=x.ravel()

In [237]:
core=MIGCore()
corr_x,s=core.generate_covariance_matrix(x,False)
print(s)

[1.8162998  0.74911289 1.65563261 ... 0.15297995 0.41306178 1.63226973]


In [238]:
X=np.fft.fft(np.conjugate(x))
X

array([ 37.62345697-21.08000201j, -12.88833034-24.51494534j,
       -31.3461015 -26.69812182j, ...,   2.10087411+12.33846824j,
       -14.30625184-14.77519624j, -40.78452351 -2.84021873j],
      shape=(1024,))

In [239]:
np.abs(X)**2/1024

array([1.8162998 , 0.74911289, 1.65563261, ..., 0.15297995, 0.41306178,
       1.63226973], shape=(1024,))