In [33]:
from typing import Optional, List, Any, Tuple

import numpy as np
import numpy.typing as npt
from haversine import haversine_vector, Unit
from src.interpolation._rbfinterp import _build_and_solve_system as _build_and_solve_system2
from src.interpolation._rbfinterp_pythran import (_build_system, _build_evaluation_coefficients, _polynomial_matrix)
from scipy.linalg.lapack import dgesv

float_like = npt.NDArray[np.floating[Any]]

In [96]:
def _construct_Amatrix(distances, delta_times, distance_weight, time_weight):
    return -np.sqrt((distance_weight * distances)**2 + (time_weight * delta_times)**2)

def _build_and_solve_system(y, t, distance_weight = 0.1, time_weight = 0.5625):
    """Build and solve the RBF interpolation system of equations"""
    distances = haversine_vector(y[:, [1, 0]], y[:, [1, 0]], Unit.KILOMETERS, comb=True)
    delta_times = y[:, 2:3] - y[:, 2:3].T
    a_matrix = _construct_Amatrix(distances, delta_times, distance_weight, time_weight)
    _, _, weight, _ = dgesv(a_matrix, t, overwrite_a=False, overwrite_b=False)
    return weight

def _evaluator(
    x: float_like,
    y: float_like,
    weights: float_like,
    distance_weight = 0.1,
    time_weight = 0.5625
):
    """
    Evaluate the interpolation.
    """
    distances = haversine_vector(x[[1, 0]], y[:, [1, 0]], Unit.KILOMETERS, comb=True)
    delta_times = x[2] - y[:, 2]
    a_matrix = _construct_Amatrix(distances, delta_times, distance_weight, time_weight)
    return np.dot(a_matrix, weights)

def build_and_evaluate(
    x: float_like,
    y: float_like,
    t: float_like,
    distance_weight = 0.1,
    time_weight = 0.5625
):
    weights = _build_and_solve_system(y, t, distance_weight, time_weight)
    return _evaluator(x, y, weights, distance_weight, time_weight)

In [11]:
def _chunk_evaluator(
    x: float_like,
    y: float_like,
    shift: float_like,
    scale: float_like,
    coeffs: float_like,
    memory_budget: int = 1000000
):
    """
    Evaluate the interpolation while controlling memory consumption.
    We chunk the input if we need more memory than specified.

    Parameters
    ----------
    x : (Q, N) float ndarray
        array of points on which to evaluate
    y: (P, N) float ndarray
        array of points on which we know function values
    shift: (N, ) ndarray
        Domain shift used to create the polynomial matrix.
    scale : (N,) float ndarray
        Domain scaling used to create the polynomial matrix.
    coeffs: (P+R, S) float ndarray
        Coefficients in front of basis functions
    memory_budget: int
        Total amount of memory (in units of sizeof(float)) we wish
        to devote for storing the array of coefficients for
        interpolated points. If we need more memory than that, we
        chunk the input.

    Returns
    -------
    (Q, S) float ndarray
    Interpolated array
    """
    nx, ndim = x.shape
    nnei = 1000
    # in each chunk we consume the same space we already occupy
    chunksize = memory_budget // ((1 + nnei)) + 1
    if chunksize <= nx:
        out = np.empty((nx, 1), dtype=float)
        for i in range(0, nx, chunksize):
            vec = _build_evaluation_coefficients(
                x[i:i + chunksize, :],
                y,
                "haversine",
                1.0,
                np.array([[0, 0, 0]]),
                shift,
                scale)
            out[i:i + chunksize, :] = np.dot(vec, coeffs)
    else:
        vec = _build_evaluation_coefficients(
            x,
            y,
            "haversine",
            1.0,
            np.array([[0, 0, 0]]),
            shift,
            scale)
        out = np.dot(vec, coeffs)
    return out

In [97]:
coeffs = np.load("coeffs.npy")
dnbr = np.load("dnbr.npy")
powers = np.load("powers.npy")
scale = np.load("scale.npy")
shift = np.load("shift.npy")
snbr = np.load("snbr.npy")
sub_out = np.load("sub_out.npy")
xnbr = np.load("xnbr.npy")
ynbr = np.load("ynbr.npy")

In [98]:
def test3():
    haversine_vector(ynbr[:, [1, 0]], ynbr[:, [1, 0]], Unit.KILOMETERS, comb=True)
def test1():
    out = build_and_evaluate(xnbr[0], ynbr, dnbr)
def test2():
    shift_, scale_, coeffs_ = _build_and_solve_system2(
        ynbr,
        dnbr,
        snbr,
        "haversine",
        1.0,
        powers,
    )
    out_ = _chunk_evaluator(
        xnbr,
        ynbr,
        shift_,
        scale_,
        coeffs_,
        memory_budget=10000000000
    )

In [100]:
%%timeit
test1()

309 ms ± 24.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [101]:
%%timeit
test2()

139 ms ± 19.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
