This python notebook uses some ideas of the paper **Fitting helices to data by total least squares** by *Yves Nievergelt*.
The method to determine if a set of points in space fits in a helix is shown below

In [1]:
import numpy as np

### Data points
The data considered here consists of p points $$\vec{x_1}, ...,\vec{x_p}$$

In [2]:
x = np.array([(62,397,103),(82,347,107),(93,288,120),
     (94,266,128),(65,163,169),(12,102,198),
     (48,138,180),(77,187,157),(85,209,149),(89,316,112)])

### 1. Estimating the degree of colinearity of data

#### 1.1 Compute the average
$$\bar{\vec{x}}:= (1/p)\sum_{j=1}^{p}\vec{x_j}$$

In [3]:
xm = np.mean(x)

#### 1.2 Form the matrix
$$X\in\mathbb{M}_{p\times{3}}$$

In [4]:
X = (x-xm).T

#### 1.3 Compute the singular values of X
$$\sigma_1\geqslant\sigma_2\geqslant\sigma_3\geqslant0$$
and the corresponding orthonormal vectors
$$\vec{v_1},\vec{v_2},\vec{v_3}\in\mathbb{R}^3$$

In [5]:
v, s, t = np.linalg.svd(X,full_matrices=True)

In [6]:
sigma1 = s[0]
sigma2 = s[1]
sigma3 = s[2]
v1 = t[0]
v2 = t[1]
v3 = t[2]

##### If $$\sigma_2>\sigma_3\geqslant0$$ 
the plane of total least squares satisfies the equation $$\langle\vec{x}-\bar{\vec{x}},\vec{v_3}\rangle=0$$
in particular if $$\sigma_3=0$$
all data lie in that plane 

#####  if
$$\sigma_1>\sigma_2=0=\sigma_3$$
then all data lie in a straight line

##### Similarly, if
$$\sigma_1=\sigma_2=\sigma_3=0$$
all the data coalesce at one point

### 2. Fitting the axis and the radius of the helix

#### 2.1 Fitting a quadric surface to the data

Each affine quadric surface satisfies the equation $$F(S;\vec{x})=0$$ defined by a quadratic
form $$F(S;\vec{x})=(\bar{x}^T,1).S.(\bar{x}^T,1)^T=\begin{bmatrix}x_1 & x_2 & x_3 & 1\end{bmatrix}\begin{bmatrix}
    s_{11} & s_{12} & s_{13} & s_{1} \\
    s_{12} & s_{22} & s_{23} & s_{2} \\
    s_{13} & s_{23} & s_{33} & s_{3} \\
    s_{1} & s_{2} & s_{3} & s_{44}
\end{bmatrix}\begin{bmatrix} x_1 \\ x_2 \\ x_3 \\ 1\end{bmatrix}$$

This algorithm will determine the matrix or matrices S minimizing the total least-squares objective
$$G(S):=\sum_{j=1}^{p}[F(S;\vec{x_j})]^2$$
subject to the constraint that
$$\sum_{k=1}^{4}\sum_{\ell=k}^4|{S_{k,\ell}}|^2=1$$
Computationally, arrange matrix S in one-dimensional vector in lexicographic order:
$$S=\vec{S}:=(s_{11}, s_{12}, s_{13}, s_{14}; s_{22}, s_{23}, s_{24}; s_{33}, s_{34}; s_{44})$$
Similarly, for each point x, form the vector z:
$$\vec{z}:=(x_{1}^3,2x_{1}x_{2},2x_{1}x_{3},2x_{1};x_{2}^2,2x_{2}x_{3},2x_{2};x_{3}^2,2x_{3}; 1)$$

#### 2.1.1 Form the matrix Z

In [7]:
Z = np.zeros((x.shape[0],10), np.float32)
Z[:,0] = x[:,0]**2
Z[:,1] = 2*x[:,0]*x[:,1]
Z[:,2] = 2*x[:,0]*x[:,2]
Z[:,3] = 2*x[:,0]
Z[:,4] = x[:,1]**2
Z[:,5] = 2*x[:,1]*x[:,2]
Z[:,6] = 2*x[:,1]
Z[:,7] = x[:,2]**2
Z[:,8] = 2*x[:,2]
Z[:,9] = 1

#### 2.1.2 Compute the smallest singular value and the corresponding right-singular vectors of the matrix Z

In [8]:
v, s, t = np.linalg.svd(Z,full_matrices=True)
smallest_value = np.min(np.array(s))
smallest_index = np.argmin(np.array(s))
T = np.array(t)
T = T[smallest_index,:]
S = np.zeros((4,4),np.float32)
S[0,0] = T[0]
S[0,1] = S[1,0] = T[1]
S[0,2] = S[2,0] = T[2]
S[0,3] = S[3,0] = T[3]
S[1,1] = T[4]
S[1,2] = S[2,1] = T[5]
S[1,3] = S[3,1] = T[6]
S[2,2] = T[7]
S[2,3] = S[3,2] = T[8]
S[3,3] = T[9]
norm = np.linalg.norm(np.dot(Z,T), ord=2)**2
print(norm)

6.238565749084928e-12


##### The norm value near zero shows that x points above are fitted in a quadric surface of a cylinder or helix.

