# Advanced Slicing in Kosh

This notebook is for **advanced** users.

We will show how to re-implement `__getitem__` in Kosh's Loaders, Transformers or Operators, to allow efficient slicing.

## Concepts

In early version of Kosh (<1.2) , Transformers and Loaders would only allow to retrieve a feature in its whole. Loaders could be fed user-defined keywords to subset a feature but nothing general was in place.

Similarly transformers would get the entire feature before processing it.

Starting with 1.2, Kosh offers the possibility to slice a feature at any level, allowing for efficient data retrieval.

In this notebook will assume we have a very big dataset that cannot fit in memory at once. We will show how Kosh can still work on *chunks* of the datasets.

## The need for indexing

To demonstrate Kosh capabilities we will create a custom loader, that returns range(N) with N coming from the source name itself. When N becomes very big, a regular ingestion is no longer possible.

## Standard Loader

Let's create this loader in its most basic form. 



In [1]:
import kosh
import os
import numpy


class MyBasicLoader(kosh.KoshLoader):
    types = {"range": ["numpy", ]}

    def extract(self):
        # ridiculous over-simplification, use name to get length!
        length = int(os.path.basename(self.obj.uri))
        return numpy.arange(length)

    def list_features(self):
        return ["range", ]


Let's add this loader to Kosh and try to retrieve a few fake datasets

It's a very simple loader, that can handle returning a single item or a slice.

Note that we didn't implement things like ellipsis, etc...

In [2]:
store = kosh.connect("data_slicing.sql", delete_all_contents=True)
store.add_loader(MyBasicLoader)
ds = store.create()
# associate a fake source with it
# name is "5" so we should return numpy.arange(5)
ds.associate("5", mime_type="range")
rg = ds["range"]
print("values:", rg())

values: [0 1 2 3 4]


So far all is good. But now let's assume we need a really long range

In [3]:
ds = store.create()
# associate a fake source with it
# name is "5" so we should return numpy.arange(5)
ds.associate("500000000000000000000000", mime_type="range")
rg = ds["range"]
try:
    print("values:", rg())
except Exception as error:
    print("We failed with the following error:", error)

We failed with the following error: Maximum allowed size exceeded


Maybe slicing this would work

In [4]:
try:
    print("values:", rg[:3])
except Exception as error:
    print("We failed with the following error:", error)

We failed with the following error: Maximum allowed size exceeded


Unfortunately no, this is because our loader does not explicitly define a way to slice the data.

## Introducing `__getitem__` in the loader

Let's solve this by adding a `__getitem__` function to our loader.

It's a very simple loader, that can handle returning a single item or a slice.

Note that we didn't implement things like ellipsis, etc...

In [5]:
import kosh

class MySlicingLoader(MyBasicLoader):
    types = {"range": ["numpy", ]}
    def __getitem__(self, key):
        length = int(os.path.basename(self.obj.uri))
        if isinstance(key, int):
            if 0 <= key < length:
                return numpy.array(key)
            elif -length <= key < 0:
                return length + key
            else:
                raise ValueError("Index {} is out of range".format(key))
        elif isinstance(key, slice):
            start = key.start
            stop = key.stop
            step = key.step
            if start is None:
                start = 0
            if step is None:
                step = 1
            if stop is None:
                stop = length
            if -length < start < 0:
                start += length
            if -length < stop < 0:
                stop += length
            return numpy.arange(start, stop, step, dtype=numpy.float64)
        else:
            raise ValueError("Invalid key value: {}".format(key))


Now let's remove the old loader and add this to our store.

In [6]:
del(store.loaders["range"])
store.add_loader(MySlicingLoader)
rg = ds["range"]
print(rg[:5])
print(rg[2020:2025])

[0. 1. 2. 3. 4.]
[2020. 2021. 2022. 2023. 2024.]


Problem solved! we can now easily slice and dice our data at the loader level?

## Transformers and loaders.

But what about Transformers or Operators?

Let's create a transformer that multiplies the data by 2.

