# Inference Algorithm

In [9]:
import numpy as np
import networkx as nx
from dask import delayed

from mercs.composition import o

# Setup

In [23]:
def check(k, n):
    """Return whether ``k`` equals ``'D'``; ``n`` is accepted but not used."""
    return k == "D"

In [28]:
# Components from which the data file name is assembled.
dataset = "iris"
kind = "train"
separator = "-"
extension = "csv"

In [32]:
# Build "<dataset><separator><kind>.<extension>" and display it.
filename = f"{separator.join((dataset, kind))}.{extension}"
filename

'iris-train.csv'

In [20]:
# np.ndarray(...) treats its argument as a *shape* and returns uninitialised
# memory; np.array builds an array that actually holds the values 1, 2, 3.
a = np.array([1, 2, 3])

In [22]:
# List every attribute and method name exposed by the array object.
dir(a)

['T',
 '__abs__',
 '__add__',
 '__and__',
 '__array__',
 '__array_finalize__',
 '__array_function__',
 '__array_interface__',
 '__array_prepare__',
 '__array_priority__',
 '__array_struct__',
 '__array_ufunc__',
 '__array_wrap__',
 '__bool__',
 '__class__',
 '__complex__',
 '__contains__',
 '__copy__',
 '__deepcopy__',
 '__delattr__',
 '__delitem__',
 '__dir__',
 '__divmod__',
 '__doc__',
 '__eq__',
 '__float__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__iand__',
 '__ifloordiv__',
 '__ilshift__',
 '__imatmul__',
 '__imod__',
 '__imul__',
 '__index__',
 '__init__',
 '__init_subclass__',
 '__int__',
 '__invert__',
 '__ior__',
 '__ipow__',
 '__irshift__',
 '__isub__',
 '__iter__',
 '__itruediv__',
 '__ixor__',
 '__le__',
 '__len__',
 '__lshift__',
 '__lt__',
 '__matmul__',
 '__mod__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__or__',
 '__pos__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__rdivmod__',
 '__

# Function

In [99]:
def inference_algorithm(
    g, data, m_list=None, i_list=None, nominal_ids=None, nb_rows=None
):
    """Attach a dask ``delayed`` computation to every node of graph ``g``.

    Nodes are visited in topological order, so the ``"dask"`` entry of every
    predecessor exists before a node that depends on it is wired up. Node keys
    are indexed as ``(kind, idx)`` pairs, where ``kind`` is ``"D"`` (data),
    ``"M"`` (model) or ``"I"`` (imputation).

    Parameters
    ----------
    g
        Directed graph supporting ``nx.topological_sort`` and ``in_degree``
        and exposing a ``desc_ids`` attribute.
    data
        Input handed to the computation of data nodes without predecessors.
    m_list
        Models, indexed by the ``idx`` of model nodes. Required as soon as the
        graph contains a model node or a data node with predecessors.
    i_list
        Imputers, indexed by the ``idx`` of imputation nodes. Required as soon
        as the graph contains an imputation node.
    nominal_ids
        Container of data-node ids that are combined by voting rather than by
        averaging when several predecessors feed them. Defaults to none.
    nb_rows
        Row count passed to imputation nodes. Defaults to ``len(data)``
        (assumes ``data`` is indexed rows-first -- TODO confirm).

    Raises
    ------
    ValueError
        If a node's kind is not one of ``"D"``, ``"M"`` or ``"I"``.
    """
    if nominal_ids is None:
        nominal_ids = ()
    if nb_rows is None:
        nb_rows = len(data)

    g_desc_ids = list(g.desc_ids)

    for n in nx.topological_sort(g):
        kind = n[0]

        if kind == "D":
            in_degree = g.in_degree(n)
            if in_degree == 0:
                # Source node: read its column straight from the input data.
                dask_input_data_node(g, n, g_desc_ids, data)
            elif in_degree == 1:
                # A single predecessor: nothing to merge.
                dask_single_data_node(g, n, m_list)
            elif n[1] in nominal_ids:
                dask_nominal_data_node(g, n, m_list)
            else:
                dask_numeric_data_node(g, n, m_list)
        elif kind == "M":
            dask_model_node(g, n, m_list)
        elif kind == "I":
            dask_imputation_node(g, n, i_list, nb_rows)
        else:
            raise ValueError("Did not recognize node kind of {}".format(n))

    return


def dask_input_data_node(g, node, g_desc_ids, data):
    """Wire a data node without predecessors to its column of ``data``.

    Parameters
    ----------
    g
        Graph whose node attribute dict receives the ``"dask"`` entry.
    node
        Node key; ``node[1]`` is looked up in ``g_desc_ids``.
    g_desc_ids
        List of ids; the position of ``node[1]`` in it is the column selected
        from ``data``.
    data
        Input handed to the delayed column selector.

    Raises
    ------
    ValueError
        If ``node[1]`` does not occur in ``g_desc_ids``.
    """
    column = g_desc_ids.index(node[1])
    g.nodes[node]["dask"] = delayed(_select_numeric(column))(data)
    return


def dask_model_node(g, node, m_list):
    """Wire a model node to the stacked outputs of its predecessors.

    Stores the delayed ``predict`` call under ``"dask"`` and, when the model
    offers ``predict_proba``, the delayed probability call under
    ``"dask_proba"``.

    Parameters
    ----------
    g
        Graph holding the node attribute dicts.
    node
        Node key; ``node[1]`` indexes ``m_list``.
    m_list
        Sequence of models exposing ``predict`` (and optionally
        ``predict_proba``).
    """
    model = m_list[node[1]]

    # Collect the predecessors' outputs and stack them as columns.
    parent_functions = _get_parents_of_model_node(g, node)
    collector = delayed(np.stack)(parent_functions, axis=1)

    g.nodes[node]["dask"] = delayed(model.predict)(collector)

    if hasattr(model, "predict_proba"):
        # The method lives on the model, not on the node key.
        g.nodes[node]["dask_proba"] = delayed(model.predict_proba)(collector)

    return


def dask_imputation_node(g, node, i_list, nb_rows):
    """Wire an imputation node to a delayed imputer call on a dummy array.

    Parameters
    ----------
    g
        Graph whose node attribute dict receives the ``"dask"`` entry.
    node
        Node key; ``node[1]`` indexes ``i_list``.
    i_list
        Sequence of imputers exposing ``transform``.
    nb_rows
        Argument handed to the composed function, i.e. the row count of the
        dummy array.
    """
    # Compose the imputer's transform with the dummy-array factory
    # (presumably o(f2, f1) means "f2 after f1" -- confirm against
    # mercs.composition).
    impute = o(i_list[node[1]].transform, _dummy_array)

    g.nodes[node]["dask"] = delayed(impute)(nb_rows)
    return


def dask_single_data_node(g, node, m_list):
    """Wire a data node that has exactly one predecessor.

    With a single predecessor nothing has to be merged: the node's value is
    the relevant column of that predecessor's output.

    Parameters
    ----------
    g
        Graph holding the node attribute dicts.
    node
        Node key of the data node.
    m_list
        Sequence of models, forwarded to the parent lookup.
    """
    # Only the first (and only expected) parent is used.
    idx, parent_function = _get_parents_of_numeric_data_node(g, m_list, node)[0]
    g.nodes[node]["dask"] = delayed(_select_numeric(idx))(parent_function)
    return


def dask_nominal_data_node(g, node, m_list):
    """Wire a nominal data node with several predecessors via voting.

    Each predecessor's contribution is selected, padded to the union of all
    class labels when it knows fewer classes, summed element-wise across
    predecessors, and the label with the highest total is taken per row.

    Parameters
    ----------
    g
        Graph holding the node attribute dicts.
    node
        Node key of the data node.
    m_list
        Sequence of models, forwarded to the parent lookup.
    """
    idx_cls_fnc = _get_parents_of_nominal_data_node(g, m_list, node)

    # Union of the class labels known to any predecessor, sorted by np.unique.
    classes = np.unique(np.hstack([c for _, c, _ in idx_cls_fnc]))

    # Reduce: bring every predecessor to the common class layout.
    parent_functions = []
    for idx, c, fnc in idx_cls_fnc:
        contribution = delayed(_select_nominal(idx))(fnc)
        if len(c) < len(classes):
            contribution = delayed(_pad_proba(c, classes))(contribution)
        parent_functions.append(contribution)

    # Keyword arguments are forwarded by the delayed call, so functools.partial
    # (which this file never imports) is not needed.
    summed = delayed(np.sum)(parent_functions, axis=0)

    # Vote: pick, per row, the class with the largest summed score.
    def vote(X):
        return classes.take(np.argmax(X, axis=1), axis=0)

    g.nodes[node]["dask"] = delayed(vote)(summed)
    return


def dask_numeric_data_node(g, node, m_list):
    """Wire a numeric data node with several predecessors via averaging.

    The relevant column of every predecessor's output is selected and the
    element-wise mean across predecessors becomes the node's value.

    Parameters
    ----------
    g
        Graph holding the node attribute dicts.
    node
        Node key of the data node.
    m_list
        Sequence of models, forwarded to the parent lookup.
    """
    idx_fnc = _get_parents_of_numeric_data_node(g, m_list, node)

    parent_functions = [delayed(_select_numeric(idx))(fnc) for idx, fnc in idx_fnc]

    # Keyword arguments are forwarded by the delayed call, so functools.partial
    # (which this file never imports) is not needed.
    g.nodes[node]["dask"] = delayed(np.mean)(parent_functions, axis=0)
    return

In [8]:
def _get_parents_of_model_node(g, node):
    """Return the ``"dask"`` entries of ``node``'s predecessors, ordered by id.

    Predecessor keys are unpacked as two-element pairs; the second element is
    the id used for ordering.
    """
    by_id = {}
    for kind, ident in g.predecessors(node):
        by_id[ident] = g.nodes[(kind, ident)]["dask"]

    return [by_id[ident] for ident in sorted(by_id)]


def _get_parents_of_numeric_data_node(g, m_list, node):
    """Pair each predecessor's ``"dask"`` entry with the column to read from it.

    Parameters
    ----------
    g
        Graph exposing ``predecessors`` and ``nodes``.
    m_list
        Sequence indexed by the second element of each predecessor key; every
        entry exposes ``targ_ids``.
    node
        Node key; ``node[1]`` is the id searched for in ``targ_ids``.

    Returns
    -------
    list of tuple
        ``(position of node[1] in the predecessor's targ_ids, dask entry)``
        for every predecessor, in predecessor order.

    Raises
    ------
    ValueError
        If ``node[1]`` is missing from a predecessor's ``targ_ids``.
    """
    target = node[1]

    return [
        (list(m_list[p_idx].targ_ids).index(target), g.nodes[(m, p_idx)]["dask"])
        for m, p_idx in g.predecessors(node)
    ]


def _get_parents_of_nominal_data_node(g, m_list, node):
    """Describe each predecessor of a nominal data node.

    Parameters
    ----------
    g
        Graph exposing ``predecessors`` and ``nodes``.
    m_list
        Sequence indexed by the second element of each predecessor key; every
        entry exposes ``targ_ids`` and ``classes_``.
    node
        Node key; ``node[1]`` is the id searched for in ``targ_ids``.

    Returns
    -------
    list of tuple
        ``(r_idx, classes, dask entry)`` per predecessor, where ``r_idx`` is
        the position of ``node[1]`` in the predecessor's ``targ_ids`` and
        ``classes`` is ``classes_[r_idx]`` of that predecessor.

    Raises
    ------
    ValueError
        If ``node[1]`` is missing from a predecessor's ``targ_ids``.
    """
    target = node[1]

    idx_cls_fnc = []
    for m, p_idx in g.predecessors(node):
        model = m_list[p_idx]
        r_idx = list(model.targ_ids).index(target)
        # NOTE(review): the consumer pads and sums these outputs like class
        # probabilities; confirm whether "dask_proba" should be read here.
        idx_cls_fnc.append((r_idx, model.classes_[r_idx], g.nodes[(m, p_idx)]["dask"]))

    return idx_cls_fnc


def _dummy_array(nb_rows):
    """Return a float array of shape ``(nb_rows, 1)`` whose entries are all NaN."""
    return np.full((nb_rows, 1), np.nan)

In [3]:
def a(x, y):
    """Return ``x + y``."""
    return x + y


a(3, 2)

5

In [51]:
%%timeit
# Benchmark: allocate an uninitialised (n_rows, 1) array, then overwrite every
# entry with NaN through slice assignment.
n_rows =10**6
a = np.empty((n_rows,1))
a[:]=np.nan
# No reshape is needed afterwards: the array is already created as (n_rows, 1).

1.1 ms ± 1.84 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [46]:
%%timeit
# Benchmark: build an (n_rows, 1) array filled with NaN in one np.full call.
n_rows =10**6
a=np.full((n_rows, 1), np.nan)

1.11 ms ± 2.01 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


array([[nan],
       [nan],
       [nan],
       ...,
       [nan],
       [nan],
       [nan]])

In [19]:
# Helpers
def _select_numeric(idx):
    """Build a selector that extracts column ``idx`` from a 2-D array.

    Parameters
    ----------
    idx
        Column index applied to inputs with more than one dimension.

    Returns
    -------
    callable
        ``select(X)`` returning ``X[:, idx]`` when ``X.ndim > 1`` and ``X``
        itself otherwise.
    """

    def select(X):
        # Inspect the argument itself, not an unrelated module-level name.
        if X.ndim > 1:
            return X[:, idx]
        return X

    return select

In [25]:
# Small mapping whose integer keys are not inserted in sorted order.
d = {
    300: "d",
    0: "a",
    1: "c",
}

In [36]:
# Sorting the (key, value) pairs orders them by key first.
sorted(d.items())

[(0, 'a'), (1, 'c'), (300, 'd')]

SyntaxError: invalid syntax (<ipython-input-31-c88aced58bb6>, line 1)

In [None]:
# Empty mapping, to be filled in later.
actions = {}

In [8]:
# One-letter codes mapped to the descriptive name of each node kind.
NODE_KINDS = {
    "I": "imputation",
    "M": "model",
}

# Test