In [30]:
print(S)
ones = np.reshape(np.transpose(np.repeat(1, len(x))), (10, 1))
x_ap = np.append(x, ones, axis=1)
print(np.matmul(np.matmul(x_ap[1], S), x_ap[1]))

[[ 1.5326255e-04 -3.8315637e-05 -3.8315637e-05  3.2070186e-02]
 [-3.8315637e-05  9.5789088e-05 -7.6631273e-05 -1.6552355e-02]
 [-3.8315637e-05 -7.6631273e-05  9.5789088e-05  5.1726104e-04]
 [ 3.2070186e-02 -1.6552355e-02  5.1726104e-04  9.9934840e-01]]
-9.113864507526159e-07


In [9]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os

from trackml.dataset import load_event, load_dataset
from trackml.score import score_event

In [10]:
# Change this according to your directory preferred setting
from data_path import DATA_PATH
path_to_train = DATA_PATH + "/train_100_events/"

In [11]:
# This event is in Train_1
event_prefix = "event000001000"

In [13]:
hits, cells, particles, truth = load_event(os.path.join(path_to_train, event_prefix))

In [14]:
from sklearn.preprocessing import StandardScaler
import hdbscan
from scipy import stats
from tqdm import tqdm

class Clusterer(object):
    def __init__(self,rz_scales=[0.65, 0.965, 1.428]):                        
        self.rz_scales=rz_scales
    
    def _eliminate_outliers(self,labels,M):
        norms=np.zeros((len(labels)),np.float32)
        indices=np.zeros((len(labels)),np.int32)
        for i, cluster in tqdm(enumerate(labels),total=len(labels)):
            if cluster == 0:
                continue
            index = np.argwhere(self.clusters==cluster)
            x = M[index]
            indices[i] = len(index)
            local_mask = np.ones((len(index)),np.bool)
            norms[i] = self._test_quadric(x)
        threshold = np.percentile(norms,90)*4
        for i, cluster in tqdm(enumerate(labels),total=len(labels)):
            if norms[i] > threshold:
                self.clusters[self.clusters==cluster]=0            
    def _test_quadric(self,x):
        if len(x.shape)==3:
            x = np.reshape(x,(x.shape[0],3))
        Z = np.zeros((x.shape[0],10), np.float32)
        Z[:,0] = x[:,0]**2
        Z[:,1] = 2*x[:,0]*x[:,1]
        Z[:,2] = 2*x[:,0]*x[:,2]
        Z[:,3] = 2*x[:,0]
        Z[:,4] = x[:,1]**2
        Z[:,5] = 2*x[:,1]*x[:,2]
        Z[:,6] = 2*x[:,1]
        Z[:,7] = x[:,2]**2
        Z[:,8] = 2*x[:,2]
        Z[:,9] = 1
        v, s, t = np.linalg.svd(Z,full_matrices=False)        
        smallest_index = np.argmin(np.array(s))
        T = np.array(t)
        T = T[smallest_index,:]        
        norm = np.linalg.norm(np.dot(Z,T), ord=2)**2
        return norm

    def _preprocess(self, hits):
        
        x = hits.x.values
        y = hits.y.values
        z = hits.z.values

        r = np.sqrt(x**2 + y**2 + z**2)
        hits['x2'] = x/r
        hits['y2'] = y/r

        r = np.sqrt(x**2 + y**2)
        hits['z2'] = z/r

        ss = StandardScaler()
        X = ss.fit_transform(hits[['x2', 'y2', 'z2']].values)
        for i, rz_scale in enumerate(self.rz_scales):
            X[:,i] = X[:,i] * rz_scale

        
        return X
    
    def predict(self, hits):        
        volumes = np.unique(hits['volume_id'].values)
        X = self._preprocess(hits)
        self.clusters = np.zeros((len(X),1),np.int32)
        max_len = 1
        cl = hdbscan.HDBSCAN(min_samples=1,min_cluster_size=7,
                             metric='braycurtis',cluster_selection_method='leaf',algorithm='best', leaf_size=50)
        self.clusters = cl.fit_predict(X)+1
        labels = np.unique(self.clusters)
        n_labels = 0
        while n_labels < len(labels):
            n_labels = len(labels)
            self._eliminate_outliers(labels,X)
            max_len = np.max(self.clusters)
            self.clusters[self.clusters==0] = cl.fit_predict(X[self.clusters==0])+max_len
            labels = np.unique(self.clusters)
        return self.clusters

In [16]:
def create_one_event_submission(event_id, hits, labels):
    sub_data = np.column_stack(([event_id]*len(hits), hits.hit_id.values, labels))
    submission = pd.DataFrame(data=sub_data, columns=["event_id", "hit_id", "track_id"]).astype(int)
    return submission

In [17]:
path_to_test = "../input/test"
test_dataset_submissions = []

create_submission = True # True for submission 
if create_submission:
    for event_id, hits, cells in load_dataset(path_to_test, parts=['hits', 'cells']):

        # Track pattern recognition 
        model = Clusterer()
        labels = model.predict(hits)

        # Prepare submission for an event
        one_submission = create_one_event_submission(event_id, hits, labels)
        test_dataset_submissions.append(one_submission)
        
        print('Event ID: ', event_id)

    # Create submission file
    submission = pd.concat(test_dataset_submissions, axis=0)
    submission.to_csv('submission.csv', index=False)