## Application procedure

In [None]:
from dislib.neighbors import NearestNeighbors

def main():
    dataset = generate_data()
    
    nn = NearestNeighbors()
    
    nn.fit(dataset)
    
    points = [point1, point2, point3]
    while not converged:
        points = nn.kneighbors(points, n_neighbors=5)
        process_points(points)
        evaluate_next_points()

## Current implementation on dislib

This is an abstraction for easy understanding. Actual code is more convoluted, but general flow and graph of tasks is correct.

In [None]:
class NearestNeighbors(BaseEstimator):
    def fit(self, x):
        self._fit_data = x
        return self
    
    def kneighbors(self, points, ...):
        partial_results = list()
        
        for row_blocks in self._fit_data.iterator(axis="rows"):
            # Following lines are enclosed in a task
            nn = sklearn.NearestNeighbors()
            nn.fit(row_blocks)
            partial_results.append(nn.kneighbors(points))
        
        # This is a task
        return _merge(partial_results)

## Correct IMHO implementation on dislib

Somebody may want to implement this. If implemented, this can become the baseline for our evaluation, otherwise we have no baseline to compare to.

In [None]:
@task
def fit_task(row_blocks)
    nn = sklearn.NearestNeighbors()
    nn.fit(row_blocks)
    return nn

class NearestNeighbors(BaseEstimator):
    def fit(self, x):
        self._preprocessed_blocks = list()
        
        for row_blocks in x.iterator(axis="rows"):
            nn = fit_task(row_blocks)
            self._preprocessed_blocks.append(nn)
            
        return self

    def kneighbors(self, points, ...):
        partial_results = list()
        
        for sk_pp_block in self._preprocessed_blocks:
            # Following lines should be enclosed into a task 
            partial_results.append(sk_pp_block.kneighbors(points))
        
        # This is also a task
        return _merge(partial_results)

## Intermission: index mapping

`sklearn.NearestNeighbors.kneighbors` returns the indexes of the points for the input dataset. In this notebook and in general, we want to get the indexes for the full dataset. We are giving `sklearn` a portion of the dataset, so indexes will be _local_ and not _global_.

The current merge implementation on COMPSs relies on tracking the _index offset_ by keeping track on the number of points processed into the query (the list that goes into the merge follows the original order).

The following examples have explicit index translation table. Previous examples also have some index translation, but it not shown. It is quite simple, but it is required nevertheless.

## Draft proposal with split

In [None]:

class NearestNeighborsWithSplit(BaseEstimator):
    def fit(self, x):
        self._preprocessed_blocks = list()
        self._index_translation_table = list()

        for partition in split(x):
            # Following lines should be enclosed into a task
            # -------------------------
            subdataset = np.stack(partition._chunks)
            nn = sklearn.NearestNeighbors()
            nn.fit(subdataset)
            
            # This is to be discussed
            itt = np.zeros(len(subdataset), dtype=int)
            n = 0
            for row_block in partition._chunks:
                self._index_translation_table[n:n + len(row_block)] = \
                    range(row_block.offset, row_block.offset + len(row_block))
                                  #********#
                                  # This relies on PersistentBlock.offset attribute
            # -------------------------
            
            self._preprocessed_blocks.append(nn)
            self._index_translation_table.append(itt)

    def kneighbors(self, points, ...):
        partial_results = list()

        for sk_pp_block in self._preprocessed_blocks:
            # Following lines should be enclosed into a task 
            partial_results.append(sk_pp_block.kneighbors(points))

        # Now perform the remapping. Note that partial_results follows the same order
        # as self._index_translation_table, so remapping is immediate
        return _merge(partial_results, self._index_translation_table)

# Appendix A

## Alternatives for index translation table

The previous example showed the use of `PersistentBlock.offset` in order to get the offset. This is the most generic approach in which matrixes may be irregular in terms of number-of-points-per-block, and the Block self-contains the offset for each block (this information can be considered redundant from the point of view of the dislib array).

If adding an offset attribute to PersistentBlock is not desirable, a similar result can be achieved by relying on regularness of the input dataset dislib array. Note that regularness of dislib array is not generally a requirement.

Adding a `split` method on the dislib array with an implementation that relies on the `split` dataClay general idea for its blocks while keeping track of the index is also a idea. This would result in the following:

In [None]:
class ArraySplit(object):
    ...

class Array(object):
    def split(self, axis=0):
        ret = list()
        # < iterate chunks >
        for ... in ...:
            # < assign chunks to some substructures >
            # < build an array for each thing >
            arr = Array()
            # < keep track of index >
            offset = [m, n]
            
            # Completely incorrect Python code, but the main idea remains
            ArraySplit.append(arr, offset)

        return ret
    
    
# In the main application
for partition in x.split("rows"):
    # partition is an ArraySplit instance
    for array, offset in partition.iter_with_offset():
        # do stuff here
        ...

# For applications that do not care on the offsets:
for partition in x.split("rows"):
    for array in partition:
        # do stuff here
        ...

# Appendix B

## Huge `points` considerations

The `points` structure passed to the `kneighbors` call may be a small list but it can potentially be a huge one. The first use case is related to things such as _edge detection_ that require multiple calls and build upon previous results. The second use case is related to clusterization or feature extraction.

