# Data Pipeline in TensorFlow
Example for splitting, caching, shuffling and batching data.

https://www.tensorflow.org/guide/data

In [3]:
from os.path import exists

import tensorflow as tf
from tensorflow.keras import preprocessing
from tensorflow.keras import layers, models


## Simple Example with a Random Number Dataset

#### 1. Load Data

In [31]:
N = 80_000  # number of elements taken from the random dataset

In [32]:
# Seeded stream of pseudo-random int64 scalars, truncated to the first N elements.
dataset = tf.data.Dataset.random(seed=42).take(N)
dataset

<TakeDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>

In [39]:
# Peek at the first seven elements (skipping zero elements is a no-op, so it is omitted).
for element in dataset.take(7):
    print(element)

tf.Tensor(2985944072, shape=(), dtype=int64)
tf.Tensor(4132877644, shape=(), dtype=int64)
tf.Tensor(929418493, shape=(), dtype=int64)
tf.Tensor(249609589, shape=(), dtype=int64)
tf.Tensor(146598941, shape=(), dtype=int64)
tf.Tensor(4149265688, shape=(), dtype=int64)
tf.Tensor(1024970628, shape=(), dtype=int64)


In [14]:
%%time
# Time one complete pass over the dataset.
for x in dataset:
    y = x  # trivial per-element work so the loop body is not empty
    if N < 10:  # only echo the values when the dataset is tiny
        print(x.numpy())

CPU times: user 3.7 s, sys: 3.85 ms, total: 3.71 s
Wall time: 3.7 s


*N = 80_000*: 
CPU times: user 6.43 s, sys: 3.97 ms, total: 6.44 s
Wall time: 6.43 s

#### 2. Cache Data

In [40]:
# Wrap the dataset in a cache. Note that `dataset` is rebound, so re-running
# this cell stacks another cache layer on top of the previous one.
dataset = dataset.cache()
dataset

<CacheDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>

In [43]:
# Peek at the first seven elements. Reading a cached dataset only partially
# makes TensorFlow log a warning and discard the partially filled cache.
for element in dataset.take(7):
    print(element)

tf.Tensor(2985944072, shape=(), dtype=int64)
tf.Tensor(4132877644, shape=(), dtype=int64)
tf.Tensor(929418493, shape=(), dtype=int64)
tf.Tensor(249609589, shape=(), dtype=int64)
tf.Tensor(146598941, shape=(), dtype=int64)
tf.Tensor(4149265688, shape=(), dtype=int64)
tf.Tensor(1024970628, shape=(), dtype=int64)


2022-05-11 07:11:16.948607: W tensorflow/core/kernels/data/cache_dataset_ops.cc:768] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [22]:
%%time
# Time one complete pass over the cached dataset.
for x in dataset:
    y = x**2  # small per-element computation
    if N < 10:  # only echo the values when the dataset is tiny
        print(x.numpy())

CPU times: user 6.47 s, sys: 11.8 ms, total: 6.49 s
Wall time: 6.49 s


*N = 80_000*: 
CPU times: user 6.53 s, sys: 4 ms, total: 6.53 s
Wall time: 6.53 s

#### 3. Prepare Data (Shuffle)

In [17]:
# Number of elements in the dataset (used below as the shuffle buffer size via N).
print(len(dataset))

80000


In [44]:
# For perfect shuffling, a buffer size greater than or equal to the full size
# of the dataset is required, hence the buffer of N elements.
# `dataset` is rebound here, so re-running this cell stacks another shuffle.
dataset = dataset.shuffle(N)
dataset

<ShuffleDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>

In [53]:
# Peek at the first seven elements of the current dataset.
for element in dataset.take(7):
    print(element)

tf.Tensor([2003472655  143275239  235426856], shape=(3,), dtype=int64)
tf.Tensor([1573696505 3567337857 3229029281], shape=(3,), dtype=int64)
tf.Tensor([1226379757 1810802461  746452079], shape=(3,), dtype=int64)
tf.Tensor([4230827482 3665857522 2767622597], shape=(3,), dtype=int64)
tf.Tensor([2055930460 2232576504 2654478964], shape=(3,), dtype=int64)
tf.Tensor([2539911038 4274633663 4278636127], shape=(3,), dtype=int64)
tf.Tensor([1469424630 2675207502  399910036], shape=(3,), dtype=int64)


#### 4. Batch Data

In [46]:
# Group consecutive elements into batches of three.
# `dataset` is rebound, so running this cell twice produces batches of batches.
dataset = dataset.batch(3)
dataset

<BatchDataset element_spec=TensorSpec(shape=(None,), dtype=tf.int64, name=None)>

In [47]:
# Show the first four batches.
for batch in dataset.take(4):
    print(batch)

tf.Tensor([1325902236 3895323289 2154609534], shape=(3,), dtype=int64)
tf.Tensor([3870700915  437146545  523678125], shape=(3,), dtype=int64)
tf.Tensor([3600099156  693626229 2629645225], shape=(3,), dtype=int64)
tf.Tensor([ 735790293 1019133584 1208373853], shape=(3,), dtype=int64)