In [7]:
class Twice(kosh.transformers.KoshTransformer):
    types = {"numpy":["numpy",]}
    def transform(self, input_, format):
        return input_*2.

Let's apply this transformer on our sliced data.

In [8]:
rg = ds.get_execution_graph("range", transformers=[Twice(),])
try:
    print(rg[:5])
except Exception as err:
    print("We run into the error again!: ", err)

We run into the error again!:  Maximum allowed size exceeded


## Helping the loader from the transformer/operator: `__getitem_propagate__`

We need to implement the propagation function to let the loader's `__getitem__` function know which indices are required: `__getitem__propagate__`, in addition to the requested key, *__get_item_propagate__* also receive the index of the input to which we will propagate the corresponding key. More the index later in this notebook.


In [9]:
class TwiceWithPropagate(Twice):
    def __getitem_propagate__(self, key, input_index):
        return key
rg = ds.get_execution_graph("range", transformers=[TwiceWithPropagate(),])
rg[:5]

array([0., 2., 4., 6., 8.])

It works! 


## Gotchas!

So why didn't Kosh implement this function by default on all Transformers/Operators?

It turns out index propagation can be tricky. Let's examine the case where the transformers also flips the data. 
Let's apply this to a smaller dataset so we can more easily follow the logic.

In [10]:
class FlipPropagate(kosh.transformers.KoshTransformer):
    def __getitem_propagate__(self, key, input_index):
        return key
    def transform(self, input_, format):
        return input_[::-1] * 2.
ds = store.create()
ds.associate("20", mime_type="range")
rg = ds.get_execution_graph("range", transformers=[FlipPropagate(),])
rg[:5]

array([8., 6., 4., 2., 0.])

Compare to the full solution

In [11]:
rg[:]

array([38., 36., 34., 32., 30., 28., 26., 24., 22., 20., 18., 16., 14.,
       12., 10.,  8.,  6.,  4.,  2.,  0.])

We should have received: `array([38., 36., 34., 32., 30.])`

So what went wrong?

Well our transformer dutifully propagated to our loader that we were only interested in the first 5 elements. As a result the loader sent back `0, 1, 2, 3, 4` which our transformer appropriately flipped and multiplied by 2.

So what should we do? Well we need to implement `__getitem_propagate__` in such a way that the loaders returns the **last** 5 elements and not the first five elements.

Here again we will over simplify and implement only positive int and slices as possible keys.

In [12]:
class FlipPropagateOk(kosh.transformers.KoshTransformer):
    def __getitem_propagate__(self, key, input_index):
        if isinstance(key, int):
            return -1 - key
        elif isinstance(key, slice):
            if key.stop is None:
                start = 0
            else:
                start = -key.stop
            if key.start is None:
                stop = None
            else:
                stop = -key.start
            return slice(start, stop, key.step)
        else:
            return slice(None, None, None)

    def transform(self, input_, format):
        return input_[::-1] * 2.
rg = ds.get_execution_graph("range", transformers=[FlipPropagateOk(),])
rg[:5]

array([38., 36., 34., 32., 30.])

Hurray! It works!

The same is true for Operators.

In [13]:
class ADD(kosh.KoshOperator):

    types = {"numpy": ["numpy", ]}

    def operate(self, *inputs, **kargs):
        out = inputs[0]
        for input_ in inputs[1:]:
            out += input_
        return out

    def __getitem_propagate__(self, key, input_index):
        return key

rg1 = ds.get_execution_graph("range", transformers=[FlipPropagateOk(),])
rg2 = ds.get_execution_graph("range", transformers=[TwiceWithPropagate()])

add = ADD(rg1, rg2)

print(add[5:-6])

[38. 38. 38. 38. 38. 38. 38. 38. 38.]


Now why do we also send the index to the operator?

This can be useful for complex operators that take in a lot of inputs. Two use case come to mind:

* The indexing is different based on the position of the input
* the indexing can generate a result that kills propagation (e.g do nothing)

Let's create an operator that would act as a virtual concatenator.

