# Manual: How to create user minibatch sources

In order to make use of CNTK’s (distributed) training functionality, one has to provide the input data as an instance of [MinibatchSource](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.MinibatchSource). In CNTK, there are a variety of means to provide minibatch sources:

- (**best**) convert the data to a format supported by the built-in data readers, which offer rich randomization and packing functionality with high performance (see [Manual: How to feed data](https://github.com/Microsoft/CNTK/blob/master/Manual/Manual_How_to_feed_data.ipynb) and [cntk.io](https://cntk.ai/pythondocs/cntk.io.html))
- (**preferred**) if the data is hard to convert and fits in memory, use [MinibatchSourceFromData](https://cntk.ai/pythondocs/cntk.io.html?highlight=minibatchsourcefromdata#cntk.io.MinibatchSourceFromData)
- if the data does not fit in memory and you want fine-grained control over how minibatches are created, implement the abstract [UserMinibatchSource](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.UserMinibatchSource) interface.

This manual explains how to create a user minibatch source in Python. A minibatch source supports the CNTK training loops by providing:
1. meta-information regarding the data, such as *storage format*, *data type*, *shape of elements*,
2. minibatch data, and
3. auxiliary information for advanced features of the training loops, such as a checkpoint state that records the current access position in the data source, so that interrupted training can be resumed from a checkpoint.

Correspondingly, a minibatch source implementation needs to provide the following through the minibatch source API:

1. meta-information for the learning loops regarding the data, e.g. *storage format*, *data type*, *shape of elements* (see [StreamInformation](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.StreamInformation) for details),
2. the next minibatch of data, given the following parameters: 1) *the number of samples*, 2) *the number of learning workers* that are working on the training in parallel, and 3) *the worker rank*, which identifies the specific worker that is requesting this minibatch from the outer learning loops,
3. the get_checkpoint_state() and restore_from_checkpoint(state) methods, so that the CNTK checkpoint mechanism can save the state of the minibatch source and resume the learning process from a saved checkpoint without repeating training that has already been done (see [trainer checkpoint](https://cntk.ai/pythondocs/cntk.train.trainer.html?highlight=checkpoint#cntk.train.trainer.Trainer.restore_from_checkpoint) and [training session checkpoint config](https://cntk.ai/pythondocs/cntk.train.training_session.html?highlight=checkpoint#cntk.train.training_session.CheckpointConfig) for more information).

All built-in and user minibatch sources implement the functionality above.

## User minibatch source
[UserMinibatchSource](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.UserMinibatchSource) is an abstract interface that enables end users to implement their own minibatch data feeding mechanism. To implement a UserMinibatchSource, the user needs to implement the following four methods, which provide the minibatch source functionality described above:

1. **stream_infos()**: return the list of [StreamInformation](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.StreamInformation) instances that describe the streams provided by this source
2. **next_minibatch(num_samples, number_of_workers, worker_rank, device=None)**: return the next minibatch of data, given the following parameters provided by the outer learning loops
    * num_samples: the number of samples that are being requested
    * number_of_workers: the number of workers in a distributed training session; if this is not a distributed training session, this number is always 1
    * worker_rank: the number that identifies the specific worker requesting this minibatch in the distributed training setting; if this is not a distributed training session, the worker rank is always 0 (the first worker)
    * device: a device descriptor specifying which device the minibatch data should be copied to, e.g. cntk.device.cpu() or cntk.device.gpu(device_id) (see [DeviceDescriptor](https://cntk.ai/pythondocs/cntk.device.html#cntk.device.DeviceDescriptor) for details)
3. **get_checkpoint_state()**: return a dictionary describing the current state of the minibatch source
4. **restore_from_checkpoint(state)**: restore the minibatch source from a state dictionary previously returned by get_checkpoint_state()
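
Neither of the example sources in this manual implements the two checkpoint methods. As a rough sketch of the idea, assuming the only state worth saving is the read position, the pair could look as follows. This is plain Python with no CNTK dependency: the class name `CursorState`, the `advance` helper and the `next_seq_idx` key are illustrative assumptions, and in a real source the two methods would be defined on the UserMinibatchSource subclass.

```python
class CursorState:
    """Hypothetical stand-in for a minibatch source; tracks only a read position."""

    def __init__(self, num_sequences):
        self.num_sequences = num_sequences
        self.next_seq_idx = 0

    def advance(self, count=1):
        # Move the read position forward, wrapping around at the end of a sweep.
        self.next_seq_idx = (self.next_seq_idx + count) % self.num_sequences

    def get_checkpoint_state(self):
        # Everything needed to resume reading goes into a plain dictionary.
        return {'next_seq_idx': self.next_seq_idx}

    def restore_from_checkpoint(self, state):
        # Put the read position back where it was when the checkpoint was taken.
        self.next_seq_idx = state['next_seq_idx']


source = CursorState(num_sequences=5)
source.advance(3)
saved = source.get_checkpoint_state()

# A fresh instance picks up reading where the first one stopped.
resumed = CursorState(num_sequences=5)
resumed.restore_from_checkpoint(saved)
```

Keeping the state to a small dictionary of plain values makes it straightforward to store alongside the rest of a checkpoint.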

The following examples detail the steps needed to implement the [UserMinibatchSource](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.UserMinibatchSource) interface.

## Single node user minibatch source

<a id='single_node_mb_source'></a>
In the following MyDataSource example, the major steps are:

* In the MyDataSource constructor,
    * prepare the state of the data loading: e.g. mark the initial location of the data
    * define the data stream information: e.g. StreamInformation("features", ...) and StreamInformation("labels", ...)
* Override the stream_infos() method to
    * return the list of stream information: e.g. [self.fsi, self.lsi]
* Override the next_minibatch() method to
    * retrieve the data for the next minibatch based on the current access state of the data, and update that state for the following retrieval: e.g. _prepare_nextbatch()
    * wrap the retrieved data into a format that is convenient for CNTK internal usage: e.g. call cntk.Value constructors
    * create a dictionary mapping stream information to the minibatch data: e.g. {self.fsi: MinibatchData(f_data, ...), self.lsi: MinibatchData(l_data, ...)}

Note that, for simplicity, this example loads the whole data set into memory. In practice, the minibatch source should rely on its data source state (e.g. the mapping between the next requested batch and its logical/physical location in the data storage) to load (or pre-load) the data when (or right before) it is requested.
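
As a rough illustration of loading data on demand rather than up front, the source only needs to remember the offset of the next unread sequence. The sketch below is plain Python: `read_next_sequence` is a hypothetical helper, the two-class toy lines are made up, and an `io.StringIO` object stands in for a real file.

```python
import io


def read_next_sequence(file_io, start_pos):
    """Read the lines of one sequence, starting at offset start_pos.

    Returns (lines, next_pos), where next_pos is the offset of the first line
    of the following sequence, or the end of the file.
    """
    file_io.seek(start_pos)
    lines = []
    seq_id = None
    while True:
        pos = file_io.tell()
        line = file_io.readline()
        if line == "":
            return lines, pos  # reached the end of the file
        current_id = line.split('|', 1)[0].strip()
        if seq_id is None:
            seq_id = current_id
        elif current_id != seq_id:
            return lines, pos  # the next sequence starts at this offset
        lines.append(line.rstrip('\n'))


data = io.StringIO("0 |x 560:1 |y 1 0\n0 |x 0:1\n1 |x 424:1 |y 0 1\n")
first, pos = read_next_sequence(data, 0)
second, pos = read_next_sequence(data, pos)
```

Only one sequence is held in memory at a time, and the returned offset is all that has to be kept between calls.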

In [1]:
import numpy as np
import cntk as C
from cntk.io import UserMinibatchSource, StreamInformation, MinibatchData

In [3]:


# Our toy test data contains two sequences:
# 1) the first column is the sequence ID: e.g. 0 is the ID for sequence 0; and 1 is the ID for sequence 1,
# 2) the second column is feature 'x' which is a sparse representation for the words in our training data, and
# 3) the third column is our label 'y' which is the one-hot representation of the label.
MBDATA = r'''0      |x 560:1        |y 1 0 0 0 0
0   |x 0:1
0   |x 0:1
1   |x 560:1        |y 0 1 0 0 0
1   |x 0:1
1   |x 0:1
1   |x 424:1
'''

class MyDataSource(UserMinibatchSource):
    def __init__(self, f_dim, l_dim):
        self.f_dim, self.l_dim = f_dim, l_dim

        self._prepare_data()

        # Describe the two streams and initialize the reading state
        self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
        self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
        self.sequences = sorted(self.data)
        self.next_seq_idx = 0

        super(MyDataSource, self).__init__()

    def _prepare_data(self):
        """
        Parse the text and load the data into self.data. 
        self.data is of the following structure:
           sequence id -> "features" -> list of features
        and
          sequence id -> "labels" -> label
        """
        # MBDATA fits into memory, so we read it in all at once. Normally the data
        # would not fit, in which case we would need to keep track of the position
        # in the file up to which we have already provided data.
        # The data follows the CNTKTextFormat specification
        #   sequence ID |feature1 data |feature2 data
        # where in this case feature1's data is encoded as sparse one-hot, which we
        # will convert to CSR, and feature2's data is a one-hot vector encoded as dense.


        self.data = {}
        for line in MBDATA.split('\n'):
            line = line.strip()
            if not line:
                continue
            seq_id, data = line.split('|', 1)
            data = data.split("|")
            seq_id = int(seq_id.strip())

            if seq_id not in self.data:
                self.data[seq_id] = {'features': []}

            # Processing features - expecting one per line.
            features = data[0].split(" ")
            vocab_idx = int(features[1].split(":")[0])
            self.data[seq_id]['features'].append(vocab_idx)

            # Process label, if exists
            if len(data) == 2:
                labels = np.asarray([data[1].split(" ")[1:]], dtype=np.float32)
                self.data[seq_id]['labels'] = labels
    
    def _prepare_nextbatch(self, num_samples):
        # Note that in this example we do not yet make use of number_of_workers or
        # worker_rank, which limits the minibatch source to single GPU / single node
        # scenarios.
        features = []
        labels = []

        sweep_end = False

        f_sample_count = l_sample_count = 0

        while max(f_sample_count, l_sample_count) < num_samples:
            if self.next_seq_idx == len(self.sequences):
                sweep_end = True
                self.next_seq_idx = 0

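            # Look up the ID of the next sequence; indexing self.sequences twice would
            # only work when the IDs happen to be 0, 1, 2, ...
            seq_id = self.sequences[self.next_seq_idx]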

            f_data = self.data[seq_id]['features']
            l_data = self.data[seq_id]['labels']
            if (features or labels) and max(f_sample_count+len(f_data), l_sample_count+len(l_data)) > num_samples:
                break
            f_sample_count += len(f_data)
            features.append(f_data)

            l_sample_count += len(l_data)
            labels.append(l_data)

            self.next_seq_idx += 1
        return features, f_sample_count, labels, l_sample_count, sweep_end
        
    def stream_infos(self):
        """
        Override the stream_infos method of the base class --- UserMinibatchSource --- to provide stream meta information.
        """
        return [self.fsi, self.lsi]

    def next_minibatch(self, num_samples, number_of_workers=1, worker_rank=0, device=None):
        """
        Override the next_minibatch method of the base class --- UserMinibatchSource --- to provide minibatch data.
        """
        # Note that in this example we do not yet make use of number_of_workers or
        # worker_rank, which will limit the minibatch source to single GPU / single node
        # scenarios.
        features, feature_sample_count, labels, label_sample_count, sweep_end = self._prepare_nextbatch(num_samples)
        num_seq = len(features)

        f_data = C.Value.one_hot(batch=features, num_classes=self.f_dim)
        l_data = C.Value(batch=np.asarray(labels, dtype=np.float32))

        result = {
                self.fsi: MinibatchData(f_data, num_seq, feature_sample_count, sweep_end),
                self.lsi: MinibatchData(l_data, num_seq, label_sample_count, sweep_end)
                }


        return result
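
The packing rule used by `_prepare_nextbatch` can be tried in isolation. The sketch below is plain Python and simplifies the method above by counting a single stream instead of features and labels separately; `pack_sequences` is an illustrative helper, not part of CNTK. It takes whole sequences until the sample budget is reached, and always accepts the first sequence so that a minibatch is never empty.

```python
def pack_sequences(sequences, start_idx, num_samples):
    """Pick whole sequences, starting at start_idx, up to num_samples samples.

    Returns (batch, sample_count, next_idx, sweep_end).
    """
    batch = []
    sample_count = 0
    idx = start_idx
    sweep_end = False
    while sample_count < num_samples:
        if idx == len(sequences):
            # Every sequence was visited once: flag the sweep end and wrap around.
            sweep_end = True
            idx = 0
        seq = sequences[idx]
        if batch and sample_count + len(seq) > num_samples:
            break  # would exceed the budget; keep it for the next minibatch
        batch.append(seq)
        sample_count += len(seq)
        idx += 1
    return batch, sample_count, idx, sweep_end


toy = [[560, 0, 0], [560, 0, 0, 424]]  # word indices of the two toy sequences above
first = pack_sequences(toy, 0, 4)
second = pack_sequences(toy, first[2], 4)
```

With a budget of 4 samples, each call returns exactly one of the two toy sequences, because adding the other one would exceed the budget.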

### Using the user minibatch data source in training sessions
After a user minibatch source is defined, it can be used wherever a MinibatchSource instance is accepted. For example,

In [38]:
input_dim = 1000
num_output_classes = 5

# instantiating the user minibatch source
mbs = MyDataSource(input_dim, num_output_classes)
feature = C.sequence.input_variable(shape=(input_dim,))
label = C.input_variable(shape=(num_output_classes,))

# setting up the model
rnn = C.layers.Recurrence(C.layers.LSTM(20), go_backwards=False)(feature)
end = C.sequence.last(rnn)
z = C.layers.Dense(num_output_classes)(end)
ce = C.cross_entropy_with_softmax(z, label)
errs = C.classification_error(z, label)
learner = C.sgd(z.parameters, C.learning_rate_schedule(0.5, unit = C.UnitType.sample) )

# and train
trainer = C.Trainer(z, (ce, errs), [learner], [C.logging.ProgressPrinter(tag='Training', num_epochs=10)])
input_map = {
    feature: mbs.fsi,
    label: mbs.lsi
}

session = C.training_session(
    trainer = trainer, 
    mb_source = mbs,
    model_inputs_to_streams = input_map,
    mb_size = 4, 
    max_samples = 80,
    progress_frequency = 20
)
session.train()

Learning rate per sample: 0.5
Finished Epoch[1 of 10]: [Training] loss = 1.077986 * 20, metric = 95.00% * 20 0.107s (186.9 samples/s);
Finished Epoch[2 of 10]: [Training] loss = 0.424429 * 20, metric = 0.00% * 20 0.076s (263.2 samples/s);
Finished Epoch[3 of 10]: [Training] loss = 0.082140 * 20, metric = 0.00% * 20 0.072s (277.8 samples/s);
Finished Epoch[4 of 10]: [Training] loss = 0.030935 * 20, metric = 0.00% * 20 0.071s (281.7 samples/s);


## Multiple node user minibatch source

To define a user minibatch source that can be used with distributed learners, e.g. BlockMomentum, we need to use number_of_workers to cut the data into slices and then return the slice that corresponds to the worker_rank requesting the next minibatch.

In the example below, we create a multi-worker data source. The differences between MyMultiWorkerDataSource below and the single node data source example [MyDataSource](#single_node_mb_source) are minimal:
* Change _prepare_nextbatch(self, num_samples) to _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank) to take into account the number of workers and the worker rank when preparing minibatch data: i.e.
    * only the data that is supposed to be handled by the requesting worker (identified by the worker rank) is returned
* In next_minibatch(), pass the number of workers and the worker rank to _prepare_nextbatch, and then wrap the minibatch data in a CNTK internal representation in the same way as for single node minibatch data

**Note that it is the user's responsibility to guarantee that 1) the data can be accessed from multiple nodes, and 2) the data can be accessed from multiple threads.**
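
One simple way to slice the data, sketched here in plain Python under the assumption that sequence IDs are integers, is round-robin assignment: a worker keeps the sequences whose ID modulo `number_of_workers` equals its `worker_rank`. `belongs_to_worker` is an illustrative helper, not a CNTK function.

```python
def belongs_to_worker(seq_id, number_of_workers, worker_rank):
    # Round-robin assignment of sequences to workers by sequence ID.
    return seq_id % number_of_workers == worker_rank


seq_ids = [0, 1, 2, 3]
number_of_workers = 2
shards = {
    rank: [s for s in seq_ids if belongs_to_worker(s, number_of_workers, rank)]
    for rank in range(number_of_workers)
}
```

Every sequence lands in exactly one shard, so no sample is trained on twice within a sweep. If sequence lengths differ a lot, the shards can still be unbalanced in the number of samples.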

In [73]:
# Our toy test data contains two sequences:
# 1) the first column is the sequence ID: e.g. 0 is the ID for sequence 0; and 1 is the ID for sequence 1,
# 2) the second column is feature 'x' which is a sparse representation for the words in our training data, and
# 3) the third column is our label 'y' which is the one-hot representation of the label.
sample_data = r'''0      |x 560:1        |y 1 0 0 0 0
0   |x 0:1
0   |x 0:1
1   |x 560:1        |y 0 1 0 0 0
1   |x 0:1
1   |x 0:1
1   |x 424:1
2   |x 160:1        |y 0 0 1 0 0
2   |x 5:1
2   |x 6:1
3   |x 460:1        |y 0 0 0 1 0
3   |x 3:1
3   |x 3:1
3   |x 425:1
'''

class MyMultiWorkerDataSource(UserMinibatchSource):
    def __init__(self, file_io, f_dim, l_dim):
        self.f_dim, self.l_dim = f_dim, l_dim
        self.file_io = file_io
        

        # Describe the two streams and remember the current reading position
        self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
        self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
        self.io_pos = self.file_io.tell()
        super(MyMultiWorkerDataSource, self).__init__()

    def _parse_line(self, line):
        res = {}
        seq_id, data = line.split('|', 1)
        seq_id = int(seq_id.strip())
        data = data.split("|")        
        # Processing features - expecting one per line.
        features = data[0].split(" ")
        vocab_idx = int(features[1].split(":")[0])
        features = vocab_idx
        # Process label, if exists
        if len(data) == 2:
            labels = np.asarray([data[1].split(" ")[1:]], dtype=np.float32)
        else:
            labels = None
        return seq_id, features, labels
        
    def _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank):
        features = []
        labels = []

        sweep_end = False

        f_sample_count = l_sample_count = 0

        seq_id = -1
        feature_data = []
        label_data = None

        #seek to the previous reading location:
        self.file_io.seek(self.io_pos)
        while max(f_sample_count, l_sample_count) < num_samples:
            io_pos = self.file_io.tell()
            line = self.file_io.readline()
            if "" == line:
                sweep_end = True
                # rewind to the beginning
                self.file_io.seek(0)
                seq_id = -1
                continue
                
            next_seq_id, next_feature, next_label = self._parse_line(line)
            if seq_id != next_seq_id:
                if seq_id != -1:
                    # A sequence is ready. Based on the worker rank, decide whether to add
                    # it to the batch: sequences that belong to another worker are skipped.
                    # In practice, this should be based on a more efficient mechanism,
                    # e.g. on the location of the worker and the location of the data.
                    print('io pos: ', io_pos)
                    print('seq_id:', seq_id)
                    print('workder_rank: ', worker_rank)
                    print('feature_data: ', feature_data)
                    if (seq_id % number_of_workers) == worker_rank:
                        # Always accept the first sequence of a batch; afterwards stop as
                        # soon as the next sequence would exceed the requested size.
                        if (features or labels) and max(f_sample_count+len(feature_data), l_sample_count+len(label_data)) > num_samples:
                            print('===two much sample; reserve for next batch== ')
                            print('save io pos: ', self.io_pos)
                            break
                        f_sample_count += len(feature_data)
                        features.append(feature_data)
                        l_sample_count += len(label_data)
                        labels.append(label_data)
                # Start a new sequence
                seq_id = next_seq_id
                feature_data = [next_feature]
                label_data = next_label
                # Record where this sequence starts so that reading can restart from here
                self.io_pos = io_pos

            else:    
                #continue the previous sequence
                feature_data.append(next_feature)
                if next_label is not None:
                    label_data = next_label
                

        return features, f_sample_count, labels, l_sample_count, sweep_end
        
    def stream_infos(self):
        """
        Override the stream_infos method of the base class --- UserMinibatchSource --- to provide stream meta information.
        """
        return [self.fsi, self.lsi]

    def next_minibatch(self, num_samples, number_of_workers, worker_rank, device=None):
        """
        Override the next_minibatch method of the base class --- UserMinibatchSource --- to provide minibatch data.
        """
        features, feature_sample_count, labels, label_sample_count, sweep_end = self._prepare_nextbatch(num_samples,
                                                                                                        number_of_workers, 
                                                                                                        worker_rank)
        num_seq = len(features)

        print('features to training: ', features)
        f_data = C.Value.one_hot(batch=features, num_classes=self.f_dim)
        l_data = C.Value(batch=np.asarray(labels, dtype=np.float32))

        result = {
                self.fsi: MinibatchData(f_data, num_seq, feature_sample_count, sweep_end),
                self.lsi: MinibatchData(l_data, num_seq, label_sample_count, sweep_end)
                }


        return result

In [74]:
import io

input_dim = 1000
num_output_classes = 5

# instantiating the user minibatch source
file_io = io.StringIO(sample_data)  # mock the string data as file io to keep the example self-contained
mbs = MyMultiWorkerDataSource(file_io, input_dim, num_output_classes)
feature = C.sequence.input_variable(shape=(input_dim,))
label = C.input_variable(shape=(num_output_classes,))

# setting up the model
rnn = C.layers.Recurrence(C.layers.LSTM(20), go_backwards=False)(feature)
end = C.sequence.last(rnn)
z = C.layers.Dense(num_output_classes)(end)
loss = C.cross_entropy_with_softmax(z, label)
errs = C.classification_error(z, label)
local_learner = C.sgd(z.parameters, C.learning_rate_schedule(0.5, unit = C.UnitType.sample) )
dist_learner = C.distributed.data_parallel_distributed_learner(local_learner)
# and train
trainer = C.Trainer(z, (loss, errs), [dist_learner], [C.logging.ProgressPrinter(tag='Training', num_epochs=10)])
input_map = {
    feature: mbs.fsi,
    label: mbs.lsi
}
session = C.training_session(
    trainer = trainer, 
    mb_source = mbs,
    model_inputs_to_streams = input_map,
    mb_size = 7, 
    max_samples = 80,
    progress_frequency = 20
)
session.train()


io pos:  58
seq_id: 0
workder_rank:  0
feature_data:  [560, 0, 0]
io pos:  126
seq_id: 1
workder_rank:  0
feature_data:  [560, 0, 0, 424]
features to training:  [[560, 0, 0], [560, 0, 0, 424]]
io pos:  181
seq_id: 2
workder_rank:  0
feature_data:  [160, 5, 6]
io pos:  58
seq_id: 0
workder_rank:  0
feature_data:  [560, 0, 0]
io pos:  126
seq_id: 1
workder_rank:  0
feature_data:  [560, 0, 0, 424]
===two much sample; reserve for next batch== 
save io pos:  58
features to training:  [[160, 5, 6], [560, 0, 0]]
io pos:  126
seq_id: 1
workder_rank:  0
feature_data:  [560, 0, 0, 424]
io pos:  181
seq_id: 2
workder_rank:  0
feature_data:  [160, 5, 6]
features to training:  [[560, 0, 0, 424], [160, 5, 6]]
io pos:  58
seq_id: 0
workder_rank:  0
feature_data:  [560, 0, 0]
io pos:  126
seq_id: 1
workder_rank:  0
feature_data:  [560, 0, 0, 424]
features to training:  [[560, 0, 0], [560, 0, 0, 424]]
io pos:  181
seq_id: 2
workder_rank:  0
feature_data:  [160, 5, 6]
io pos:  58
seq_id: 0
workder_rank:

IndexError: list index out of range

RuntimeError: SWIG director method error.

### Using the user minibatch data source in training sessions with distributed learners
After a user minibatch source that takes the number of workers and the worker rank into account is defined, it can be used wherever a MinibatchSource instance is accepted. For example,

In [None]:
#finalize the distributed learning
C.distributed.Communicator.finalize()