In [None]:
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from fermat import Fermat
from sklearn.datasets import load_iris
import networkx as nx
import igraph as ig
import neo4j

In [None]:
X, y = load_iris(return_X_y=True, as_frame=True)
N = len(X)  # # observaciones
K = len(set(y))  # # clases

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)


In [None]:
G = nx.complete_graph(len(X))

In [None]:
nx.set_node_attributes(G, X.to_dict(orient="index"))
nx.set_node_attributes(G, y, name="y")

In [None]:
alpha, path_method = 2, "FW"
fermat = Fermat(alpha=alpha, path_method=path_method)
dists = distance_matrix(X, X)


In [None]:
# %%timeit
fermat.fit(dists)
fdists = fermat.get_distances()
nx.set_edge_attributes(
    G, {(i, j): fdists[i, j] for i in range(N) for j in range(i)}, name="fermat_dist"
)


In [None]:
for (i, j), d in nx.get_edge_attributes(G, "fermat_dist").items():
    assert fdists[i, j] == d

In [None]:
# Reverse check
assert np.all(
    fdists[i, j] == d
    for (i, j), d
    in nx.get_edge_attributes(G, "fermat_dist").items()
)


In [None]:
# %%timeit
nx.set_edge_attributes(
    G, {(i, j): dists[i, j] ** alpha for i in range(N) for j in range(i)}, name="alpha_dist"
)
nxdists = nx.shortest_paths.floyd_warshall_numpy(G, weight="alpha_dist")

In [None]:
# Aparentemente las referencias locales (`nxdists`) dentro de %%timeit no perduran
nxdists = nx.shortest_paths.floyd_warshall_numpy(G, weight="alpha_dist")
np.allclose(fdists, nxdists)

In [None]:
diff = np.abs(nxdists - fdists)
plt.imshow(diff, cmap="gray")

In [None]:
y

In [None]:
max_idx = diff.argmax()
max_i, max_j = max_idx // N, max_idx % N
diff[max_i, max_j]
pd.concat([X.loc[[max_i, max_j]], y[[max_i, max_j]]], axis=1)

Son dos observaciones idénticas! Probablemente uno de los dos algoritmos no está permitiendo caminos mínimos de longitud 0.

In [None]:
nxdists[max_i, max_j], fdists[max_i, max_j]

In [None]:
fdists[max_i, max_i]

Curioso. La distancia de `max_i` a sí mismo es 0, pero no así a `max_j`. Asumimos que `Fermat` llegó a la distancia `fdists[max_i, max_j]` saltando al NN(1) de `max_i` que _no está en su mismo lugar_ (como `max_j`), y volviendo a `max_i`, así que conjeturamos que la distancia de `max_i` a NN(1) _según Fermat_, tiene que ser `fdists[max_i, max_j] / 2`. 

In [None]:
nn1 = fdists[max_i].argsort()[1]  # el argmin absoluto es `max_i`, me interesa el segundo
assert fdists[max_i, nn1] == fdists[max_i, max_j] / 2

Era verdad! Qué cagada, a esta altura más bien tendemos a desconfiar de `Fermat.fit()`. Habremos de verificar que las `N` observaciones de X sean únicas.

In [None]:
plt.figure(figsize=(12, 9))
nx.draw_networkx(
    G,
    width=1e-2 * np.sqrt([G.edges[e].get("alpha_dist") for e in G.edges]),
    node_color=[
        {0: "blue", 1: "green", 2: "red"}[G.nodes[n].get("y")] for n in G.nodes
    ],
)


In [None]:
from sklearn.neighbors import KernelDensity

In [None]:
from numpy import random as rnd
import seaborn as sns


In [None]:
X = rnd.rand(100, 3)
X.sort(axis=1)
X = np.column_stack([np.zeros_like(X[:, 0]), X, np.ones_like(X[:, 0])])
x = X[0]
x, X[:5,]

In [None]:
def fermat_dist(x, alpha=1):
    """ Fermat alpha-distance between `x_0` and `x_k`, in the line graph with nodes at `x = (x_1, ..., x_k)`."""
    return ((x[1:] - x[:-1]) ** alpha).sum()
    

In [None]:
np.apply_along_axis(fermat_dist, axis=1, arr=X, alpha=3).mean()


In [None]:
sample_size = 1000
scales = [1/10, 1/2, 1, 2, 10]
alphas = np.linspace(1, 4, 7)
ks = [1, 2, 5, 10]
results = []
for k in ks:
    X = rnd.rand(sample_size, k)
    X.sort(axis=1)
    X = np.column_stack([np.zeros_like(X[:, 0]), X, np.ones_like(X[:, 0])])
    for scale in scales:
        for alpha in alphas:
            results.append(
                dict(
                    k=k,
                    alpha=alpha,
                    scale=scale,
                    dists=np.apply_along_axis(
                        fermat_dist, axis=1, arr=scale * X, alpha=alpha
                    ),
                )
            )


In [None]:
df = pd.DataFrame(results)

