U ovom dokumentu biti će pokazano kako spremati podatke u TFRecord te ih iz njega pročitati.
Glavni cilj je ispitati utjecaj različitih načina zapisivanja podataka u TFRecord formatu, te utjecaj na veličinu
tako spremljenih podataka i brzinu čitanja sa diska.

# 1. Biblioteke, podatci i model za testiranje

##  1.1. Potrebne biblioteke za testiranje

In [1]:
#Uvoz potrebnih biblioteka za slijedeće korake
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import time
import os

## 1.2. Home direktorij u koji će biti pohranjeni rezultati eksperimenata

In [2]:
WORK_DIR = os.chdir("C:/Users/Public/Testni_podatci/")

## 1.3. Funkcija za podizanje skupa podataka za testiranje

In [3]:
def test_data_loader(dataset):
    data = dataset.load_data()
    #Koristimo samo trening podatke
    X, y = data[0][0], data[0][1]
    #Ispis dimenzija kreiranih podataka
    print("Dimenzije X:", X.shape,"\nDimenzije y:", y.shape)
    #Ispis tipa podatka za X i y
    print("Tip objekta za X:", X.dtype, "\nTip objekta za y:", y.dtype)
    return X, y

In [4]:
#Podizanje potrebnih podataka
from tensorflow.keras.datasets import mnist
X, y = test_data_loader(mnist)
#Uklanjanje nepotrebnih stvari
del mnist

Dimenzije X: (60000, 28, 28) 
Dimenzije y: (60000,)
Tip objekta za X: uint8 
Tip objekta za y: uint8


## 1.4. Funkcija za kreiranje kompajliranog modela

