# Imports

In [1]:
import itertools
import numpy as np
from scipy.optimize import minimize
from sklearn.neighbors import KDTree
from typing import Tuple

# Binary function 

Binary function that determines if a point is in the hypersphere of a point from the candidate set

> $f(s,S') = \left\{\begin{array}{ll} 1, &  if || s'-s||_2 \leq ||s-NN_k(s,S)[-1]||_2 \\ 0, & otherwise \\\end{array}\right.$

where $NN_k(s,S)$ returns an ordered set containing s and its k-nearest neibors in the set S, in ascending order of Euclidean distancesto s. (Mordido, Meinel 2020)

In [7]:
def is_in_hypersphere(s : np.ndarray, sample: np.ndarray, k : int = 2) -> bool:

        # Error handling
        if k > len(sample):
            raise Error("k cannot be larger than sample!")

        kdt : KDTree = KDTree(sample, metric='euclidean')

        for s_ in sample:
            # get k-nearest neighbor (k + 1 since ||s_ - s_|| = 0)
            k_nearest_neighbor_distances, _ = \
                kdt.query([s_], k = k + 1, return_distance = True)
            print(k_nearest_neighbor_distances)

            # check wether s_ is closer to s than it's k-nearest neighbor
            if np.linalg.norm(s_ - s) <= np.amax(k_nearest_neighbor_distances):
                return True
        return False


def is_in_hypersphere_TEST():
    X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])

    # k = 1
    assert is_in_hypersphere(k = 1, s = np.array([-1,-1.5]), sample = X) == 1
    assert is_in_hypersphere(k = 1, s = np.array([-4, -4]), sample = X) == 0

    # k = 2
    assert is_in_hypersphere(k = 2, s = np.array([0.5, 0.5]), sample = X) == 1
    assert is_in_hypersphere(k = 2, s = np.array([0, 0]), sample = X) == 1
    assert is_in_hypersphere(k = 2, s = np.array([6, 4]), sample = X) == 0

    assert is_in_hypersphere(k = 50, s = np.array([6, 4]), sample = X) == 1

is_in_hypersphere_TEST()

[[0. 1.]]
[[0. 1.]]
[[0. 1.]]
[[0.         1.41421356]]
[[0. 1.]]
[[0. 1.]]
[[0.         1.41421356]]
[[0.         1.         2.23606798]]
[[0.         1.         2.23606798]]
[[0.         1.         2.23606798]]
[[0.         1.         1.41421356]]
[[0.         1.41421356 2.23606798]]
[[0.         1.         2.23606798]]
[[0.         1.         1.41421356]]
[[0.         1.41421356 2.23606798]]


NameError: name 'Error' is not defined

# Petersen Estimator

> The Petersen estimator (Ricker, 1975), relies on a single marking step and a single recapture step.  Itmerely assumes that the ratio of marked samples (M) in the marking step and the population size (P)is       equivalent to the ratio of recaptured samples (R) and captured samples (C) in the recapture step. Thepopulation size estimate (̂PPetersen) is then calculated as follows:"

> $\hat{P}_{Petersen}(S,S') = \frac{C(S,S')M(S,S')}{R_T(S,S')}$




In [12]:
class Petersen():
    def __init__(self, reference : np.ndarray, candidate : np.ndarray):
        self.reference = reference
        self.candidate = candidate

    is_in_hypersphere = staticmethod(is_in_hypersphere)

    def estimate(self) -> float:
        mc = lambda s_, s : len(s) + sum([is_in_hypersphere(elem, s) for elem in s_])
        r = lambda s_, s : sum([self.is_in_hypersphere(elem, s) for elem in s_]) + sum([self.is_in_hypersphere(elem, s_) for elem in s])
        return mc(self.reference, self.candidate) * mc(self.candidate, self.reference) / r(self.reference, self.candidate)

    def accuracy_loss(p_hat, p):
    # Errorhandling
    if p == 0:
        return -1
    a = abs((p_hat - p) / p)
    return 1 if a > 1 else a

def accuracy_loss_TEST():
    assert accuracy_loss(4, 5) == 0.2
    assert accuracy_loss(5, 2) == 1
    assert accuracy_loss(0, 0)

accuracy_loss_TEST()

# Schnabel Estimator
> The Petersen estimator was extended by Schnabel (1938) to incorporate multiple markings and recap-tures. The population size estimate (̂PSchnabel) is calculated fromTconsecutive Petersen estimates:
> $\hat{P}_{Schnabel}(S,S') = \frac{C_T(S,S') M_T(S,S')}{R_T(S,S')}$

with 

$M(t, S, S') = \left\{\begin{array}{ll} S \cup \{s' \in S' | f(s', S) = 1\}, & t = 1 \\ S \cup S', & t = T \\ M(1, S, S') \cup (\bigcup^{t - 1}_{i = 1} NN_k(s_i', S') ) & otherwise \\\end{array}\right.$

$C_T = (K + 1) * |S'| + \sum_{s'\in S'} \sum_{s \in S} f(s, NN_k(s', S'))$

