# Getting started with TensorFlow's `Dataset` API (continuation)

In this notebook we will learn how to split a dataset across the ranks (workers) of a distributed training job.

The following steps were done in one of the previous notebooks. If necessary, they can be run again in a new cell.
```bash
wget https://raw.githubusercontent.com/uiuc-cse/data-fa14/gh-pages/data/iris.csv
echo "sepal_length,sepal_width,petal_length,petal_width,species" > iris_setosa.csv
grep setosa iris.csv >> iris_setosa.csv
echo "sepal_length,sepal_width,petal_length,petal_width,species" > iris_versic.csv
grep versicolor iris.csv >> iris_versic.csv
echo "sepal_length,sepal_width,petal_length,petal_width,species" > iris_virgin.csv
grep virginica iris.csv >> iris_virgin.csv
```
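For readers without a shell at hand, the same split can be sketched in Python. This is a minimal sketch that is not part of the original notebook. The function name `split_by_species` and its parameters are hypothetical. Like `grep`, it keeps every line that contains the species name as a substring.

```python
from pathlib import Path

HEADER = "sepal_length,sepal_width,petal_length,petal_width,species\n"

# Output file -> substring to match, mirroring the `grep` calls above.
SPLITS = {
    "iris_setosa.csv": "setosa",
    "iris_versic.csv": "versicolor",
    "iris_virgin.csv": "virginica",
}


def split_by_species(src="iris.csv", splits=SPLITS, out_dir="."):
    """Write one CSV per species, each starting with the same header line."""
    lines = Path(src).read_text().splitlines(keepends=True)
    for out_name, pattern in splits.items():
        # Like `grep pattern`, keep every line containing the substring.
        matched = [line for line in lines if pattern in line]
        Path(out_dir, out_name).write_text(HEADER + "".join(matched))
```

After downloading `iris.csv` with the `wget` command above, calling `split_by_species()` in the same directory should produce the three files.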

In [1]:
import ipcmagic

In [2]:
%ipcluster start -n 2 --mpi

IPCluster is ready! (5 seconds)


In [3]:
%%px
import tensorflow as tf
import horovod.tensorflow.keras as hvd

In [4]:
%%px
tf.version.VERSION

Out[0:2]: '2.3.0'

Out[1:2]: '2.3.0'

In [5]:
%%px
def parse_columns(*row, classes):
    """Convert the string classes to one-hot encoded:
    setosa     -> [1, 0, 0]
    virginica  -> [0, 1, 0]
    versicolor -> [0, 0, 1]
    """
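    features = tf.convert_to_tensor(row[:4])  # the four float columns, shape (4,)
    # `tf.where` returns the matching index with shape (1, 1),
    # so the one-hot label has shape (1, 1, 3).
    label_int = tf.where(tf.equal(classes, row[4]))
    label = tf.one_hot(label_int, 3)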
    return features, label


def get_csv_dataset(filename):
    return tf.data.experimental.CsvDataset(filename, header=True,
                                           record_defaults=[tf.float32,
                                                            tf.float32,
                                                            tf.float32,
                                                            tf.float32,
                                                            tf.string])
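The extra brackets in the labels printed further down (`[[[1. 0. 0.]]]`) come from the shapes produced inside `parse_columns`. The pure-Python model below mimics `tf.where` on a 1-D boolean tensor and `tf.one_hot` with nested lists to show how that nesting arises. The helpers `where` and `one_hot` are hypothetical stand-ins, not TensorFlow calls.

```python
def where(mask):
    # Like tf.where on a 1-D boolean tensor: one [index] row per True entry,
    # so the result has shape (num_true, 1).
    return [[i] for i, m in enumerate(mask) if m]


def one_hot(indices, depth):
    # Like tf.one_hot: replace each integer with a length-`depth` vector,
    # keeping the nesting of `indices` and adding one trailing dimension.
    if isinstance(indices, list):
        return [one_hot(i, depth) for i in indices]
    return [1.0 if k == indices else 0.0 for k in range(depth)]


classes = ["setosa", "virginica", "versicolor"]
label_int = where([c == "setosa" for c in classes])
label = one_hot(label_int, 3)
print(label_int)  # [[0]]
print(label)      # [[[1.0, 0.0, 0.0]]]
```

If a flat label of shape (3,) is preferred, applying `tf.squeeze` to `label` is one option; the notebook itself keeps the nested shape.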

## Using Shards <a id='using_shards'></a>


Let's consider distributed training with two ranks to see which data each worker receives. In distributed training one can use [`tf.data.Dataset.shard`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shard) to divide the dataset over the ranks. Otherwise each worker reads the whole dataset, so all ranks may end up processing the same samples.
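The selection rule of `shard` is simple: shard `index` out of `num_shards` keeps the elements at positions `index`, `index + num_shards`, `index + 2 * num_shards`, and so on. Below is a plain-Python sketch of that rule. The `shard` function here is a hypothetical model, not the TensorFlow method, and `num_ranks` stands in for `hvd.size()`.

```python
def shard(elements, num_shards, index):
    # Same selection rule as tf.data.Dataset.shard: keep element i
    # exactly when i % num_shards == index.
    if not 0 <= index < num_shards:
        raise ValueError("index must be in [0, num_shards)")
    return [e for i, e in enumerate(elements) if i % num_shards == index]


data = list(range(10))
num_ranks = 2  # plays the role of hvd.size()
per_rank = {rank: shard(data, num_ranks, rank) for rank in range(num_ranks)}
print(per_rank)  # {0: [0, 2, 4, 6, 8], 1: [1, 3, 5, 7, 9]}

# Every element lands on exactly one rank; without sharding, each rank
# would iterate over all of `data`.
assert sorted(per_rank[0] + per_rank[1]) == data
```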

Let's consider:
 * `tf.data.Dataset.list_files` with `shuffle=True`.
 * `tf.data.Dataset.list_files` with `shuffle=False`.
 * Shard before interleaving.
 * Shard after interleaving.
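The file order matters when sharding before interleaving. Each rank builds its own file list, and `shard` picks files by position. If every rank shuffles that list independently, two ranks can pick the same file while another file is never read. The sketch below models this in plain Python. The two `order_rank*` lists are one hypothetical outcome of independent shuffles, and `random.Random(seed)` stands in for a shuffle whose seed is shared by all ranks.

```python
import random

FILES = ["iris_setosa.csv", "iris_versic.csv"]


def shard(elements, num_shards, index):
    # Keep element i when i % num_shards == index (the rule of Dataset.shard).
    return [e for i, e in enumerate(elements) if i % num_shards == index]


# Independent shuffles: one possible pair of orders on rank 0 and rank 1.
order_rank0 = ["iris_setosa.csv", "iris_versic.csv"]
order_rank1 = ["iris_versic.csv", "iris_setosa.csv"]
print(shard(order_rank0, 2, 0))  # ['iris_setosa.csv']
print(shard(order_rank1, 2, 1))  # ['iris_setosa.csv'] (duplicate; versic is never read)


def shared_order(files, seed):
    # The same seed gives the same permutation on every rank.
    order = list(files)
    random.Random(seed).shuffle(order)
    return order


orders = [shared_order(FILES, seed=42) for _rank in range(2)]
picked = [shard(orders[rank], 2, rank) for rank in range(2)]
# With a common order, the ranks split the files without overlap.
assert sorted(picked[0] + picked[1]) == sorted(FILES)
```

`tf.data.Dataset.list_files` accepts a `seed` argument, so passing the same seed on every rank may be an alternative to `shuffle=False`; that it yields identical orders across processes is an assumption here and worth checking.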

In [9]:
%%px
hvd.init()

In [12]:
%%px
hvd.size(), hvd.rank()

Out[0:9]: (2, 0)

Out[1:9]: (2, 1)

In [67]:
%%px
# `shuffle=False` ensures that both ranks list the files in the same order.
dataset = tf.data.Dataset.list_files(['iris_setosa.csv',
                                      'iris_versic.csv'],
                                     shuffle=False)
dataset = dataset.interleave(get_csv_dataset,
                             cycle_length=2,
                             block_length=1,
                             num_parallel_calls=1)
dataset = dataset.shard(hvd.size(), hvd.rank())
dataset = dataset.map(lambda *row: parse_columns(*row, classes=['setosa', 'virginica', 'versicolor']))

for x, y in dataset.take(5):
    # `.numpy()` prints plain arrays instead of the full tf.Tensor repr.
    print(f'x: {x.numpy()}    y: {y.numpy()}')

[stdout:0] 
x: [5.1 3.5 1.4 0.2]    y: [[[1. 0. 0.]]]
x: [4.9 3.  1.4 0.2]    y: [[[1. 0. 0.]]]
x: [4.7 3.2 1.3 0.2]    y: [[[1. 0. 0.]]]
x: [4.6 3.1 1.5 0.2]    y: [[[1. 0. 0.]]]
x: [5.  3.6 1.4 0.2]    y: [[[1. 0. 0.]]]
[stdout:1] 
x: [7.  3.2 4.7 1.4]    y: [[[0. 0. 1.]]]
x: [6.4 3.2 4.5 1.5]    y: [[[0. 0. 1.]]]
x: [6.9 3.1 4.9 1.5]    y: [[[0. 0. 1.]]]
x: [5.5 2.3 4.  1.3]    y: [[[0. 0. 1.]]]
x: [6.5 2.8 4.6 1.5]    y: [[[0. 0. 1.]]]
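The printed rows show a pattern worth making explicit. With two files, `cycle_length=2` and `block_length=1`, `interleave` alternates setosa and versicolor rows, so `shard(2, rank)` hands every setosa row to rank 0 and every versicolor row to rank 1. The pure-Python model below reproduces this. It is a simplified model of `interleave` for the case where `cycle_length` equals the number of files, not TensorFlow code, and the per-class lists are hypothetical stand-ins for the CSV rows. It also shows that sharding before interleaving gives the same per-rank class split in this two-file, two-rank setup.

```python
def shard(elements, num_shards, index):
    # Keep element i when i % num_shards == index (the rule of Dataset.shard).
    return [e for i, e in enumerate(elements) if i % num_shards == index]


def interleave(sources, block_length=1):
    # Simplified model of interleave(..., cycle_length=len(sources)):
    # pull up to `block_length` elements from each source in turn,
    # dropping a source once it is exhausted.
    iterators = [iter(s) for s in sources]
    out = []
    while iterators:
        alive = []
        for it in iterators:
            exhausted = False
            for _ in range(block_length):
                try:
                    out.append(next(it))
                except StopIteration:
                    exhausted = True
                    break
            if not exhausted:
                alive.append(it)
        iterators = alive
    return out


# Hypothetical stand-ins for the rows of the two CSV files.
setosa = [("setosa", i) for i in range(50)]
versic = [("versicolor", i) for i in range(50)]

# Shard after interleaving, as in the cell above.
stream = interleave([setosa, versic], block_length=1)
after = {rank: shard(stream, 2, rank) for rank in range(2)}
print(sorted({label for label, _ in after[0]}))  # ['setosa']
print(sorted({label for label, _ in after[1]}))  # ['versicolor']

# Shard before interleaving: each rank keeps one file and reads it alone.
before = {rank: interleave(shard([setosa, versic], 2, rank)) for rank in range(2)}
print(before[0] == setosa, before[1] == versic)  # True True
```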


In [68]:
%ipcluster stop