The operator will take feature read by our slicing loader, for simplicity we will assume the features are all 10 long

In the `__get_item_propagate__` function we will check if the input is in the range requested. If not we will kill propagtion, otherwise we figure the indices needed for that feature.

It is important to note that `input_index` is passed as a keyword, so our function definition **MUST** declare it with this exact name.


In [14]:
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy":["numpy",]}
        
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length=len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)

    def __len__(self):
        return self.length

    def operate(self, *inputs, **args):
        out = None
        # This line purpose is to show how the propagate worked
        print("Received:" ,inputs)
        for input_ in inputs:
            if input_ is not None:
                # We got data back
                if out is None:
                    out = numpy.array(input_)
                else:
                    out = numpy.concatenate((out, numpy.array(input_)))
        return out

 

Now let's setup a dozen input "features" to this operator.

In [15]:
ds = store.create()
ds.associate("10", mime_type="range")

VC = VirtualConcatenator(*[ds["range"] for x in range(12)])

all = VC[15:63]
len(all)

Received: (array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64))


0

the `key` was passed to our loader, because it has a `__get_item__` function. Unfortunately slice(15,65) is empty for all datasets...

Let's implement the propagate:

In [16]:
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy":["numpy",]}
        
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length=len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)

    def __len__(self):
        return self.length

    def operate(self, *inputs, **args):
        out = None
        # This line purpose is to show how the propagate worked
        print("Received:" ,inputs)
        for input_ in inputs:
            if input_ is not None:
                # We got data back
                if out is None:
                    out = numpy.array(input_)
                else:
                    out = numpy.concatenate((out, numpy.array(input_)))
        return out
    
    def __getitem_propagate__(self, key, input_index):
        """only implementing slices with positive numbers"""
        start = key.start
        if start is None:
            start = 0
        stop = key.stop
        if stop is None:
            stop = self.length
        start = start - (input_index)*10
        if start >= 10:
            # we start passed this feature
            # let's tell Kosh to not propagate
            return None
        elif start < 0:
            start = 0
        stop = stop - (input_index)*10
        if stop < 0:
            # we end before this starts
            # let's tell kosh to not propagte
            return None
        elif stop > 10:
            stop = 10
        
        # Ok there is some intersection
        return slice(start, stop, key.step)

VC = VirtualConcatenator(*[ds["range"] for x in range(12)])

all = VC[15:63]
len(all)

Received: (None, array([5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2.]), None, None, None, None, None)


48

Lastly it is worth noting, that you can control the value sent to `operate` when you abort the propagation. The default as seen above is `None` but you can set `self.index_results[index_result]` to whatever value you want. For example here let's use an empy array

In [17]:
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy":["numpy",]}
        
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length=len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)

    def __len__(self):
        return self.length

    def operate(self, *inputs, **args):
        out = numpy.array(inputs[0])
        # This line purpose is to show how the propagate worked
        print("Received:" ,inputs)
        for input_ in inputs[1:]:
            out = numpy.concatenate((out, numpy.array(input_)))
        return out
    
    def __getitem_propagate__(self, key, input_index):
        """only implementing slices with positive numbers"""
        start = key.start
        if start is None:
            start = 0
        stop = key.stop
        if stop is None:
            stop = self.length
        start = start - (input_index)*10
        if start >= 10:
            # we start passed this feature
            # let's tell Kosh to not propagate
            # And return an empty array
            self.index_results[input_index] = numpy.array([])
            return None
        elif start < 0:
            start = 0
        stop = stop - (input_index)*10
        if stop < 0:
            # we end before this starts
            # let's tell kosh to not propagte
            # And return an empty array
            self.index_results[input_index] = numpy.array([])
            return None
        elif stop > 10:
            stop = 10
        
        # Ok there is some intersection
        return slice(start, stop, key.step)

VC = VirtualConcatenator(*[ds["range"] for x in range(12)])

all = VC[15:63]
len(all)

Received: (array([], dtype=float64), array([5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2.]), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64))


48