In [5]:
def build_compiled_model():
    
    model = models.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))

    model.compile(optimizer='rmsprop', 
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model    

# 2. Pomoćne funkcije potrebne za kreiranje, zapisivanje i čitanje TFRecorda

## 2.1. Funkcija za pretvaranje array-a u raw bajt string

In [6]:
#Funkcija za kreiranje bytova iz slika - one su stringovi kada bi ih čitali
#kao jpeg sa diska - OPONAŠAMO OVAJ PROCES SA OVOM FUNKCIJOM
def images_as_bytes(image):
    '''Funkcija koja pretvara sliku u raw bajtove(string)'''
    #Iz razloga što mnist slike nemaju dimenziju "RGB",
    #a framework zahtjeva da slika ima 3 dimenzije
    image = np.expand_dims(image, axis=2)
    #Ne radimo enkodiranje u jpeg, što znači da će ovo imati veću težinu (uint8)
    #ali će biti brže čitanje
    return image.tobytes()

## 2.2. Pomoćne funkcije za kreiranje Feature-a određenog tipa

In [7]:
#Funkcije za stvaranje BytesList, Int64List, FloatList iz ulaznih vrijednosti
#ovo je potrebno da bi naši podatci mogli biti ulaz za Feature

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

## 2.3. Funkcija za kreiranje Example-a jednog opažanja u skupu podataka

In [8]:
#Funkcija za konverziju uzorka (slika + label) u Example
def sample2example(image, label):
    '''Radi pretvorbu slike i oznake u Example'''
    #Kreiramo Feature dict
    feature_dict = {
        "image": _bytes_feature(images_as_bytes(image)),
        "label": _int64_feature(label)
    }
    features = tf.train.Features(feature=feature_dict)
    return tf.train.Example(features=features)

## 2.4. Funkcija za serijalizaciju Example-a, i zapisivanje u TFRecord

In [9]:
def tfrecord_writer(filename, images, labels, use_compression=False):
    '''Zapisivanje TF Recorda u ciljanu datoteku, uz bool izbor kompresije'''
    
    if use_compression:
        options = tf.io.TFRecordOptions(compression_type="GZIP")
    else:
        options = None
    
    num_samples = 0    
    with tf.io.TFRecordWriter(filename, options=options) as writer:
        for image, label in zip(images, labels):
            example = sample2example(image, label)
            #Da bi mogli zapisati Example potrebno ga je serijalizirati
            writer.write(example.SerializeToString())
            num_samples += 1
    print("Broj zapisanih uzoraka:", num_samples)   

In [10]:
#Test - bez kompresije
filename = "test.tfrecord"
tfrecord_writer(filename, X, y)

Broj zapisanih uzoraka: 60000


In [11]:
#Test - sa kompresijom
filename = "zip_test.tfrecord"
tfrecord_writer(filename, X, y, use_compression=True)

Broj zapisanih uzoraka: 60000


## 2.5. Funkcija za parsiranje zapisanog TFRecorda

In [12]:
def example2sample(example):
    #Obavezno definiramo opis značajki u Exampleu, ovo vraća dict sa image, labelom
    feat_desc = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    #Za jedno opažanje
    sample = tf.io.parse_example(example, feat_desc)
    #Dekodiranje raw byte stringa u sliku koju možemo vizulizirati
    sample['image'] = tf.io.decode_raw(sample['image'], tf.uint8)
    return sample

In [13]:
#Testiranje funkcije
dataset = tf.data.TFRecordDataset("test.tfrecord", compression_type=None)
image = []
for example in dataset.map(example2sample).take(1): image.append(example)
print(image)
print("Da li parser vraća točnu kopiju slike:", (image[0]['image'].numpy().reshape((28, 28)) == X[0]).all())

[{'image': <tf.Tensor: shape=(784,), dtype=uint8, numpy=
array([  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   3,  18,  18,  18,
       126, 136, 175,  26, 166, 255, 247, 127,   0,   0,   0,   0,   0,
       

## 2.6 Funkcija za preprocesuiranje uzorka: tip podatka, normalizacija, reshape

In [14]:
def preprocess(sample):
    '''Radi predprocesuiranja, izlaz je tuple(image, sample)'''
    
    image = sample['image']
    label = sample['label']
    #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
    #U originalni oblik  - iz 1d u 3d tenzor
    image = tf.reshape(image, [28, 28, 1])
    
    return image, label

In [15]:
#Testiranje funkcije
dataset = tf.data.TFRecordDataset("test.tfrecord")
dataset = dataset.take(1)
dataset = dataset.map(example2sample)
dataset = dataset.map(preprocess)

image = []
for example in dataset: image.append(example)
print(image)

[(<tf.Tensor: shape=(28, 28, 1), dtype=float32, numpy=
array([[[0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ]],

       [[0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        ],
        [0.        

In [17]:
#Test rada funkcije
Iteracija = 1
num_iters = 20
for example in tf.data.TFRecordDataset("test.tfrecord").batch(32):
    print("Iteracija:", Iteracija)
    Iteracija += 1
    num_iters -= 1
    if num_iters == 0:
        break

Iteracija: 1
Iteracija: 2
Iteracija: 3
Iteracija: 4
Iteracija: 5
Iteracija: 6
Iteracija: 7
Iteracija: 8
Iteracija: 9
Iteracija: 10
Iteracija: 11
Iteracija: 12
Iteracija: 13
Iteracija: 14
Iteracija: 15
Iteracija: 16
Iteracija: 17
Iteracija: 18
Iteracija: 19
Iteracija: 20


## 2.6. Funkcija za testiranje ulaznog pipeline-a

In [18]:
from timeit import default_timer as timer

def pipeline_timer(dataset, num_iterations=20):
    '''Mjeri izvođenja definiranog broja iteracije iz 
    zadanog dataset-a'''
    
    print("Početak mjerenja")
    start_time = timer()
    process_times = []
    for example in dataset:
        end_time = timer()
        process_times.append(end_time - start_time)
        start_time = end_time
        num_iterations -= 1
        if num_iterations == 0:
            break
    print("Kraj mjerenja")
    print("Prosječno vrijeme po batchu:", f"{np.mean(process_times)} sekundi")
    return process_times

In [19]:
#Test funkcije za mjerenje vremena
dataset = tf.data.TFRecordDataset("test.tfrecord").batch(32)
pipeline_timer(dataset, num_iterations=100)

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.00037874199999350823 sekundi


[0.02839980000135256,
 0.00017490000027464703,
 9.439999848837033e-05,
 8.679999882588163e-05,
 9.689999933470972e-05,
 8.800000068731606e-05,
 8.610000077169389e-05,
 9.00999984878581e-05,
 8.750000051804818e-05,
 0.0002137000010407064,
 0.00011780000204453245,
 8.819999857223593e-05,
 8.570000136387534e-05,
 9.17999968805816e-05,
 8.08000004326459e-05,
 8.100000195554458e-05,
 8.030000026337802e-05,
 8.41999972180929e-05,
 8.070000330917537e-05,
 0.00017109999680542387,
 9.950000094249845e-05,
 8.22000001790002e-05,
 8.08000004326459e-05,
 8.08000004326459e-05,
 8.059999890974723e-05,
 8.069999967119657e-05,
 8.05000017862767e-05,
 8.099999831756577e-05,
 8.330000127898529e-05,
 0.00016870000035851263,
 9.679999857326038e-05,
 8.22000001790002e-05,
 8.100000195554458e-05,
 8.049999814829789e-05,
 8.069999967119657e-05,
 8.019999950192869e-05,
 8.040000102482736e-05,
 8.059999890974723e-05,
 8.100000195554458e-05,
 0.00023610000062035397,
 0.00010289999772794545,
 8.22000001790002e-05

In [20]:
#Boxplot za usporedbu vremena različitih pristupa
def box_graph(data_list, name_list):
    '''Iscrtava boxplot za podatke iz data_list-e,
    daje imena svakog grupi prema name_list-u'''
    fig, ax = plt.subplots()
    ax.set_title('Usporedba vremena za grupe u sekundama')
    bp = ax.boxplot(data_list)
    plt.xticks(list(range(1, len(name_list)+1)), name_list)
    plt.show()

# 3. Eksperimenti sa ulaznim pipelineom baziranim na TFRecordu

In [21]:
#Funkcija za definiranje testnog neoptimizranog pipeline-a
def base_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija)
    dataset = dataset.map(example2sample)
    dataset = dataset.map(preprocess)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.repeat()
    dataset = dataset.batch(64)
    dataset = dataset.prefetch(1)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        dataset = dataset.with_options(options)
        
    return dataset

## 3.1. Baseline: Pristup bez TFRecorda

Ovdje možemo samo utvrditi brzinu učenja, jer nema ulaznog pipeline-a.

In [22]:
#Manualna priprema podataka
train_images = X.reshape((60000, 28, 28, 1))
train_images = train_images.astype('float32') / 255
train_labels = y
#Učenje
model = build_compiled_model()
model.fit(train_images, train_labels, epochs=5, batch_size=64)

Train on 60000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b8cd8ffe88>

## 3.2. Učenje uz TFRecord, bez opcija, bez kompresije i bez optimiziranog pipeline-a

In [23]:
#Definiranje dataset-a
dataset = base_pipeline("test.tfrecord", dodaj_opcije=False, kompresija=None)

In [24]:
#Definiranje modela
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b8e61f5dc8>

Učenje je 25% brže uz korištenje `TFRecorda` i `tf.data.dataset` za ulaz podataka, u odnosu na naivni uvoz podataka.

In [25]:
#Brzine pipeline-a po grupi
pipe_1_times = pipeline_timer(dataset, num_iterations=100)
pipe_1_times[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.008181425999973726 sekundi


[0.1892182999981742,
 0.005423999999038642,
 0.005437400002847426,
 0.005472699998790631,
 0.0054841999990458135,
 0.0062620000026072375,
 0.005466799997520866,
 0.005378500001825159]

## 3.3. Učenje uz TFRecord sa opcijama, ali bez kompresije

In [26]:
#Definiranje dataset-a
dataset = base_pipeline("test.tfrecord", dodaj_opcije=True, kompresija=None)

In [27]:
#Model
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9a2539088>

Izgleda da odabrane opcije ne pomažu u brzini učenja!

In [28]:
#Brzina pipeline-a po grupi
pipe_2 = pipeline_timer(dataset, num_iterations=100)
pipe_2[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0035410099999717204 sekundi


[0.14835259999745176,
 0.001577200000610901,
 0.0013843999986420386,
 0.0015734000007796567,
 0.0014102999994065613,
 0.0012295000015001278,
 0.001069999998435378,
 0.0011790999997174367]

Ovo dolje ukazuje da opcije ipak pomažu i to da je pipeline 50% brži!!

## 3.4. Učenje uz TFRecord bez opcija, ali s kompresijom

In [29]:
dataset = base_pipeline("zip_test.tfrecord", dodaj_opcije=False, kompresija="GZIP")

In [30]:
#Model
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9ac1da408>

Kompresija ne utječe na brzinu učenja.

In [31]:
#Brzina pipeline-a po grupi
pipe_3 = pipeline_timer(dataset, num_iterations=100)
pipe_3[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.008491692000025069 sekundi


[0.1880715000006603,
 0.004874900001595961,
 0.004905499998130836,
 0.006049300001905067,
 0.00522859999909997,
 0.00633899999957066,
 0.009848900001088623,
 0.010501400000066496]

Kompresija usporava pipeline.

## 3.5. Učenje uz TFRecord s opcijama i sa kompresijom

In [32]:
dataset = base_pipeline("zip_test.tfrecord", dodaj_opcije=True, kompresija="GZIP")

In [33]:
#Model
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9aefd6788>

Čini se da kompresija i opcije ne utječu na brzinu učenja.

In [34]:
#Brzina pipeline-a po grupi
pipe_4 = pipeline_timer(dataset, num_iterations=100)
pipe_4[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0038078320000204256 sekundi


[0.14636270000119112,
 0.011732300001312979,
 0.00142029999915394,
 0.002683499998965999,
 0.0016727000001992565,
 0.0018038000016531441,
 0.0016276999995170627,
 0.002558899999712594]

Ovdje je jasno da kompresija nema utjecaj na pipeline.

## 3.6 Utjecaj resize slike u pipeline-u.

In [40]:
def preprocess_2(sample):
    '''Radi predprocesuiranja, izlaz je tuple(image, sample)'''
    
    image = sample['image']
    label = sample['label']
    
    #U originalni oblik  - iz 1d u 4d tenzor
    image = tf.reshape(image, [-1, 28, 28, 1])
   
    #Povećavanje slike
    image = tf.image.resize(image, (128, 128))
     #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
     
    return image, label

In [41]:
#Funkcija za definiranje resize pipeline-a
def resize_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija)
    dataset = dataset.map(example2sample)
    dataset = dataset.map(preprocess_2)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.repeat()
    dataset = dataset.batch(64)
    dataset = dataset.prefetch(1)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        dataset = dataset.with_options(options)
        
    return dataset

In [42]:
#Definiranje modela sa većim ulazom
def build_compiled_model_2():
    
    model = models.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 128, 1)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))

    model.compile(optimizer='rmsprop', 
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model

In [43]:
dataset = resize_pipeline("test.tfrecord", dodaj_opcije=False, kompresija=None)

In [44]:
#Model
model = build_compiled_model_2()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

ValueError: in converted code:

    C:\Users\Public\Anaconda3\envs\tester\lib\site-packages\tensorflow_core\python\keras\engine\training_v2.py:677 map_fn
        batch_size=None)
    C:\Users\Public\Anaconda3\envs\tester\lib\site-packages\tensorflow_core\python\keras\engine\training.py:2410 _standardize_tensors
        exception_prefix='input')
    C:\Users\Public\Anaconda3\envs\tester\lib\site-packages\tensorflow_core\python\keras\engine\training_utils.py:573 standardize_input_data
        'with shape ' + str(data_shape))

    ValueError: Error when checking input: expected conv2d_18_input to have 4 dimensions, but got array with shape (None, None, 128, 128, 1)


**Resize je jako skupa operacija u pipeline-u**. Ovaj korak treba napraviti prije nego podatci budu spremljeni u TFRecord formatu!!!

In [45]:
#Brzina pipeline-a po grupi
pipe_5 = pipeline_timer(dataset, num_iterations=100)
pipe_5[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.01528476600000431 sekundi


[0.3369682999982615,
 0.011436100001446903,
 0.011622699999861652,
 0.01291779999883147,
 0.011837699999887263,
 0.011073199999373173,
 0.010455800002091564,
 0.010772699999506585]

**300% sporije u odnosu na pipeline bez resize unutra !!!**

# 3.7. Promjena iz 2 funkcije za predprocesuiranje u jednu

In [46]:
def integrated_example2sample(example):
    #Obavezno definiramo opis značajki u Exampleu, ovo vraća dict sa image, labelom
    feat_desc = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    #Za jedno opažanje
    sample = tf.io.parse_example(example, feat_desc)
    #Dekodiranje raw byte stringa u sliku koju možemo vizulizirati
    sample['image'] = tf.io.decode_raw(sample['image'], tf.uint8)
    #Izvuci slike i oznake
    image = sample['image']
    label = sample['label']
    #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
    #U originalni oblik  - iz 1d u 3d tenzor
    image = tf.reshape(image, [28, 28, 1])
    
    return image, label


In [47]:
#Funkcija za definiranje tight pipeline-a
def tight_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija)
    dataset = dataset.map(integrated_example2sample)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.repeat()
    dataset = dataset.batch(64)
    dataset = dataset.prefetch(1)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        dataset = dataset.with_options(options)
        
    return dataset

In [48]:
dataset = tight_pipeline("test.tfrecord", dodaj_opcije=False, kompresija=None)

In [49]:
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b78477c8>

Izgleda da su 2 map-a bolja od jednog?

In [50]:
#Brzina pipeline-a po grupi
pipe_6 = pipeline_timer(dataset, num_iterations=100)
pipe_6[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.004710695000030682 sekundi


[0.13984010000058333,
 0.0034278999992238823,
 0.0033790000015869737,
 0.0037636999986716546,
 0.0034894000018539373,
 0.003515000000334112,
 0.0034278999992238823,
 0.0034081000012520235]

Zaista je sporije od najbrže pipline-a, ajmo pogledati što se događa kada uključimo opcije.

In [51]:
dataset = tight_pipeline("test.tfrecord", dodaj_opcije=True, kompresija=None)

In [52]:
model = build_compiled_model()
model.fit(dataset, steps_per_epoch=np.ceil(len(X) // 64), epochs=5)

Train for 937.0 steps
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b7e407c8>

Na prvu, izgleda da opcije ne mijenjaju ništa.

In [53]:
pipe_7 = pipeline_timer(dataset, num_iterations=100)
pipe_7[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.003657277999991493 sekundi


[0.12762399999701302,
 0.0009793000026547816,
 0.0010698000005504582,
 0.0012150999973528087,
 0.0012478999997256324,
 0.00093620000188821,
 0.001021500000206288,
 0.000970599998254329]

Ali ono što se vidi, je zapravo isto najveća moguća brzina ovog pipeline-a.

# 3.8. Optimizirani pipeline 1

In [54]:
#Funkcija za definiranje tight pipeline-a
def opt1_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija, 
                                      num_parallel_reads=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(64)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [55]:
#Dataset
dataset = opt1_pipeline("test.tfrecord")

In [56]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b24dde08>

In [58]:
pipe_8 = pipeline_timer(dataset, num_iterations=100)
pipe_8[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.008868260000017471 sekundi


[0.7264973000019381,
 0.002453699999023229,
 0.013757500000792788,
 0.00021289999858709052,
 0.00016200000027311035,
 0.0001512000017100945,
 0.00014589999773306772,
 0.0013335000003280584]

In [59]:
#verzija sa opcijama
dataset = opt1_pipeline("test.tfrecord", dodaj_opcije=True)

In [60]:
pipe_9 = pipeline_timer(dataset, num_iterations=100)
pipe_9[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.008844800000006217 sekundi


[0.7154526999984228,
 0.002690900000743568,
 0.0031541999996989034,
 0.00304600000163191,
 0.0025599000000511296,
 0.0027098999998997897,
 0.002806399999826681,
 0.0019380999983695801]

In [61]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b8adb148>

In [65]:
def opt1_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog optimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija, 
                                      num_parallel_reads=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [66]:
dataset = opt1_pipeline("test.tfrecord", dodaj_opcije=True)

In [68]:
pipe_10 = pipeline_timer(dataset, num_iterations=100)
pipe_10[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.009324223000003257 sekundi


[0.7173960000000079,
 0.0025854999985313043,
 0.0025744000013219193,
 0.0032050999980128836,
 0.016780300000391435,
 0.0026026000014098827,
 0.0035379000000830274,
 0.002861000000848435]

In [69]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b90a5e48>

In [70]:
def opt1_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.TFRecordDataset(tf_rec_data, compression_type=kompresija, 
                                      num_parallel_reads=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.prefetch(buffer_size=1)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [71]:
dataset = opt1_pipeline("test.tfrecord", dodaj_opcije=True)

In [72]:
pipe_11 = pipeline_timer(dataset, num_iterations=100)
pipe_11[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.003925657999971009 sekundi


[0.2258905999988201,
 0.0028138999987277202,
 0.0029862000010325573,
 0.0025051000011444557,
 0.002902599997469224,
 0.0028352000008453615,
 0.002247299998998642,
 0.002609800001664553]

## 3.9. Optimizirani pipeline II

In [73]:
def example2sample(example):
    #Obavezno definiramo opis značajki u Exampleu, ovo vraća dict sa image, labelom
    feat_desc = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    #Za jedno opažanje
    sample = tf.io.parse_example(example, feat_desc)
    #Dekodiranje raw byte stringa u sliku koju možemo vizulizirati
    sample['image'] = tf.io.decode_raw(sample['image'], tf.uint8)
    return sample

In [74]:
def batch_preprocess(sample):
    '''Radi predprocesuiranja, izlaz je tuple(image, sample)'''
    
    image = sample['image']
    label = sample['label']
    #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
    #U originalni oblik  - iz 1d u 4d tenzor, batch
    image = tf.reshape(image, [64, 28, 28, 1])
    
    return image, label

In [75]:
def opt2_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog optimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.Dataset.list_files(tf_rec_data)
    dataset = dataset.interleave(tf.data.TFRecordDataset,
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #moramo baciti remainder van, jer se inače javlja greška
    #u dimenzijama kod reshape funkcije u preproces-u
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(batch_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    #dataset = dataset.cache()
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [76]:
dataset = opt2_pipeline("test.tfrecord", dodaj_opcije=True)

In [82]:
pipe_12 = pipeline_timer(dataset, num_iterations=100)
pipe_12[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.002609623999996984 sekundi


[0.20413430000189692,
 0.00046949999887146987,
 0.0008226999998441897,
 0.0004721000004792586,
 0.001025199999276083,
 0.0004036999998788815,
 0.0007413000021188054,
 0.0009603999969840515]

Top speed set-up, kreiranje podataka u odnosu na inicijalni pipeline (0.008 s/batchu) je poboljšano za 88% !!

In [77]:
model=build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc0f0ec788>

Moguće je i zadržati zadnji batch, ako definiramo slobodnu dimenziju batch-a. Ovo ću ja morati primijeniti jer ne znam unaprijed koja će biti veličina batcha

In [83]:
def batch_preprocess_optional_size(sample):
    '''Radi predprocesuiranja, izlaz je tuple(image, sample)'''
    
    image = sample['image']
    label = sample['label']
    #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
    #U originalni oblik  - iz 1d u 4d tenzor, batch (-1), varijabilna veličina batch-a
    image = tf.reshape(image, [-1, 28, 28, 1])
    
    return image, label

In [84]:
def opt2_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.Dataset.list_files(tf_rec_data)
    dataset = dataset.interleave(tf.data.TFRecordDataset,
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #Možemo ali i ne moramo izbacit remainder, jer preprocess funkcija može uzeti u obzir batch varijabilne veličine
    dataset = dataset.batch(64)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(batch_preprocess_optional_size, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    #dataset = dataset.cache()
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [87]:
dataset = opt2_pipeline("test.tfrecord", dodaj_opcije=True)

In [88]:
pipe_13 = pipeline_timer(dataset, num_iterations=100)
pipe_13[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.002892991000007896 sekundi


[0.23144670000328915,
 0.0004257000000507105,
 0.0006784999968658667,
 0.001040600000123959,
 0.0008645000016258564,
 0.0003515999997034669,
 0.0013140000010025688,
 0.0007573999973828904]

In [89]:
model=build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc2ade5488>

Kako dodati opciju kompresije u ovaj pipeline?

In [94]:
def opt3_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.Dataset.list_files(tf_rec_data)
    dataset = dataset.interleave(lambda x: tf.data.TFRecordDataset(x, compression_type=kompresija),
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #Možemo ali i ne moramo izbacit remainder, jer preprocess funkcija može uzeti u obzir batch varijabilne veličine
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.map(example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(batch_preprocess_optional_size, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [95]:
dataset = opt3_pipeline("zip_test.tfrecord", dodaj_opcije=True, kompresija="GZIP")

In [96]:
pipe_14 = pipeline_timer(dataset, num_iterations=100)
pipe_14[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.002823346000004676 sekundi


[0.20514230000117095,
 0.00048690000039641745,
 0.0005130999998073094,
 0.0023175999995146412,
 0.0008621999986644369,
 0.0007829000023775734,
 0.0008459999990009237,
 0.0031009999984235037]

In [100]:
model=build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc47761448>

Samo provjera funkcije bez kompresije

In [97]:
dataset = opt3_pipeline("test.tfrecord", dodaj_opcije=True)

In [106]:
pipe_15 = pipeline_timer(dataset, num_iterations=100)
pipe_15[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0009626600000046892 sekundi


[0.031069000000570668,
 0.0004226000019116327,
 0.0003259999975853134,
 0.0010013000028266106,
 0.0005318999974406324,
 0.0007809000017005019,
 0.0004307999988668598,
 0.0009099000017158687]

Kompresije usporava pipeline za 50% !!

In [106]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc47aecbc8>

Ali to se ne odražava na vremenu učenja, vjerojatno zbog brzine GPU-a

In [107]:
dataset = opt3_pipeline("test.tfrecord", dodaj_opcije=False)

In [108]:
pipe_16 = pipeline_timer(dataset, num_iterations=100)
pipe_16[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0008493359999920358 sekundi


[0.03104459999667597,
 0.0003826000029221177,
 0.00046189999920898117,
 0.0007879000004322734,
 0.0006795000008423813,
 0.0006749999993189704,
 0.0006162999998196028,
 0.00031939999826136045]

Ovdje se pokazalo čak da opcije nemaju nikakav efekt!!

In [109]:
model = build_compiled_model()

In [110]:
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1b9b8a8bf48>

Testiranje brzine pipeline-a ako postoji samo jedna funkcija za map, tj. ako se parser i preprocess funkcije objedine

In [111]:
def integrated_example2sample(example):
    #Obavezno definiramo opis značajki u Exampleu, ovo vraća dict sa image, labelom
    feat_desc = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    #Za jedno opažanje
    sample = tf.io.parse_example(example, feat_desc)
    #Dekodiranje raw byte stringa u sliku koju možemo vizulizirati
    sample['image'] = tf.io.decode_raw(sample['image'], tf.uint8)
    #Izvuci slike i oznake
    image = sample['image']
    label = sample['label']
    #U float
    image = tf.cast(image, tf.float32)
    #normalizacija
    image = image / 255.
    #U originalni oblik  - iz 1d u 4d tenzor
    image = tf.reshape(image, [-1, 28, 28, 1])
    
    return image, label

In [112]:
def opt4_pipeline(tf_rec_data, dodaj_opcije=False, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.Dataset.list_files(tf_rec_data)
    dataset = dataset.interleave(lambda x: tf.data.TFRecordDataset(x, compression_type=kompresija),
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #Možemo ali i ne moramo izbacit remainder, jer preprocess funkcija može uzeti u obzir batch varijabilne veličine
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.map(integrated_example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    if dodaj_opcije:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        
        dataset = dataset.with_options(options)
        
    return dataset

In [113]:
dataset = opt4_pipeline("test.tfrecord", dodaj_opcije=True)

In [114]:
pipe_17 = pipeline_timer(dataset, num_iterations=100)
pipe_17[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0007445459999871673 sekundi


[0.026995399999577785,
 0.00032310000096913427,
 0.0003622999975050334,
 0.0006329000025289133,
 0.00075970000034431,
 0.0004692999973485712,
 0.00036779999936698005,
 0.0003829000015684869]

In [136]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc4885be88>

Zaključak je da parser i preprocess funkcija mogu biti spojene bez efekta na učinkovitost pipeline-a, čak i bez opcija!!

Idemo vidjeti što se događa ako u potpunosti izbacimo opcije.

In [319]:
tf.config.optimizer.set_jit(True)

In [320]:
def opt5_pipeline(tf_rec_data, kompresija=None):
    '''Funkcija za generiranje ulaznog neoptimiziranog pipeline-a,
    iz tfrecorda, sa ili bez opcija'''
    dataset = tf.data.Dataset.list_files(tf_rec_data)
    dataset = dataset.interleave(lambda x: tf.data.TFRecordDataset(x, compression_type=kompresija),
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #Možemo ali i ne moramo izbacit remainder, jer preprocess funkcija može uzeti u obzir batch varijabilne veličine
    dataset = dataset.batch(64, drop_remainder=True)
    dataset = dataset.map(integrated_example2sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
            
    return dataset

In [321]:
dataset = opt5_pipeline("test.tfrecord")

In [324]:
pipe_18 = pipeline_timer(dataset, num_iterations=100)
pipe_18[:8]

Početak mjerenja
Kraj mjerenja
Prosječno vrijeme po batchu: 0.0008257979998597875 sekundi


[0.028442699986044317,
 0.0005489999894052744,
 0.0004710000357590616,
 0.00092299998505041,
 0.00042269995901733637,
 0.0003964000497944653,
 0.0005043999990448356,
 0.0004267999902367592]

In [325]:
model = build_compiled_model()
model.fit(dataset, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x2cc49edde88>

Opcije ne utječu na brzinu učenja

Provjera da li pipeline vraća slike koje očekujemo.

# 4. Pipeline kojeg ću koristiti

Definirana funkcija za izgradnju optimiziranog pipeline-a. Bitna je definirati vektoriziranu funkciju (funkciju koja prima kao ulaz grupu uzoraka) za parsiranje tfrecord-a.

In [218]:
def build_pipeline(file_pattern, batch_size, parser_fun, 
                   add_options=False, compression=None):
    
    '''Funkcija za generiranje optimiziranog pipeline-a iz direktorija koji sadrži tfrecord-e
    
    Argumenti
    ---------
    file_pattern: str
    glob obrazac za matchiranje/prepoznavanje datoteka, npr. "*.tfrecord", moguće je zadati i 
    punu putanju do direktorija
    
    batch_size: int
    Veličina mini grupe za učenje modela
    
    parser_fun: fun
    Vektorizirana funkcija za parsiranje tfrecord datoteke u format pogodane za daljnju obradu
    primjenom modela
    
    dodaj_opcije: bool
    Dodavanje statičkih optimizacija pipeline-a, zadana vrijednost je False
    
    kompresija: str
    Da li su podatci komprimirani, npr. u GZIP formatu, zadana vrijednost je None
    
    Povratna vrijednost
    -------------------
    dataset: tf.data.Dataset
    Instanca tf.data.Dataset klase pogodna za obradu modelom
    
    '''
    
    dataset = tf.data.Dataset.list_files(file_pattern)
    dataset = dataset.interleave(lambda x: tf.data.TFRecordDataset(x, compression_type=compression),
                                 cycle_length=tf.data.experimental.AUTOTUNE,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=1000)
    #U slučaju da je zadnji batch manje veličine u odnosu na puni batch, izbaci ga
    dataset = dataset.batch(batch_size, drop_remainder=True)
    dataset = dataset.map(parser_fun, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    #Statičke optmiziacije
    if add_options:
        options = tf.data.Options()
        options.experimental_optimization.map_and_batch_fusion = True
        options.experimental_optimization.map_fusion = True
        options.experimental_optimization.map_parallelization = True
        dataset = dataset.with_options(options)
        
    return dataset

Primjer parser funkcije

In [219]:
def example2sample(example):
    #Obavezno definiramo opis značajki pohranjenih u Example-u, ovo vraća dict sa ključevima: image, label
    feat_desc = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    #Za parsiranje više Example-ova
    sample = tf.io.parse_example(example, feat_desc)
    #Dekodiranje raw byte stringa u uint8, ovo sada možemo i vizualizirati
    sample['image'] = tf.io.decode_raw(sample['image'], tf.uint8)
    #Izvuci slike i oznake
    image = sample['image']
    label = sample['label']
    #Konverzija u float, inače nije moguća primjena gradijentne metode
    image = tf.cast(image, tf.float32)
    #Normalizacija u raspon [0, 1]
    image = image / 255.
    #Preoblikovanje u grupe slika u originalne dimenizije iz 1d tenzora u 4d tenzor,
    #uz proizvoljan broj slika u mini grupi (vektorizacija) 
    image = tf.reshape(image, [-1, 28, 28, 1])
    
    return image, label