In [None]:
import sympy as sp
import numpy as np
import pandas as pd
import scipy.signal as ss
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# core functions

In [None]:
# interrogating signal
def g_signal(t, w=2*np.pi):
    return (np.cos(w * t) -
            np.cos(1.2 * w * t) +
            np.cos(1.4 * w * t) -
            np.cos(1.6 * w * t) +
            np.cos(1.8 * w * t) -
            np.cos(2.0 * w * t))

# gaussian mixture model scatterer
def s_gmm(p, t):
    a1, l1, l2, s = p
    g1 = a1 * np.exp(-0.5 * ((t - l1) / s)**2)
    g2 = (1. - a1) * np.exp(-0.5 * ((t - l2) / s)**2)
    s_n = g1 + g2
    return s_n / np.sum(s_n)

# "delta function" scatterer
# delta functions don't work well with derivatives, jacobians, et.
# so we use very narrow gaussians to approximate them
def s_delta(p, t, s=3e-4):
    a1, a2, a3, l1, l2, l3 = p
    g1 = a1 * np.exp(-0.5 * ((t - l1) / s)**2)
    g2 = a2 * np.exp(-0.5 * ((t - l2) / s)**2)
    g3 = a3 * np.exp(-0.5 * ((t - l3) / s)**2)
    s_n = g1 + g2 + g3
    return s_n / np.sum(s_n)

# "delta function" scatterer with only two deltas rather than three
def s_delta_2(p, t, s=3e-4):
    a1, a2, l1, l2 = p
    g1 = a1 * np.exp(-0.5 * ((t - l1) / s)**2)
    g2 = a2 * np.exp(-0.5 * ((t - l2) / s)**2)
    s_n = g1 + g2
    return s_n / np.sum(s_n)

# finds and returns the moments of the scatterer distribution
def get_s_moments(func, p, t):
    s_n         = func(p, t)
    area        = np.sum(s_n)
    mean        = np.sum(t * s_n) / area
    var         = np.sum(((t - mean)**2) * s_n) / area
    std         = np.sqrt(var)
    skew        = np.sum((((t - mean) / std)**3) * s_n) / area
    kurt        = np.sum((((t - mean) / std)**4) * s_n) / area
    return np.array([mean, std, skew, kurt])

# returns an array representing the impact of each parameter on r_n
def get_dr_dtheta(func, g_n, p, t, epsilon=1e-14):
    num_params  = len(p)
    dr_dtheta   = []
    for i in range(num_params):
        p_plus      = np.array(p, dtype=float)
        p_minus     = np.array(p, dtype=float)
        d_p         = np.abs(p[i]) * epsilon if p[i] != 0 else epsilon
        # d_p         = epsilon
        p_plus[i]  += d_p
        p_minus[i] -= d_p
        s_n_plus    = func(p_plus, t)
        s_n_minus   = func(p_minus, t)
        r_n_plus    = ss.convolve(s_n_plus, g_n, mode='same')
        r_n_minus   = ss.convolve(s_n_minus, g_n, mode='same')
        derivative  = (r_n_plus - r_n_minus) / (2 * d_p)
        dr_dtheta.append(derivative)
    return dr_dtheta

# fisher information matrix for parameters theta given return signal, noise, etc.
def calculate_fim_theta(r_n, dr_dtheta, theta, snr_db=0):
    signal_power    = np.mean(r_n**2)
    snr_linear      = 10**(snr_db / 10.0)
    noise_var       = signal_power / snr_linear
    fim             = np.zeros((len(theta), len(theta)))
    for i in range(len(theta)):
        for j in range(i, len(theta)):
            dot_product = dr_dtheta[i] * dr_dtheta[j]
            fim[i, j]   = (1.0 / noise_var) * np.sum(dot_product)
            fim[j, i]   = fim[i, j]
    return fim

# transform matrix that turns the basis of parameters theta into the basis of moments
def get_jacobian(func, p, t, epsilon=1e-14):
    jacobian    = np.zeros((4, len(p)))
    for i in range(len(p)):
        p_plus          = p.copy()
        p_minus         = p.copy()
        d_p             = np.abs(p[i]) * epsilon if p[i] != 0 else epsilon
        p_plus[i]      += d_p
        p_minus[i]     -= d_p
        moments_plus    = get_s_moments(func, p_plus, t)
        moments_minus   = get_s_moments(func, p_minus, t)
        jacobian[:, i]  = (moments_plus - moments_minus) / (2 * d_p)
    return jacobian

# create database entries

In [None]:
folder      = '/your/folder/here/'
f_low       = 0
db          = pd.read_csv(folder + f'db_delta_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv')
db2         = pd.read_csv(folder + f'db_gmm_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv')
savefile    = folder + f'db_delta_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv'
savefile2   = folder + f'db_gmm_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv'

In [None]:
def find_all_crlb_for_database(db, delta=True):
    samp_rate           = 1000
    dt                  = 1/samp_rate
    t                   = np.arange(-2.5, 2.5, dt)
    t_g                 = np.arange(-5, 5, dt)
    g_n                 = g_signal(t_g)
    snr_db              = 20
    epsilon             = 1e-8

    if delta:
        if db['amp_3'] == 0.:
            func        = s_delta_2
            params = {'amp1': db['amp_1'], 'amp2': db['amp_2'], 'loc1': db['loc_1'], 'loc2': db['loc_2']}
        else:
            func        = s_delta
            params = {'amp1': db['amp_1'], 'amp2': db['amp_2'], 'amp3': db['amp_3'],
                            'loc1': db['loc_1'], 'loc2': db['loc_2'], 'loc3': db['loc_3']}
    else:
        func            = s_gmm
        params     = {'amp1': db['w1'], 'loc1':db['mu1'], 'loc2': db['mu2'], 'sigma': db['sigma']}
    theta               = np.array(list(params.values()))
    s_n                 = func(theta, t)
    dr_dtheta           = get_dr_dtheta(func, g_n, theta, t)
    jacobian            = get_jacobian(func, theta, t, epsilon=epsilon)
    r_n                 = ss.convolve(s_n, g_n, mode='same')
    fim_theta           = calculate_fim_theta(r_n, dr_dtheta, theta, snr_db)

    try:
        crlb_theta      = np.linalg.inv(fim_theta)
        crlb_moments    = jacobian @ crlb_theta @ jacobian.T
        crlb            = np.sqrt(np.diag(crlb_moments))
        return crlb
    except np.linalg.LinAlgError:
        print(params)
        return [np.nan for _ in range(4)]

In [None]:
db[['mean_crlb', 'std_crlb', 'skew_crlb', 'kurtosis_crlb']] = db.apply(lambda x:
                                                                           find_all_crlb_for_database(x, delta=True),
                                                                           axis=1,
                                                                           result_type='expand')

db2[['mean_crlb', 'std_crlb', 'skew_crlb', 'kurtosis_crlb']] = db2.apply(lambda x:
                                                                             find_all_crlb_for_database(x, delta=False),
                                                                             axis=1,
                                                                             result_type='expand')

In [None]:
savefile    = folder + f'db_delta_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv'
savefile2   = folder + f'db_gmm_snr10_complex_w_preds_{f_low}to{f_low+1}_0.csv'

db.to_csv(savefile)
db2.to_csv(savefile2)