In [1]:
import numpy as np
import matplotlib.pyplot as plt
import em_dtools
from functools import partial

In [2]:
dist_ratio = 0.5

In [3]:
Num_sensors1 = 5
Num_emitters1 = 1
sample_size1 = 100
theta1_rad = [0.5] # Направление прибытия (DOA) в радианах
theta1_deg = np.rad2deg(theta1_rad[0]) # Направление прибытия (DOA) в градусах
GN_1 = 4.1*np.ones(Num_sensors1, dtype=np.float64) # Ковариация шума
A1 = np.exp(-2j * np.pi * dist_ratio * np.arange(Num_sensors1).reshape(-1,1) * np.sin(theta1_rad).reshape(1,-1)) # Матрица управляющих векторов
# Генерация векторов сигнала, шума и принятого сигнала
S1 = em_dtools.generate_deterministic_signals(Num_emitters1, sample_size1)
n1 = em_dtools.generate_stochastic_signals(Num_sensors1, sample_size1, GN_1)
X1 = (A1 @ S1.T + n1.T).T

In [4]:
X1_with_mv = em_dtools.MCAR(X1, [2,4], [80, 80])

In [5]:
X1_with_mv

array([[ 3.21844286+1.5756985j ,  0.38368948-6.3414772j ,
        -3.20467871-0.85138846j, -1.02096124+0.03950756j,
         1.6429624 -0.59566567j],
       [-0.49641828+1.15422673j,  1.89258608-1.26667046j,
                nan+0.j        , -1.694435  +2.68083036j,
                nan+0.j        ],
       [-0.22212836+1.92167596j, -0.52818042-2.66210647j,
                nan+0.j        , -3.23793318+1.23476116j,
                nan+0.j        ],
       [-0.85328295+0.19292834j,  1.58227543-1.3354218j ,
                nan+0.j        , -2.70980008+0.36898037j,
                nan+0.j        ],
       [ 0.41159832+1.37836579j,  0.56645517+0.1685884j ,
                nan+0.j        , -1.0354819 -0.25384136j,
                nan+0.j        ],
       [ 1.39026189+2.95742298j,  0.85550669+1.19858533j,
                nan+0.j        , -1.52347154-0.44442704j,
                nan+0.j        ],
       [-1.70658277+1.09839832j,  0.43283287+0.88457953j,
                nan+0.j        , -2.428842

In [6]:
X_clean = em_dtools.initializer(X1_with_mv,1)

In [7]:
X_clean

(array([[-0.56349519]]),
 array([[ 0.21412781-0.03169576j],
        [-0.38823604+0.62823449j],
        [ 0.85548393+0.41462091j],
        [ 0.72152084-0.03689661j],
        [ 0.34857824+0.13775215j],
        [-0.41093029-0.76780629j],
        [ 0.84544918-1.1608122j ],
        [-0.86258652+0.07190756j],
        [-0.43248673+1.00266127j],
        [ 0.68438298-0.4213601j ],
        [-0.12327676-1.18741491j],
        [-0.31824474+1.4858239j ],
        [-0.51313166+0.50689368j],
        [ 0.84977004+0.73942561j],
        [-0.06710277+0.05246984j],
        [-0.33872968+0.28459486j],
        [-0.02107577+0.43095712j],
        [ 0.8336753 +0.10916205j],
        [-0.20932858+0.10199442j],
        [ 0.51879481-0.19951788j]]))

In [8]:
#EM_theta1_rad, neglhd_1, K1, mu1 = em_dtools.multi_start_EM(X1, GN_1, 10, max_iter=50, eps=1e-6)

In [9]:
#EM_theta1_deg = np.rad2deg(EM_theta1_rad)

In [10]:
#print(f"Погрешность в смысле разности углов в градусах: {np.abs(EM_theta1_deg-theta1_deg)}.\nПогрешность в смысле разности синусов углов {np.abs(np.sin(EM_theta1_rad)-np.sin(theta1_rad))}.")

In [None]:
import numpy as np
from scipy.optimize import minimize

def steering_vector(theta_deg, m, d=0.5, wavelength=1.0):
    k = 2 * np.pi / wavelength
    theta_rad = np.radians(theta_deg)
    return np.exp(1j * k * d * np.arange(m) * np.sin(theta_rad))

def A_matrix(thetas, m, d=0.5, wavelength=1.0):
    return np.column_stack([steering_vector(theta, m, d, wavelength) for theta in thetas])

def e_step(X_obs, mask, A, S):
    X_pred = A @ S
    X_filled = X_obs.copy()
    X_filled[~mask] = X_pred[~mask]
    return X_filled

def weighted_norm(X, Q_inv_sqrt):
    """
    Compute weighted Frobenius norm: || Q^{-1/2} X ||_F
    Q_inv_sqrt is vector of 1/std deviation for each sensor (row)
    """
    return np.linalg.norm(Q_inv_sqrt[:, None] * X, 'fro')

def m_step_theta(X, S, thetas_init, m, Q_inv_sqrt, d=0.5, wavelength=1.0):
    def cost(thetas):
        A = A_matrix(thetas, m, d, wavelength)
        residual = X - A @ S
        return weighted_norm(residual, Q_inv_sqrt)**2
    bounds = [(-90, 90)] * len(thetas_init)
    res = minimize(cost, thetas_init, method='L-BFGS-B', bounds=bounds)
    return res.x

def m_step_S(X, A, Q_inv_sqrt):
    """
    Solve weighted least squares: minimize || Q^{-1/2} (X - A S) ||_F^2
    """
    # Weight X and A rows by Q_inv_sqrt
    X_w = Q_inv_sqrt[:, None] * X
    A_w = Q_inv_sqrt[:, None] * A
    # Solve least squares for each snapshot (column)
    # S = (A_w^\dagger) X_w
    return np.linalg.pinv(A_w) @ X_w

def EM_algorithm(X_obs, mask, m, r, noise_vars, max_iters=50, tol=1e-6):
    """
    noise_vars: array of noise variances per sensor (length m)
    """
    G = X_obs.shape[1]
    Q_inv_sqrt = 1.0 / np.sqrt(noise_vars)  # vector length m

    thetas = np.linspace(-30, 30, r)  # init DoA guesses
    S = np.random.randn(r, G) + 1j * np.random.randn(r, G)
    X = X_obs.copy()

    prev_cost = np.inf

    for i in range(max_iters):
        A = A_matrix(thetas, m)
        # E-step
        X = e_step(X, mask, A, S)
        # M-step theta
        thetas = m_step_theta(X, S, thetas, m, Q_inv_sqrt)
        # M-step S
        A = A_matrix(thetas, m)
        S = m_step_S(X, A, Q_inv_sqrt)

        residual = X - A @ S
        cost = weighted_norm(residual, Q_inv_sqrt)**2

        print(f"Iter {i+1}, cost: {cost:.6f}, DoAs: {thetas}")

        if abs(prev_cost - cost) < tol:
            print("Converged.")
            break
        prev_cost = cost

    return thetas, S, X