# Scalable TensorFlow Dataset Introduction

In [94]:
import os
import shutil
from typing import List

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds

## Datasets, Pandas, and Numpy

In [8]:
df = pd.DataFrame(range(10))
df

Unnamed: 0,0
0,0
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9


In [9]:
df.values

array([[0],
       [1],
       [2],
       [3],
       [4],
       [5],
       [6],
       [7],
       [8],
       [9]])

In [21]:
type(df.values)

numpy.ndarray

In [22]:
tf.data.Dataset.from_tensor_slices(df)

<TensorSliceDataset shapes: (1,), types: tf.int64>

In [26]:
ds_from_tensor_slices = tf.data.Dataset.from_tensor_slices(df)
list(ds_from_tensor_slices.as_numpy_iterator())

[array([0]),
 array([1]),
 array([2]),
 array([3]),
 array([4]),
 array([5]),
 array([6]),
 array([7]),
 array([8]),
 array([9])]

In [24]:
ds_from_tensors = tf.data.Dataset.from_tensors(df)
list(ds_from_tensors.as_numpy_iterator())

[array([[0],
        [1],
        [2],
        [3],
        [4],
        [5],
        [6],
        [7],
        [8],
        [9]])]

In [35]:
print("0th element from ds from tensor slices:", next(ds_from_tensor_slices.as_numpy_iterator()))
print()
print("0th element from ds from tensors:", next(ds_from_tensors.as_numpy_iterator()))

0th element from ds from tensor slices: [0]

0th element from ds from tensors: [[0]
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]]


In [37]:
range_ds = tf.data.Dataset.from_tensor_slices(list(range(10)))
print(range_ds)
print(list(range_ds.as_numpy_iterator()))

<TensorSliceDataset shapes: (), types: tf.int32>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [41]:
def print_ds(ds: tf.data.Dataset) -> None:
    """Collect every element of ``ds`` as NumPy values and print them as a single list."""
    elements = [element for element in ds.as_numpy_iterator()]
    print(elements)

In [42]:
interleaved_ds = range_ds.interleave(lambda i: tf.data.Dataset.from_tensor_slices(range(i)))
print(interleaved_ds)
print_ds(interleaved_ds)

<InterleaveDataset shapes: (), types: tf.int32>
[0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 2, 2, 2, 2, 2, 2, 1, 3, 3, 3, 3, 3, 2, 4, 4, 4, 4, 3, 5, 5, 5, 4, 6, 6, 5, 7, 6, 7, 8]


In [45]:
print_ds(interleaved_ds.shuffle(buffer_size=10))
print_ds(interleaved_ds.shuffle(buffer_size=100))

[0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 2, 0, 1, 0, 1, 2, 2, 3, 3, 1, 3, 2, 1, 4, 3, 4, 2, 4, 2, 5, 5, 5, 6, 2, 4, 6, 5, 7, 6, 3, 0, 3, 7, 4, 8]
[0, 1, 1, 1, 3, 6, 0, 1, 0, 5, 4, 3, 5, 0, 0, 4, 5, 7, 1, 0, 7, 2, 2, 3, 3, 2, 0, 3, 1, 4, 0, 8, 5, 0, 6, 6, 4, 4, 2, 1, 2, 3, 2, 1, 2]


In [46]:
print_ds(interleaved_ds.batch(10))

[array([0, 0, 0, 0, 0, 0, 0, 0, 1, 1], dtype=int32), array([1, 1, 1, 1, 1, 0, 2, 2, 2, 2], dtype=int32), array([2, 2, 1, 3, 3, 3, 3, 3, 2, 4], dtype=int32), array([4, 4, 4, 3, 5, 5, 5, 4, 6, 6], dtype=int32), array([5, 7, 6, 7, 8], dtype=int32)]


In [48]:
print("first batch - then shuffle:")
print_ds(interleaved_ds.batch(10).shuffle(10))
print()
print("first shuffle - then batch:")
print_ds(interleaved_ds.shuffle(10).batch(10))

first batch - then shuffle:
[array([0, 0, 0, 0, 0, 0, 0, 0, 1, 1], dtype=int32), array([2, 2, 1, 3, 3, 3, 3, 3, 2, 4], dtype=int32), array([5, 7, 6, 7, 8], dtype=int32), array([1, 1, 1, 1, 1, 0, 2, 2, 2, 2], dtype=int32), array([4, 4, 4, 3, 5, 5, 5, 4, 6, 6], dtype=int32)]

first shuffle - then batch:
[array([1, 0, 0, 1, 0, 1, 0, 1, 0, 1], dtype=int32), array([2, 0, 2, 1, 0, 1, 2, 3, 0, 3], dtype=int32), array([4, 2, 3, 2, 2, 1, 4, 0, 4, 3], dtype=int32), array([6, 5, 3, 6, 7, 8, 3, 4, 5, 5], dtype=int32), array([6, 4, 5, 7, 2], dtype=int32)]


# Datasets and Generators: What if the data doesn't fit into memory?

In [52]:
for i in range(3):
    print(i)

0
1
2


In [54]:
try:
    tf.data.Dataset.from_tensor_slices(range(3))
except TypeError as e:
    print(e)

'list' object cannot be interpreted as an integer


In [55]:
try:
    tf.data.Dataset.from_tensors(range(3))
except TypeError as e:
    print(e)

'list' object cannot be interpreted as an integer


* a lazy `range` cannot be passed to `tf.data.Dataset.from_tensor_slices` (it raises the `TypeError` shown above)
* a lazy `range` cannot be passed to `tf.data.Dataset.from_tensors` either

In [66]:
def int_generator():
    """Yield the integers 0, 1 and 2, one at a time."""
    for value in range(3):
        yield value
    
ds_from_int_generator = tf.data.Dataset.from_generator(
    generator=int_generator,
    output_types=tf.int32
)
print(ds_from_int_generator)
print_ds(ds_from_int_generator)

<FlatMapDataset shapes: <unknown>, types: tf.int32>
[0, 1, 2]


In [68]:
def tuple_generator():
    """Yield pairs that combine 0..2 in ascending order with 2..0 in descending order."""
    ascending = range(3)
    for pair in zip(ascending, reversed(ascending)):
        yield pair
    
ds_from_tuple_generator = tf.data.Dataset.from_generator(
    generator=tuple_generator,
    output_types=(tf.int32, tf.int32)
)
print(ds_from_tuple_generator)
print_ds(ds_from_tuple_generator)

<FlatMapDataset shapes: (<unknown>, <unknown>), types: (tf.int32, tf.int32)>
[(0, 2), (1, 1), (2, 0)]


In [69]:
tfds.benchmark(ds_from_tuple_generator)

1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 41.06 ex/sec (total: 3 ex, 0.07 sec)
Examples/sec (First only) 27.53 ex/sec (total: 1 ex, 0.04 sec)
Examples/sec (First excluded) 54.44 ex/sec (total: 2 ex, 0.04 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,0.073069,3,41.057198
first,0.036328,1,27.526714
lasts,0.03674,2,54.435916


## Attention: Python code can be really slow

In [74]:
# Number of elements each benchmarked dataset yields.
benchmark_size = 20_000


def large_generator():
    """Yield the integers 0 .. benchmark_size - 1 from plain Python code."""
    for value in range(benchmark_size):
        yield value


# Variant 1: the data lives in an in-memory DataFrame and is sliced row by row.
large_df = pd.DataFrame(range(benchmark_size))
large_ds_from_tensor_slices = tf.data.Dataset.from_tensor_slices(large_df)

# Variant 2: every element is produced by the Python generator above.
large_ds_from_generator = tf.data.Dataset.from_generator(
    generator=large_generator,
    output_types=tf.int32,
)

print("from tensor slices:")
tfds.benchmark(large_ds_from_tensor_slices)

# Two blank lines between the two benchmark reports.
print("\n")

print("from generator:")
tfds.benchmark(large_ds_from_generator)

from tensor slices:


  0%|          | 1/20000 [00:00<?, ?it/s]


************ Summary ************

Examples/sec (First included) 21514.45 ex/sec (total: 20000 ex, 0.93 sec)
Examples/sec (First only) 316.36 ex/sec (total: 1 ex, 0.00 sec)
Examples/sec (First excluded) 21586.78 ex/sec (total: 19999 ex, 0.93 sec)


from generator:


1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 3702.72 ex/sec (total: 20000 ex, 5.40 sec)
Examples/sec (First only) 95.46 ex/sec (total: 1 ex, 0.01 sec)
Examples/sec (First excluded) 3709.73 ex/sec (total: 19999 ex, 5.39 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,5.401434,20000,3702.720574
first,0.010476,1,95.456317
lasts,5.390958,19999,3709.730402


## The generator is ~6x slower.

In [126]:
# Benchmark both datasets again, this time grouped into batches of 128 elements.
batched_from_tensor_slices = large_ds_from_tensor_slices.batch(128)
batched_from_generator = large_ds_from_generator.batch(128)

print("batched from tensor slices:")
tfds.benchmark(batched_from_tensor_slices)

# Two blank lines between the two benchmark reports.
print("\n")

print("batched from generator:")
tfds.benchmark(batched_from_generator)

batched from tensor slices:


  1%|          | 1/157 [00:00<?, ?it/s]


************ Summary ************

Examples/sec (First included) 2949.73 ex/sec (total: 157 ex, 0.05 sec)
Examples/sec (First only) 156.71 ex/sec (total: 1 ex, 0.01 sec)
Examples/sec (First excluded) 3330.20 ex/sec (total: 156 ex, 0.05 sec)


batched from generator:


1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 40.29 ex/sec (total: 157 ex, 3.90 sec)
Examples/sec (First only) 27.15 ex/sec (total: 1 ex, 0.04 sec)
Examples/sec (First excluded) 40.42 ex/sec (total: 156 ex, 3.86 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,3.896529,157,40.292275
first,0.036839,1,27.145188
lasts,3.85969,156,40.417758


## Even batched generators are not that fast...
### You can use generators, but they are not optimal... However, nothing is impossible with them!

more complex generators:

In [80]:
# imo the prettiest way to implement 'method classes'

class MyGenerator:
    """Callable object that returns a fresh generator over ``range(benchmark_size)`` on every call."""

    def __init__(self, benchmark_size: int) -> None:
        # Exclusive upper bound of the integers produced by __call__.
        self.benchmark_size = benchmark_size

    def __call__(self, msg: str = "Hello"):
        """Print a line containing ``msg``, then yield 0 .. benchmark_size - 1.

        The body is a generator, so the line is printed when the first element
        is requested, not at the moment the instance is called.
        """
        print(f"I was run with '{msg}'!")
        for value in range(self.benchmark_size):
            yield value

next(MyGenerator(1)(msg="world"))

I was run with 'world'!


0

In [81]:
ds_from_generator_class = tf.data.Dataset.from_generator(
    generator=MyGenerator(10**4),
    output_types=tf.int32
)

tfds.benchmark(ds_from_generator_class)

I was run with 'Hello'!


1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 3679.24 ex/sec (total: 10000 ex, 2.72 sec)
Examples/sec (First only) 61.51 ex/sec (total: 1 ex, 0.02 sec)
Examples/sec (First excluded) 3701.01 ex/sec (total: 9999 ex, 2.70 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,2.717951,10000,3679.242528
first,0.016258,1,61.506917
lasts,2.701692,9999,3701.013457


## Better Option: read Dataset asynchronously from Files
### Dump Dataframes into Dataset Files

In [90]:
DUMP_PATH_PREFIX = 'test'

In [96]:
# Start from a clean slate. ignore_errors keeps a fresh run (directory not yet created) quiet.
shutil.rmtree(DUMP_PATH_PREFIX, ignore_errors=True)

In [97]:
# Create the dump directory; exist_ok makes this cell safe to re-run.
os.makedirs(DUMP_PATH_PREFIX, exist_ok=True)

In [98]:
ls $DUMP_PATH_PREFIX

In [148]:
large_df_for_dumping = pd.DataFrame(range(50_000))
large_ds_for_dumping = tf.data.Dataset.from_tensor_slices(large_df_for_dumping)

In [149]:
len(large_ds_for_dumping)

50000

In [150]:
print(large_ds_for_dumping)

<TensorSliceDataset shapes: (1,), types: tf.int64>


In [151]:
# Write the same dataset to four separate locations and remember each path.
dump_paths = []

for shard_index in range(4):
    shard_path = os.path.join(DUMP_PATH_PREFIX, f"{shard_index}.dataset")
    tf.data.experimental.save(
        dataset=large_ds_for_dumping,
        path=shard_path,
        compression='GZIP',
    )
    dump_paths.append(shard_path)
    print(f"stored to '{shard_path}'")

stored to 'test/0.dataset'
stored to 'test/1.dataset'
stored to 'test/2.dataset'
stored to 'test/3.dataset'


In [153]:
# Report the on-disk size of every dump. The with-block closes the pipe to `du`
# once its output has been read; `-s` is the portable spelling of `--summarize`.
with os.popen(f"du -sh {os.path.join(DUMP_PATH_PREFIX, '*')}") as du_pipe:
    print(du_pipe.read())

8.1M	test/0.dataset
8.1M	test/1.dataset
8.1M	test/2.dataset
8.1M	test/3.dataset



### Load Datasets

In [154]:
dumped_files_ds = tf.data.Dataset.from_tensor_slices(dump_paths)
print(dumped_files_ds)
print_ds(dumped_files_ds)

<TensorSliceDataset shapes: (), types: tf.string>
[b'test/0.dataset', b'test/1.dataset', b'test/2.dataset', b'test/3.dataset']


In [155]:
element_spec = (tf.TensorSpec(shape=(1,), dtype=tf.int64),)
element_spec

(TensorSpec(shape=(1,), dtype=tf.int64, name=None),)

In [156]:
# Open every dumped path as its own dataset and let interleave merge them into one stream.
loaded_files_ds = dumped_files_ds.interleave(
    lambda dump_file: tf.data.experimental.load(
        path=dump_file,
        element_spec=element_spec,
        compression='GZIP',
    )
)
print(loaded_files_ds)

<InterleaveDataset shapes: ((1,),), types: (tf.int64,)>


In [158]:
# looks like round robin
print_ds(loaded_files_ds.take(12))

[(array([0]),), (array([0]),), (array([0]),), (array([0]),), (array([1]),), (array([1]),), (array([1]),), (array([1]),), (array([2]),), (array([2]),), (array([2]),), (array([2]),)]


In [157]:
tfds.benchmark(loaded_files_ds)

1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 13543.44 ex/sec (total: 200000 ex, 14.77 sec)
Examples/sec (First only) 46.91 ex/sec (total: 1 ex, 0.02 sec)
Examples/sec (First excluded) 13562.95 ex/sec (total: 199999 ex, 14.75 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,14.767295,200000,13543.441407
first,0.021318,1,46.908084
lasts,14.745977,199999,13562.953371


## Reading from Files is 3-4x faster than reading from a Generator

In [160]:
def load_ds_from_files(file_paths: List[str], num_parallel_calls=None) -> tf.data.Dataset:
    """Build one dataset that interleaves the elements of several dumped datasets.

    Args:
        file_paths: Paths of datasets that were written with
            ``tf.data.experimental.save`` using GZIP compression.
        num_parallel_calls: Forwarded to ``Dataset.interleave``. The default
            ``None`` keeps the sequential behaviour; pass
            ``tf.data.experimental.AUTOTUNE`` to read the files in parallel.

    Returns:
        The interleaved dataset; its elements follow the notebook-level
        ``element_spec``.
    """
    def _load_single(path):
        # element_spec is the notebook-level variable; it is looked up when this runs.
        return tf.data.experimental.load(
            path=path,
            element_spec=element_spec,
            compression='GZIP',
        )

    paths_ds = tf.data.Dataset.from_tensor_slices(file_paths)
    return paths_ds.interleave(_load_single, num_parallel_calls=num_parallel_calls)

In [161]:
# File-backed pipeline, shuffled and grouped into full batches of 128 elements.
shuffled_batched_ds = (
    load_ds_from_files(dump_paths)
    .shuffle(buffer_size=10**5)
    .batch(128, drop_remainder=True)
)
tfds.benchmark(shuffled_batched_ds)

1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 1754.55 ex/sec (total: 1562 ex, 0.89 sec)
Examples/sec (First only) 3.24 ex/sec (total: 1 ex, 0.31 sec)
Examples/sec (First excluded) 2683.08 ex/sec (total: 1561 ex, 0.58 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,0.890255,1562,1754.55271
first,0.308462,1,3.241888
lasts,0.581793,1561,2683.084075


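## >10x Speed Improvement by using Batching

Note: the benchmark counts every batch of 128 elements as one "example", so ~1750 batches/sec corresponds to roughly 225,000 elements/sec, compared with ~13,500 elements/sec unbatched.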

In [162]:
# Same pipeline with prefetching appended.
# NOTE: the shuffle buffer here is 10**4, whereas the run without prefetching used 10**5.
prefetched_ds = (
    load_ds_from_files(dump_paths)
    .shuffle(buffer_size=10**4)
    .batch(128, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)
tfds.benchmark(prefetched_ds)

1it [00:00, ?it/s]


************ Summary ************

Examples/sec (First included) 2060.09 ex/sec (total: 1562 ex, 0.76 sec)
Examples/sec (First only) 17.49 ex/sec (total: 1 ex, 0.06 sec)
Examples/sec (First excluded) 2226.63 ex/sec (total: 1561 ex, 0.70 sec)


Unnamed: 0,duration,num_examples,avg
first+lasts,0.75822,1562,2060.087473
first,0.057162,1,17.494201
lasts,0.701058,1561,2226.633232


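## Another 10-20% (at least) by using Prefetching

Caveat: this figure is the "first included" throughput (2060 vs. 1755 batches/sec). The prefetching run also uses a smaller shuffle buffer (10**4 instead of 10**5), which may account for much of the faster first batch, and its "first excluded" throughput is actually lower (2227 vs. 2683 batches/sec). Use the same buffer size in both runs for a clean comparison.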