In [8]:
import pandas as pd
import numpy as  np
import matplotlib.pyplot as plt
np.set_printoptions(linewidth=200)

# Synthetic Data Generation

In [9]:
# Synthetic problem
r = np.random.random((12,3))
s = np.random.random((40,3))

q = np.sqrt(np.sum((np.expand_dims(r,1) - np.expand_dims(s,0))**2,axis=2))
offsets_gt = np.random.rand(1,q.shape[1])
q = q + offsets_gt + 0.000*np.random.randn(*q.shape)
outlier_idx = np.random.rand(*q.shape) < 0.00
q[outlier_idx] = np.random.rand(*q.shape)[outlier_idx]*3

tdoa_df = pd.DataFrame(q)
tdoa = tdoa_df.to_numpy()
tdoa_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,30,31,32,33,34,35,36,37,38,39
0,1.200553,1.076373,1.059106,1.702169,1.408859,1.400301,0.930424,1.194525,1.422139,0.897111,...,0.458133,0.666299,0.724654,1.700575,1.451224,0.586032,0.722754,1.265334,0.999576,0.861557
1,0.517995,1.310372,1.676846,1.163421,1.947735,1.292141,1.349088,0.573097,1.414983,0.652104,...,1.002994,1.041269,1.148931,0.99671,1.388551,1.315638,0.533962,1.875038,0.399897,1.274592
2,1.063105,0.904866,1.581838,1.448262,1.731329,1.196379,1.356207,0.953245,1.668128,0.489119,...,0.934137,0.9557,1.340853,1.364161,1.304034,1.223787,0.371809,1.649682,0.840179,1.303545
3,0.916019,1.395158,1.335705,1.590263,1.711955,1.50498,1.021204,1.064229,1.211054,1.004954,...,0.64783,0.832424,0.402733,1.577473,1.540834,0.877334,0.835179,1.591476,0.820791,0.926388
4,1.059301,1.084952,1.152801,1.614017,1.416375,1.350874,0.987689,1.052801,1.34209,0.853124,...,0.411393,0.585954,0.659211,1.559356,1.339209,0.710513,0.663936,1.297895,0.85564,0.911803
5,0.945427,1.201465,1.180418,1.203711,1.948308,1.119315,0.714668,1.097034,1.123655,0.554074,...,0.995504,1.11718,0.981014,1.604826,1.605169,0.824083,0.502206,1.834472,0.868251,0.650517
6,0.706287,1.233344,1.691376,1.414213,1.745882,1.370585,1.441869,0.562115,1.539746,0.758898,...,0.830378,0.847309,1.124095,0.934368,1.210394,1.314857,0.588316,1.679289,0.47542,1.36778
7,1.11444,1.131682,1.484693,1.242688,2.072673,1.124643,1.091108,1.183368,1.551935,0.305594,...,1.214309,1.285074,1.397463,1.638282,1.669391,1.142406,0.419325,1.969704,1.007186,1.050683
8,1.035582,1.050712,1.630384,1.651049,1.452505,1.421794,1.463552,0.877059,1.689608,0.825025,...,0.637781,0.633688,1.156141,1.283887,1.093094,1.234063,0.630205,1.387786,0.772904,1.397083
9,1.266908,1.275491,1.004205,1.552409,1.952718,1.36843,0.422094,1.391111,1.348393,0.806127,...,1.024954,1.196504,1.002325,1.90529,1.796359,0.590546,0.748431,1.806867,1.167256,0.396668


# Helper Functions ans Classes

In [None]:
class UvaboSolution():

    def __init__(self, n_receivers, n_senders, dim=3):
        u = np.empty((3, n_receivers))
        u[:] = np.nan
        self.u = u

        v = np.empty((3, n_senders))
        v[:] = np.nan
        self.v = v

        a = np.empty((n_receivers,1))
        a[:] = np.nan
        self.a = a

        b = np.empty((1, n_senders))
        b[:] = np.nan
        self.b = b

        o = np.empty((1,n_senders))
        o[:] = np.nan
        self.o = o

from enum import Enum
from abc import ABC, abstractmethod