In [None]:
df["mean_dist"] = df.dists.apply(np.mean)
df["scaled_dist"] = df.mean_dist / (df.scale ** df.alpha)

In [None]:
df.head()

In [None]:
df[df.k == 1].pivot("alpha", "scale", "mean_dist").round(3)

In [None]:
df[df.k == 1].pivot("alpha", "scale", "scaled_dist").round(3)

Está claro que si $c$ es la constance de escale `scale`, las distancias de fermat escalan según $c^{\alpha}$. Ahora, cuánto cambian con el tamaño de muestra $k$? 

Para $k=1$, se puede calcular exactamente la esperanza de la longitud del camino. Si hay un único punto entre 0 y 1 elegido al azar según $X \sim \text{Unif}(0, 1)$, entonces la longitud del camino de Fermat cuando $alpha=2$ será $E\left(X^2 + (1-X)^2\right) = 2/3$, lo cual se ve en la tabla anterior. Para $k=1$ y otros valores de $\alpha$, la expresión no será tan bella pero es computable sin mucha dificultad. Para otros valores de $k$, sin embargo, ya entran en juego la distribución de los estadísticos de orden y no me resulta para nada evidente una fórmula cerrada.

Aproximémosla. Sean $X^{(0)} = 0, X^{(k+1)} = 1$ y $X^{(i)}, i=1,\dots,k$ las v.a. que surgen de ordenar una muestra de $k$. Para todo $k$ se cumple que cada "segmentito de recta", $\mathbb{E}\left(X^{(i+1)}-X^{(i)}\right) = 1 / (k + 1)$. Luego, esperaríamos que 

$$
\begin{align} dist_{\alpha}^k(0, 1) &= \mathbb{E}\ \left(\sum_{i=0}^k \left[X^{(i+1)}-X^{(i)} \right]^{\alpha}\right) \\
 &=  \sum_{i=0}^k\ \left( \mathbb{E}\left[X^{(i+1)}-X^{(i)} \right]^{\alpha}\right) \\
 &\approx (???) (k + 1) \left(\frac{1}{k+1}\right)^{\alpha}
   \end{align}

$$

Pero no vale que $E(X^k) = E(X)^k$! Qué se hace en su lugar? Hay que conocer la densidad de $X^{(i+1)}-X^{(i)}$ y calcularlo? Empíricamente, veamos como cambia la distancia con $k$.

In [None]:
df[df.scale == 1].pivot("alpha", "k", "scaled_dist")

In [None]:
df[df.scale == 1].assign(k_scale = lambda x: (x.k) ** (1 - x.alpha)).pivot("alpha", "k", "k_scale")

In [None]:
sample_size = 1000
alphas = [1.5, 1.75, 2, 2.25, 3, 4, 5]
ks = np.array([*range(1, 11), *rnd.choice(range(11, 2001), 50, replace=False)])

results = []
for k in ks:
    X = rnd.rand(sample_size, k)
    X.sort(axis=1)
    X = np.column_stack([np.zeros_like(X[:, 0]), X, np.ones_like(X[:, 0])])
    for alpha in alphas:
        results.append(
            dict(
                k=k,
                alpha=alpha,
                dists=np.apply_along_axis(
                    fermat_dist, axis=1, arr=X, alpha=alpha
                ),
            )
        )

df = pd.DataFrame(results)
df["mean_dist"] = df.dists.apply(np.mean)

In [None]:
df[df.k < 50].pivot("alpha", "k", "mean_dist").round(3)

In [None]:
plt.figure(figsize=(16, 7))
sns.lineplot(x="k", y="mean_dist", hue="alpha", data=df, marker="o")
plt.xscale("log")
plt.yscale("log")
plt.show()

Para la elección del ancho de banda $h$ es necesario saber la escala de las distancias! Con qué se come esto???

In [None]:
results = []
for i in range(10000):
    s = rnd.rand(100)
    s.sort()
    dists = (np.array([s[0], *(s[1:] - s[:-1]), 1 - s[-1]])) ** 2
    results.append((dists.mean(), dists.std()))

In [None]:
np.mean([std / mean for mean, std in results])

## Implementación KDEClassifier

In [None]:
from scipy.stats import norm
from numpy.linalg import norm as euclidean_norm
from scipy.spatial import distance_matrix


In [None]:
euclidean_norm([1, 1])

In [None]:
import scipy

In [None]:
scipy.spatial.distance.minkowski(X[0], X[2])

In [None]:
repr(fermat)

In [None]:
A = range(5)
B = range(7)
[[a * b for a in A] for b in B]

In [None]:
preds = []
fhats = []
for x in X_test:
    fhat = {}
    for cls in classes.keys():
        klass = classes[cls]
        verts, dists = klass["verts"], klass["dists"]
        n = verts.shape[0]
        to_verts = euclidean_distances(x.reshape(1, -1), verts)[0] ** alpha
        fmt_dists = [min(to_verts + dists[:, i]) for i in range(n)]
        # print(cls, np.mean(fmt_dists))
        fhat[cls] = (1 / h**D) * np.mean([kern(d / h) for d in fmt_dists])
    fhats.append(fhat)
    preds.append(pd.Series(fhat).argmax())