A `kneighbors` implementation that is ready for this use case is conceivable, albeit I would avoid it in the first proof-of-concept implementation.

In [None]:
class NearestNeighborsWithSplit(BaseEstimator):
    def kneighbors(self, points, ...):
        if is_a_dislib_array(points) and has_more_than_one_row_block(points):
            for partition in split(points):
                # Following lines should be enclosed into a task
                # -------------------------

                # AFAICT, there is no real benefit to stack points into a single block
                # so we can safely make the iteration inside the task
                for query_index, query_chunk in zip(partition.get_indexes(), partition._chunks):
                    r = self.kneighbors(query_chunk)  # do not fear, this is NOT a recursive algorithm
                    partial_results.append(query_index, r)
                    
            sort_and_join(partial_results)
        else:
            # the simpler code in previous examples

# Update 2021/11/25

## Encapsulating the index tracking semantic in an agnostic-ish way

Up until now, we had the `split` concept, alongside a `GenericSplit` for simple stuff (as well as some prospections for `WorkStealingSplit` and similar):

In [None]:
# Current state

for partition in split(experiment, split_class=GenericSplit):
    # each partition is an instance of GenericSplit 
    # (registered class, built-in, provided by dataClay)
    partition._chunks  # <- this gives a list of chunks
    for chunk in partition:  # <- this is transparent way to access those same chunks
        pass
    partition.get_chunkindexes()  # <- this gives the position of each **chunk**

My idea is to build upon that and have the following:

In [None]:
for partition in split(experiment, split_class=SplitWithIndexTrackingCoordinator()):
    # each partition is an instance of SplitWithIndexTracking
    # and they are coordinated by the SplitWithIndexTrackingCoordinator
    # (all registered classes, built-in, provided by dataClay)
    
    # Existing features ("inherited" or whatever)
    partition.get_chunkindexes()
    
    # New features of this SplitWITC:
    partition.get_itemindexes()

The only requirement for SplitWithIndexTrackingCoordinator is that each element should have the concept of _length_ (which, in Python, it means that `len(object)` should work and typically this is achieved through the implementation of `__len__` method).

This is satisfied by the `PersistentBlock` of the dislib array, but it is quite generic and expected. So it can be used by most data structures.

**Semantic nitpicking / language-lawyer-mode-on:** `split_class` should be renamed to `split_factory`. But otherwise, it is a backwards-compatible change.

### How does this work?

The `split` procedure uses the `SplitClass.add_object` method. For `SplitWithIndexTracking` instances, that will trigger a call to `len` and update a global index counter on `SplitWithIndexTrackingCoordinator` as well as a local index mapping table for each split.

It sounds convoluted, but

 - this basic strategy resembles the one used by the dislib
 - it is compatible with all `len`-supporting structures (which are most)
 - the idea of this `SplitXXXCoordinator` can be reused for other situations
 
 
## Draft proposal with this idea

In [None]:
class NearestNeighborsWithSplit(BaseEstimator):
    def fit(self, x):
        self._preprocessed_blocks = list()
        self._index_translation_table = list()

        for partition in split(x, split_factory=SplitWITC()):
            # Following lines should be enclosed into a task
            # -------------------------
            subdataset = np.stack(partition._chunks)
            nn = sklearn.NearestNeighbors()
            nn.fit(subdataset)
            
            self._preprocessed_blocks.append(nn)  # nn is a sklearn
            self._index_translation_table.append(partition.get_itemindexes())
                                                         # *******************
                                                         # this is provided by SplitWITC()
            # type of idx_local_to_global is open to discussion
            # but the semantic is clear: 
            # it conveys a way to convert indexes from local to global

    def kneighbors(self, points, ...):
        partial_results = list()

        for sk_pp_block, itt in zip(self._preprocessed_blocks, self._index_translation_table):
            # Following lines should be enclosed into a task 
            distances, indexes = sk_pp_block.kneighbors(points)
            partial_results.append(distances, itt[indexes])

        # Now perform the merge.
        # This is now simpler than before because index are global
        # so the code complexity is reduced
        return _merge(partial_results)

### Comparison between `merge` functions

In [None]:
# Dislib version with index tracking stuff
@task(returns=2)
def _merge(*queries):
    final_dist, final_ind, offset = queries[0]

    for dist, ind, n_samples in queries[1:]:
        ind += offset
        offset += n_samples

        # keep the indices of the samples that are at minimum distance
        m_ind = _min_indices(final_dist, dist)
        comb_ind = np.hstack((final_ind, ind))

        final_ind = np.array([comb_ind[i][m_ind[i]]
                              for i in range(comb_ind.shape[0])])

        # keep the minimum distances
        final_dist = _min_distances(final_dist, dist)

    return final_dist, final_ind

In [None]:
# Split version (untested)
@task(returns=2)
def _merge(*queries):
    final_dist, final_ind = zip(*queries)
    
    # Stack everything
    final_dist = np.hstack(final_dist)
    final_ind = np.hstack(final_ind)
    
    # And now sort distances and keep the resulting first n
    result_ind = np.argsort(final_dist)[:"""number of points to keep"""]
    
    return final_dist[result_ind], final_ind[result_ind]