def init_uvab(d):
    dsquare = d**2
    doubleCompaction = dsquare - dsquare[0:1,:] - dsquare[:,0:1] + dsquare[0,0]
    uu,ss,vv = np.linalg.svd(doubleCompaction/-2)
    u =uu[:,:3].T
    v = np.diag(ss[:3])@vv[:3,:]
    a = dsquare[:,0:1] - dsquare[0,0]/2
    b = dsquare[0:1,:] - dsquare[0,0]/2
    return u,v,a,b

class TxoaType(Enum):
    TOA = 1
    TDOA = 2
    COTDOA = 3

class TxoaProblem():
    def __init__(self, data, problem_type=TxoaType.TDOA, dim=3):
        self.data = data
        self.sol = UvaboSolution(*data.shape, dim=dim)
        self.problem_type = problem_type
        self.dim = dim

    def solve_for_offset(self, solver, tol=0.01):   
        #sample_subset = lambda : self.data[np.random.permutation(self.data.shape[0])[:solver.get_needed_receivers()],np.random.permutation(self.data.shape[1])[:solver.get_needed_senders()]]

        outer_ransac_iters = 100
        most_inliers = -1
        best_sol = None
        for _ in range(outer_ransac_iters):
            mics_choice = np.random.permutation(self.data.shape[0])[:solver.get_needed_receivers()]
            sound_choice = np.random.permutation(self.data.shape[1])[:solver.get_needed_senders()]
            data = self.data[mics_choice][:,sound_choice]

            offsets = solver.solve(data)
            d = data - offsets

            u,v,a,b = init_uvab(d)

            cur_solution = UvaboSolution(*self.data.shape,dim=self.dim)

            cur_solution.u[:,mics_choice] = u
            cur_solution.v[:,sound_choice] = v
            cur_solution.a[mics_choice] = a
            cur_solution.b[:,sound_choice] = b
            cur_solution.o[:,sound_choice] = offsets
            cur_problem = TxoaProblem(self.data, problem_type=self.problem_type, dim=self.dim)
            cur_problem.sol = cur_solution

            cur_problem.ransac_expand_to_all_cols()

            res = -2*cur_solution.u.T@cur_solution.v + cur_solution.a + cur_solution.b - (self.data - cur_solution.o)**2

            if np.sum(np.abs(res) < tol) > most_inliers:
                most_inliers = np.sum(np.abs(res) < tol)
                best_sol = cur_solution
        self.sol = best_sol

    def ransac_expand_col(self, new_sound_idx, ransac_iter=10,tol=0.01):
        needed_eqs = self.dim + 2
        most_inliers = -1
        best_sol = None
        known_mics = np.argwhere(np.logical_not(np.isnan(self.sol.a)))[:,0]

        for _ in range(ransac_iter):
            mic_local_choice = np.random.permutation(known_mics.shape[0])[:needed_eqs]
            M = np.concatenate([2*self.data[known_mics[mic_local_choice],new_sound_idx:new_sound_idx+1],
                                -np.ones((needed_eqs,1)),
                                -2*self.sol.u[:,known_mics[mic_local_choice]].T,
                                np.ones((needed_eqs,1))
                                ],axis=1) # variable order is [o_j, o_j^2, v_j, b_j]
            B = self.data[known_mics[mic_local_choice],new_sound_idx:new_sound_idx+1]**2 - self.sol.a[known_mics[mic_local_choice]]
            if np.any(np.isnan(M)) or np.any(np.isnan(B)):
                print("O no! found none")
                continue
            x_part,_,_,_ = np.linalg.lstsq(M,B,rcond=None)
            x_hom = np.array([0,1,*([0]*self.dim),1])
            x_hom = np.expand_dims(x_hom,1)
            res = x_part[0]**2 - x_part[1]
            x = x_hom*res + x_part 

            o_new = x[0]
            v_new = x[2:5]
            b_new = x[5]

            lh = np.expand_dims((self.data[known_mics,new_sound_idx] - o_new)**2,axis=1)
            rh = -2*self.sol.u[:,known_mics].T@v_new+self.sol.a[known_mics]+b_new

            res = lh - rh
            if np.sum(np.abs(res) < tol) > most_inliers:
                most_inliers = np.sum(np.abs(res) < tol)
                best_sol = (o_new[0], v_new[:,0], b_new[0])
        self.sol.o[0,new_sound_idx] = best_sol[0]
        self.sol.v[:,new_sound_idx] = best_sol[1]
        self.sol.b[0,new_sound_idx] = best_sol[2]

    def ransac_expand_row(self, new_mic_idx, ransac_iter=10,tol=0.01):
        
        known_sounds = np.argwhere(np.logical_not(np.isnan(self.sol.b)))[:,1]
        needed_eqs = self.dim + 1
        most_inliers = -1
        best_sol = None
        for _ in range(ransac_iter):
            sound_choice = known_sounds[np.random.permutation(known_sounds.shape[0])[:needed_eqs]]
            M = np.concatenate([-2*self.sol.v[:,sound_choice].T,
                                np.ones((needed_eqs,1))
                                ],axis=1) # variable order is [o_j, o_j^2, v_j, b_j]
            B = (self.data[new_mic_idx,sound_choice] - self.sol.o[0,sound_choice])**2 - self.sol.b[0,sound_choice]
            #B = tdoa[mics_choice[mic_local_choice],new_sound_idx:new_sound_idx+1]**2 - a[mic_local_choice] 
            if np.any(np.isnan(M)) or np.any(np.isnan(B)):
                print("O no! found none")
                continue
            x = np.linalg.solve(M,B)

            u_new = np.expand_dims(x[:3],axis=1)
            a_new = x[3]

            lh = np.expand_dims((self.data[new_mic_idx,known_sounds] - self.sol.o[:,known_sounds])**2,axis=1)
            rh = -2*u_new.T@self.sol.v[:,known_sounds]+a_new+self.sol.b[:,known_sounds]

            res = lh - rh
            if np.sum(np.abs(res) < tol) > most_inliers:
                most_inliers = np.sum(np.abs(res) < tol)
                best_sol = (u_new, a_new)
        self.sol.u[:,new_mic_idx] = best_sol[0][:,0]
        self.sol.a[new_mic_idx] = best_sol[1]

    def ransac_expand_to_all_cols(self, ransac_iter=10, tol=0.01):
        known_sounds = np.argwhere(np.logical_not(np.isnan(self.sol.b)))[:,1]
        for new_sound in np.setdiff1d(np.arange(self.data.shape[1]),known_sounds):
            self.ransac_expand_col(new_sound,ransac_iter=ransac_iter, tol=tol)
    
    def ransac_expand_to_all_rows(self, ransac_iter=10, tol=0.01):
        known_mics = np.argwhere(np.logical_not(np.isnan(self.sol.a)))[:,0]
        for new_mic in np.setdiff1d(np.arange(self.data.shape[0]),known_mics):
            self.ransac_expand_row(new_mic,ransac_iter=ransac_iter, tol=tol)
        
    def get_residuals(self):
        return (-2*self.sol.u.T@self.sol.v + self.sol.a + self.sol.b) - (self.data - self.sol.o)**2


class OffsetSolver(ABC):

    @abstractmethod
    def get_needed_receivers():
        pass

    @abstractmethod
    def get_needed_senders():
        pass

    @abstractmethod
    def solve(data):
        pass
    
class OffsetSolver95(OffsetSolver):

    def get_needed_receivers():
        return 9
    
    def get_needed_senders():
        return 5
    
    def solve(data):
        zsquared = data ** 2
        A = np.concatenate([zsquared[:,1:] - zsquared[:,0:1], -2*data[:,1:], 2*data[:,0:1]],axis=1)
        u = np.linalg.solve(A, np.ones(9))
        sols = np.concatenate([u[-1:]/np.sum(u[:4]),u[4:-1]/u[:4]],axis=0)
        return sols


# Run System

In [11]:
tp = TxoaProblem(tdoa)
tp.solve_for_offset(OffsetSolver95)
tp.ransac_expand_to_all_rows()
tp.get_residuals()

NameError: name 'problem_dim' is not defined