In [22]:
class Schnabel():
    @staticmethod
    def k_nearest_neighbor_set(sample : np.ndarray, data_set : np.ndarray, kdt : KDTree, k : int, point_in_data : bool = False) -> np.ndarray:

        k_nearest_neighbor_indices = None

        if point_in_data:
            # taking the quaried point into account
            k_nearest_neighbor_indices = kdt.query([sample], k = k + 1)
            k_nearest_neighbor_indices = k_nearest_neighbor_indices[1:]
        else:
            k_nearest_neighbor_indices = kdt.query([sample], k = k)

        return data_set[k_nearest_neighbor_indices]

        


    # T = |S union S'|
    @staticmethod
    def mark(t : set, set0 : set, set1 : set, k : int) -> set:
        if t == 1:
            return set0.union(set([s1 for s1 in set1 if is_in_hypersphere(s1, set0)]))
        else:
            kdt : KDTree = KDTree(np.asarray(list(set1)), metric='euclidean')
            set_tmp = set0.union(set([s1 for s1 in set1 if is_in_hypersphere(s1, set0)]))
            for s1 in set(itertools.islice(set1, t - 1)): # range from 0 until t -1 
                k_nearest_neighbors =k_nearest_neighbor_set(s1, set1, kdt, k, point_in_data=True)
                set_temp.union(set(k_nearest_neighbors))
            return set_tmp

    @staticmethod
    def capture(set0 : set, set1 : set, k : int) -> int:
        acc = 0
        kdt : KDTree = KDTree(np.asarray(list(set1)), metric='euclidean')
        for s1 in set1:
            for s0 in set0:

                k_nearest_neighbors  = k_nearest_neighbor_set(s1, set1, kdt, k, point_in_data=True)
                acc += is_in_hypersphere(s0, k_nearest_neighbors)
                
        return (self.k + 1) * len(s1) + acc

    @staticmethod
    def recapture(set0 : set, set1 : set, k : int) -> int:
        acc = 0
        kdt : KDTree = KDTree(np.asarray(list(set1)), metric='euclidean')
        for index, s1 in enumerate(set1):
            for s0 in set0:
                k_nearest_neighbors = k_nearest_neighbor_set(s1, set1, kdt, k, point_in_data=True)
                acc += is_in_hypersphere(s0, k_nearest_neighbors) + \
                    len(self.mark(index, set0, set1, k).\
                    union(set(k_nearest_neighbors)))
        return acc

    @staticmethod
    def estimate(set0 : set, set1 : set, k : int) -> float:
        return capture(set0, set1, k) * len(set0.union(set1)) / recapture(set0, set1, k)

# CAPTURE estimator

While the previous metrics are caputre- recapture methods, the next one will be a maximum likelihood method:

$L_n(P_{CAPTURE}; S, S') = ln(\frac{P_{CAPTURE}!}{(P_{CAPTURE} - C_{total}(S',S))!}) + C_{total}(S,S') \times ln(C_total(S,S')) + (TP_{CAPTURE} - C_{total}(S,S')) \times ln(TP_{CAPTURE} - C_{total}(S,S')) - (TP_{Capture}ln(TP_{CAPTURE}))$ 

In [8]:
def capture_total(set0 : set, set1 : set, k : int) -> int:
    kdt0 : KDTree = KDTree(np.asarray(list(set0)), metric='euclidean')
    kdt1 : KDTree = KDTree(np.asarray(list(set1)), metric='euclidean')
    acc : int = 0
    for s1 in set1:
        for s0 in set0:
            k_nearest_neighbors_0 = k_nearest_neighbor_set(s0, set0, kdt0, k, point_in_data=True)
            k_nearest_neighbors_1 = k_nearest_neighbor_set(s1, set1, kdt1, k, point_in_data=True)
            acc += is_in_hypersphere(s1, k_nearest_neighbors_0) + len(k_nearest_neighbors_0) + \
                    is_in_hypersphere(s0, k_nearest_neighbors_1) + len(k_nearest_neighbors_1)
    return acc

def maximize_likelihood(set0 : set, set1 : set, n : int = 10e10) -> float:
    m_t : int = len(set0.union(set1))
    c_t : int = capture_total(set0, set1)
    t : int = len(set0.union(set1))
    def likelihood(p):
        return -(\
            np.log(np.math.factorial(p) / np.math.factorial(p - m_t)) + \
            c_t * np.log(c_t) + \
            (t * p - c_t) * np.log(t * p - c_t) - \
            t * p * np.log(t * p) \
            )
    int_range = np.arange(n)
    max_val = minimize(likelihood, int_range, method="nelder-mead", options={'xatol':1e-8, }) 
    return 0



# Accuracy

Accuracy loss function. 0 indicatates a good estimation, 1 indicates a bad estimation.


$A(P, \hat{P}) = max(|\frac{\hat{P}- P}{P}|, 1)$

In [None]:
def accuracy_loss(p_hat, p):
    # Errorhandling
    if p == 0:
        return -1
    a = abs((p_hat - p) / p)
    return 1 if a > 1 else a

def accuracy_loss_TEST():
    assert accuracy_loss(4, 5) == 0.2
    assert accuracy_loss(5, 2) == 1
    assert accuracy_loss(0, 0)

accuracy_loss_TEST()