In [None]:
np.where((X[:] == 2 * X[2]).all(1))[0]

In [None]:
X.shape, np.zeros(10)

In [None]:
class FermatDistance(Fermat):
    def __init__(self, **kwargs):
        super.__init__(**kwargs)

    def _fit(self, X):
        self.fit(X)
        self.X_ = X
        self.n_, self.d_ = X.shape
        self.is_fitted_ = True
        return self

    def __call__(A, B):
        if self.alpha == 1:
            return distance_matrix(A, B)
        else:
            return [[self._get_distance(a, b) for a in A] for b in B]

    def _get_distance(a, b):
        if not self.is_fitted_:
            self._fit()
        if any((self.X_[:] == a).all(1)):  # `a` is a node from X_
            to_X = np.zeros(self.n_)
        else:
            to_X = euclidean_distances(a.reshape(1, -1), verts)[0] ** self.alpha
        

        a_known = np.where((X[:] == 2 * X[2]).all(1))[0]
        if known np.where((X[:] == 2 * X[2]).all(1))
        to_nodes = euclidean_distances(a.reshape(1, -1), verts)[0] ** alpha
        

In [None]:
new = np.random.rand(3, 4)
sample = np.random.rand(5, 4)
kernel = norm.pdf
h = 2

In [None]:
kernel(distance_matrix(new, sample) / h).mean(axis=1) * (h ** -sample.shape[0])

In [None]:
def euclidean_distance(x, y):
    return euclidean_norm(x - y)


# https://scikit-learn.org/stable/developers/develop.html
class EuclideanKDE:
    def __init__(self, kernel="normal", bandwith=1):
        self.kernel = kernel
        self.bandwith = bandwith

    def fit(self, X):
        self.X_ = X
        self.n_, self.dim_ = X.shape
        return self

    def density(self, X=None, log=True):
        X = X or self.X_
        return np.log(
            kernel(distance_matrix(X, self.X_) / h).mean(axis=1)
            * (self.bandwith**-self.dim_)
        )


In [None]:
class EuclideanKDEClassifier:
    def __init__(self, kernel="normal", bandwith=1):
        self.kernel = kernel
        self.bandwith = bandwith

    def fit(self, X, y):
        # Check that X and y have correct shape  
        X, y = check_X_y(X, y)
        # Store the classes seen during fit
        self.classes_ = unique_labels(y)                                      
        self.X_ = X
        self.y_ = y
        self.densities_ = {cls: EuclideanKDE.fit(X) for cls in self.classes_}

    def predict(self, X):
        self.densities_ nnnnnnnnnnnnnn

In [None]:
ekde = EuclideanKDE()
ekde.fit(X)
ekde.density()

In [None]:
def euclidean_distance(x, y):
    return euclidean_norm(x - y)

# https://scikit-learn.org/stable/developers/develop.html
class FermatKDEstimator:
    def __init__(self, kernel="normal", bandwith=1, alpha=2, path_method="FW"):
        self.kernel = kernel
        self.bandwith = bandwith
        self.alpha = alpha
        self.path_method = path_method

    def fit(self, X):
        self.X_ = X
        self.n_, self.dim_ = X.shape
        self.euclidean_distances_ = distance_matrix(X, X)
        if self.alpha == 1:
            self.distances_ = self.euclidean_distances_
        elif:
            self.distances_ = self.euclidean_distances_ ** self.alpha
        self.fermat_ = Fermat(self.alpha, self.path_method)
        self.fermat_.fit(X)


    def             self.distances_ = self.
        if self.distance == "euclidean":
            self.distances_ = self.euclidean_distances_
        if self.distance.

    def predict(self, X):
        y = anp.empty_like(X) * distances = rv = self.bandwith**-self.d * np.mean(
            [self.kernel(self.distance(x, xi) / self.bandwith) for xi in self.X]
        )
        return rv

    def distance(self, X, Y):
        pass  # Implement your own. Should accept 



In [None]:
X, y = load_iris(return_X_y=True)
N = len(X)  # # observaciones
K = len(set(y))  # # clases

In [None]:
kde = KDEstimator()

In [None]:
kde.fit(X[y == 0])

In [None]:
kde.predict(X[-1])

In [None]:
y[0]

## TemplateClassifier

In [None]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances
class TemplateClassifier(BaseEstimator, ClassifierMixin):

    def __init__(self, demo_param='demo'):
        self.demo_param = demo_param

    def fit(self, X, y):

        # Check that X and y have correct shape
        X, y = check_X_y(X, y)
        # Store the classes seen during fit
        self.classes_ = unique_labels(y)

        self.X_ = X
        self.y_ = y
        # Return the classifier
        return self

    def predict(self, X):

        # Check if fit has been called
        check_is_fitted(self)

        # Input validation
        X = check_array(X)

        closest = np.argmin(euclidean_distances(X, self.X_), axis=1)
        return self.y_[closest]