## Example with Text File

#### 1. Load Data

In [None]:
# Show the working directory; the relative data paths below are resolved against it.
!pwd

In [None]:
# Option 1: large text file (about 150 MB).
folder_path_raw = "../../data/raw/SciFi_Stories_Text/"
file_name_raw = "internet_archive_scifi_v3.txt"

In [None]:
# Option 2: small text files with pseudo text.
# Assigns the same two variables as the large-file option, so whichever of the
# two cells runs last decides which input is used.
folder_path_raw = "../../data/raw/lorem_ipsum/"
file_name_raw = "lorem_ipsum_500.txt"

In [None]:
# Plain string concatenation: relies on folder_path_raw ending with a slash.
file_path_raw = folder_path_raw + file_name_raw

In [None]:
!ls $folder_path_raw
assert exists(file_path_raw)


In [None]:
# Just have a look: count the lines of the raw file by streaming over it.
# Summing over the iterator also works for an empty file (prints 0), whereas
# reading the enumerate counter after the loop fails when the loop never runs.
with open(file_path_raw, "r") as file:
    line_count = sum(1 for _ in file)
print("Total lines: ", line_count)

In [None]:
# Dataset whose elements are the individual lines of the text file.
dataset = tf.data.TextLineDataset(file_path_raw)

In [None]:
# Settings used by the text loading and vectorization cells below.
batch_size = 2  # batch size passed to text_dataset_from_directory
vocabulary_size = 12_000  # max_tokens of the TextVectorization layer
sequence_length = 16  # output_sequence_length of the TextVectorization layer
seed = 123  # seed passed to text_dataset_from_directory

##### B

In [None]:
# Display the dataset's repr (including its element spec) as the cell output.
dataset

In [None]:
# Print every line of the dataset (this is the whole file, so output can be long).
for text_line in dataset:
    print(text_line)

In [None]:
# Group consecutive lines into batches of `batch_size` (defined in the
# settings cell) instead of repeating the literal, then print every batch.
batched_dataset = dataset.batch(batch_size)
for line_batch in batched_dataset:
    print(line_batch)

In [None]:
# Show the first three elements as plain Python/NumPy values instead of tensors.
for text_line in dataset.take(3):
    print(text_line.numpy())

In [None]:
# Build a batched text dataset from the files in the raw data folder.
# text_dataset_from_directory expects a directory (not a single file), so the
# folder path is passed; labels=None yields only the text, without labels.
dataset = preprocessing.text_dataset_from_directory(
    folder_path_raw,
    labels=None,
    batch_size=batch_size,
    seed=seed
)

In [None]:
# Vectorization layer: lower-cases and strips punctuation, splits on
# whitespace and emits integer token ids padded/truncated to sequence_length.
# NOTE(review): no `encoder.adapt(...)` call is visible in this notebook, so
# the vocabulary is not fitted here.
encoder = layers.TextVectorization(
    standardize="lower_and_strip_punctuation",
    split="whitespace",
    max_tokens=vocabulary_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)

In [None]:
# Display the current dataset's repr (element spec).
dataset

In [None]:
# Print every element of the current dataset.
for text_batch in dataset:
    print(text_batch)

In [None]:
!ls $file

In [None]:
# Small in-memory dataset: one element per list entry.
dataset = tf.data.Dataset.from_tensor_slices([0, 9, 2, 5, 6, 8, 7])

In [None]:
# Display the dataset's repr (element spec).
dataset

In [None]:
# Iterate over the dataset and print each element tensor.
for value in dataset:
    print(value)

In [None]:
# Create an explicit Python iterator so elements can be pulled one at a time.
it = iter(dataset)

In [None]:
# Pull the next element; each run of this cell advances the iterator by one.
print(next(it))

In [None]:
# Dataset of the integers 0..9.
dataset = tf.data.Dataset.range(10)

In [None]:
# Display the dataset's repr (element spec).
dataset

In [None]:
# Print the elements in their original order.
for number in dataset:
    print(number)

In [None]:
# shuffle() returns a new dataset; the result is only displayed, not assigned,
# so `dataset` itself stays unshuffled. A buffer size of 1 does not reorder anything.
dataset.shuffle(1)

In [None]:
# Display `dataset` again; the unassigned shuffle() call did not rebind it.
dataset

In [None]:
# Iterate over a shuffled view using a buffer of four elements; the buffer is
# smaller than the dataset, so the shuffle is only approximate.
for number in dataset.shuffle(4):
    print(number)

In [None]:
# Group consecutive elements into batches of three under a separate name,
# leaving `dataset` unbatched.
batched_dataset = dataset.batch(3)

In [None]:
# Print each batch.
for batch in batched_dataset:
    print(batch)

In [None]:
# Shuffled view of the unbatched dataset (displayed only, not assigned).
dataset.shuffle(32)

In [None]:
# Shuffling after batching reorders whole batches; the elements inside each
# batch keep their order. The result is displayed only, not assigned.
batched_dataset.shuffle(32)

In [None]:
# Print the batches in shuffled order.
for batch in batched_dataset.shuffle(32):
    print(batch)