Deep Learning system are often trained on **very large datasets.**

Tensorflow make easy:
* Data API
* TF Transform
* TFRecord
* TF Datasets

# TF Dataset

In [73]:
for m in dir(tf.data.Dataset):
    if not (m.startswith("_") or m.endswith("_")):
        func = getattr(tf.data.Dataset, m)
        if hasattr(func, "__doc__"):
            print("● {:21s}{}".format(m + "()", func.__doc__.split("\n")[0]))

● apply()              Applies a transformation function to this dataset.
● as_numpy_iterator()  Returns an iterator which converts all elements of the dataset to numpy.
● batch()              Combines consecutive elements of this dataset into batches.
● cache()              Caches the elements in this dataset.
● cardinality()        Returns the cardinality of the dataset, if known.
● concatenate()        Creates a `Dataset` by concatenating the given dataset with this dataset.
● element_spec()       The type specification of an element of this dataset.
● enumerate()          Enumerates the elements of this dataset.
● filter()             Filters this dataset according to `predicate`.
● flat_map()           Maps `map_func` across this dataset and flattens the result.
● from_generator()     Creates a `Dataset` whose elements are generated by `generator`. (deprecated arguments)
● from_tensor_slices() Creates a `Dataset` whose elements are slices of the given tensors.
● from_tensors()    

In [2]:
import tensorflow as tf
from tensorflow import keras

In [3]:
sample_data = [11, 22, 33, 44, 55, 66, 77, 88]
dataset = tf.data.Dataset.from_tensor_slices(sample_data) # convert data into tf data
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

In [4]:
for item in dataset:
    # print(item)
    print(item.numpy())

11
22
33
44
55
66
77
88


## Filter data

In [6]:
# Filter data
dataset = dataset.filter(lambda sample:sample>33)
for item in dataset:
    print(item)

tf.Tensor(44, shape=(), dtype=int32)
tf.Tensor(55, shape=(), dtype=int32)
tf.Tensor(66, shape=(), dtype=int32)
tf.Tensor(77, shape=(), dtype=int32)
tf.Tensor(88, shape=(), dtype=int32)


## Data transformations
NOTE: these functions are NOT in-place, hence require re-assignment

In [7]:
# dataset = dataset.repeat(3) # duplicate data
# dataset = dataset.batch(7) # group data. drop_remainder=True: drop the last batch
dataset = dataset.repeat(3).batch(7) # do above 2 transformations at once
dataset

<BatchDataset shapes: (None,), types: tf.int32>

In [8]:
for item in dataset:
    print(item)

tf.Tensor([44 55 66 77 88 44 55], shape=(7,), dtype=int32)
tf.Tensor([66 77 88 44 55 66 77], shape=(7,), dtype=int32)
tf.Tensor([88], shape=(1,), dtype=int32)


## Custom transformation on EACH SAMPLE

In [9]:
# Custom transformation on Each Sample
dataset = dataset.unbatch() # ungroup data
dataset

<_UnbatchDataset shapes: (), types: tf.int32>

### Sysntax 1 (only for simple transform)
Note: num_paralled_calls = #threads

In [10]:
# dataset = dataset.map(lambda sample: sample*10 if sample<60 else sample, num_parallel_calls=4)

### Sysntax 2 (CLEAREST & most flexible)

In [11]:
def my_func_sample(x):
    if x < 60:
        x = x * 10
    return x
dataset = dataset.map(lambda sample: my_func_sample(sample), num_parallel_calls=4)
 

### Sysntax 3 (short form of sysntax 2)

In [13]:
dataset = dataset.map(my_func_sample, num_parallel_calls=4) 
#drawback: only pass 1 parameter (sample), do not pass another parameter to the my_func_sample()

In [14]:
for item in dataset:
    print(item)

tf.Tensor(440, shape=(), dtype=int32)
tf.Tensor(550, shape=(), dtype=int32)
tf.Tensor(66, shape=(), dtype=int32)
tf.Tensor(77, shape=(), dtype=int32)
tf.Tensor(88, shape=(), dtype=int32)
tf.Tensor(440, shape=(), dtype=int32)
tf.Tensor(550, shape=(), dtype=int32)
tf.Tensor(66, shape=(), dtype=int32)
tf.Tensor(77, shape=(), dtype=int32)
tf.Tensor(88, shape=(), dtype=int32)
tf.Tensor(440, shape=(), dtype=int32)
tf.Tensor(550, shape=(), dtype=int32)
tf.Tensor(66, shape=(), dtype=int32)
tf.Tensor(77, shape=(), dtype=int32)
tf.Tensor(88, shape=(), dtype=int32)


## Custom transformation on WHOLE DATASET

### Sysntax 1 (simple transform)

In [15]:
max_val = 100
# dataset = dataset.apply(lambda datset: datset.filter())

### Sysntax 2 (CLEAREST)

In [16]:
def my_func(ds):
    ds = ds.filter(lambda sample: sample < max_val)
    new_ds = ds.map(lambda sample: sample / 10)
    return new_ds

dataset = dataset.apply(lambda datset: my_func(datset))

### Sysntax 3 (short form of syntax 2)

In [17]:
# dataset = datset.apply(my_func)

In [18]:
for item in dataset:
    print(item)

tf.Tensor(6.6, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)
tf.Tensor(6.6, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)
tf.Tensor(6.6, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)


## Randomly shuffle the dataset

NOTE: shuffle dataset helps to make sure your training data are iid 

      (REQUIRED in training using gradient descent)

### 1. With small dataset
INFO: shuffle() method works by getting N items of the dataset (N='buffer_size') into RAM each time. Then it randomly draws samples from this buffer and replaces the drawn ones with new samples from the dataset.

NOTE: MUST set 'buffer_size' so that the buffet doesn't exceed RAM capacity.

In [19]:
dataset = dataset.shuffle(buffer_size=3, seed=42)
for item in dataset:
    print(item)

tf.Tensor(6.6, shape=(), dtype=float64)
tf.Tensor(6.6, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)
tf.Tensor(8.8, shape=(), dtype=float64)
tf.Tensor(7.7, shape=(), dtype=float64)
tf.Tensor(6.6, shape=(), dtype=float64)


### 2. With LARGE dataset
NOTE: shuffle() alone CAN'T shuffle well large dataset since the buffer size is relatively small compared to the dataset's size.
#### Solution:
* 1. Split dataset into muptiple files.
* 2. Read these files at once to draw random samples from ALL parts of the dataset.

In [27]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
import numpy as np
import os

Demo on California housing dataset
INFO: 20640 samples, 8 features, label values: 0.15 - 5.

https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_california_housing.html 

In [23]:
housing = fetch_california_housing()
print("Cali housing dataset size:",housing.data.shape)
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

Cali housing dataset size: (20640, 8)


In [28]:
# STEP 1: Save data to multiple files 
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)
train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)
print('\nDone writing files. Training file paths:',train_filepaths)


Done writing files. Training file paths: ['datasets\\housing\\my_train_00.csv', 'datasets\\housing\\my_train_01.csv', 'datasets\\housing\\my_train_02.csv', 'datasets\\housing\\my_train_03.csv', 'datasets\\housing\\my_train_04.csv', 'datasets\\housing\\my_train_05.csv', 'datasets\\housing\\my_train_06.csv', 'datasets\\housing\\my_train_07.csv', 'datasets\\housing\\my_train_08.csv', 'datasets\\housing\\my_train_09.csv', 'datasets\\housing\\my_train_10.csv', 'datasets\\housing\\my_train_11.csv', 'datasets\\housing\\my_train_12.csv', 'datasets\\housing\\my_train_13.csv', 'datasets\\housing\\my_train_14.csv', 'datasets\\housing\\my_train_15.csv', 'datasets\\housing\\my_train_16.csv', 'datasets\\housing\\my_train_17.csv', 'datasets\\housing\\my_train_18.csv', 'datasets\\housing\\my_train_19.csv']


In [29]:
# STEP 2: Read created files at once 
# Read some first samples of a file (just to see)
import pandas as pd
print('\nSome first samples:')
print(pd.read_csv(train_filepaths[0]).head())

# Create a dataset containing file paths in RANDOM ORDER, using list_files() 
print('\nFile paths (in RANDOM ORDER):')
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, shuffle=True, seed=42)
for filepath in filepath_dataset:
    print(filepath)


Some first samples:
   MedInc  HouseAge  AveRooms  AveBedrms  Population  AveOccup  Latitude  \
0  3.5214      15.0  3.049945   1.106548      1447.0  1.605993     37.63   
1  5.3275       5.0  6.490060   0.991054      3464.0  3.443340     33.69   
2  3.1000      29.0  7.542373   1.591525      1328.0  2.250847     38.44   
3  7.1736      12.0  6.289003   0.997442      1054.0  2.695652     33.55   
4  2.0549      13.0  5.312457   1.085092      3297.0  2.244384     33.93   

   Longitude  MedianHouseValue  
0    -122.43             1.442  
1    -117.39             1.687  
2    -122.98             1.621  
3    -117.70             2.621  
4    -116.93             0.956  

File paths (in RANDOM ORDER):
tf.Tensor(b'datasets\\housing\\my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_17.csv', shape=(), dtype=string)
t

In [30]:
# Create a dataset containing data from MULTIPLE FILES, using interleave()
# INFO: interleave() creates a dataset from N (N=cycle_length) RANDOM files (with names in 'filepath_dataset'). 
#       When you get data from this dataset, you will get first rows of these N random files (one row/file each time).
#       When you get all rows from these N files, the other N files (from 'filepath_dataset') will be generated.
# NOTE: files (with names in 'filepath_dataset') SHOULD have the identical length,
#       otherwise the ends of the longer files won't be gotten.      
N_files_1_read = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1), # skip(1): the header row
    cycle_length=N_files_1_read)
print('\nSome first samples of the SHUFFLED dataset:')
for item in dataset.take(7):
    #print(item)
    print(item.numpy())


Some first samples of the SHUFFLED dataset:
b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418'
b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0'
b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67'
b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'
b'3.6548,29.0,4.6434540389972145,0.9916434540389972,2919.0,4.0654596100278555,34.3,-118.42,1.803'
b'3.9543,35.0,5.134122287968442,0.9506903353057199,1305.0,2.57396449704142,33.94,-118.0,2.144'


## Converts CSV records to tensors using decode_csv()
INFO: 
+ decode_csv() converts CSV records to tf tensors.
+ A CSV record is a string with commas and NO space.

In [31]:
#sample_record = '11,22,33,44,55' # with NO missing values
sample_record = ',22,,,55' # with MISSING values

In [32]:
# NOTE on creating DEFAULT VALUES:
#   + No. of default values MUST match exactly no. of fields in the records.
#   + Empty default values (eg. tf.constant([])) mean the fields are REQUIRED (no missing allowed).
default_values=[0.101, np.nan, tf.constant(np.nan, dtype=tf.float64), "Hello", tf.constant([])] 
#default_values=[0.101, np.nan, "Hello", tf.constant([])] # NOT enough values => ERROR
processed_record = tf.io.decode_csv(sample_record, default_values)
processed_record

[<tf.Tensor: shape=(), dtype=float32, numpy=0.101>,
 <tf.Tensor: shape=(), dtype=float32, numpy=22.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=55.0>]

## Preprocessing data

### Processing 1 record
eager mode  vs  graph mode

In [33]:
@tf.function # Convert below function to tf function (FASTER than regular python function)
def preprocess(line, no_of_features=1):
    default_val = [0.]*no_of_features + [tf.constant([], dtype=tf.float32)] # last field is the label (CAN'T be missing)
    fields = tf.io.decode_csv(line, record_defaults=default_val)
    x = tf.stack(fields[:-1]) # tf.stack(): merges elements into 1 array
    y = tf.stack(fields[-1:])
    # Do other preprocessing here...
    return x, y
no_of_features = X_train.shape[-1]
sample_record = b'4.6477,38,,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504' # b: string of byte literals
(x,y) = preprocess(sample_record, no_of_features)
print('\nSample record:\nx =',x,'\ny =',y)


Sample record:
x = tf.Tensor(
[   4.6477      38.           0.           0.9118644  745.
    2.5254238   32.64      -117.07     ], shape=(8,), dtype=float32) 
y = tf.Tensor([1.504], shape=(1,), dtype=float32)


## Preprocessing a dataset

In [34]:
def csv_reader_dataset(filepaths, no_of_features, line_skip=1, no_files_1_read=5,
                       num_threads=1, shuffle_buffer_size=10000, batch_size=32):
    # Create a dataset of file paths:
    dataset = tf.data.Dataset.list_files(filepaths)
    # Create a dataset of shuffled samples from files (in the 'filepaths'):
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(line_skip),
        cycle_length=no_files_1_read, num_parallel_calls=num_threads)
    # Cache the data in RAM for speed:
    dataset = dataset.cache() # NOTE: dataset size MUST <= RAM capacity
    # Shuffle records 1 more time:
    dataset = dataset.shuffle(shuffle_buffer_size)
    # Preprocess records:
    dataset = dataset.map(lambda line: preprocess(line,no_of_features), num_parallel_calls=num_threads)
    # Group records into batches:
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(1) # NOTE: prefetch(): can increase running speed significantly, by preparing the data AHEAD of calling.
dataset = csv_reader_dataset(train_filepaths, no_of_features, batch_size=2)
print('\n\nProcessed training data:')
for record in dataset.take(2):
    #print('\n',record)    
    print('\n',np.round(record[0][0],2))



Processed training data:

 [ 2.0800e+00  3.6000e+01  4.5200e+00  1.0500e+00  1.4690e+03  4.9000e+00
  3.3920e+01 -1.1819e+02]

 [   3.14   33.      5.04    1.02  814.      2.48   37.95 -122.04]


## Training model

In [72]:
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

(X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.boston_housing.load_data()
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

n_inputs = X_train.shape[-1]

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

@tf.function
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Dense(10, activation='elu', kernel_initializer="he_uniform", input_shape=X_train.shape[1:]))
model.add(tf.keras.layers.Dense(1))

opt = tf.keras.optimizers.Nadam(lr=0.01, beta_1=0.9, beta_2=0.999)
model.compile(loss='mean_absolute_error', optimizer=opt)

batch_size = 32

history = model.fit(X_train, y_train, \
         epochs=10, steps_per_epoch=len(X_train) // batch_size, \
         validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Object `model.fit` not found.


## Evaluate the housing model

In [60]:
# model = keras.models.load_model(r'models/housing_best.h5')
model.evaluate(test_set)
new_data = test_set.take(1)
# NOTE: new_data is in fact a data-random-sampling object, 
#       hence each time we "touch" it, it generate new samples.
#       => For result comparison, we need to store its generated data to a list.
new_x_fixed = []
new_y_fixed = []
for (x,y) in new_data:
    new_x_fixed.append(x)
    new_y_fixed.append(y)
print('\nPredictions:\n',model.predict(new_x_fixed)[:5])
print('\nTrue labels:')
for y in new_y_fixed:
    print(y.numpy()[:5])

ValueError: in user code:

    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\training.py:1233 test_function  *
        return step_function(self, iterator)
    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\training.py:1224 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    c:\program files\python37\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    c:\program files\python37\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    c:\program files\python37\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:3417 _call_for_each_replica
        return fn(*args, **kwargs)
    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\training.py:1217 run_step  **
        outputs = model.test_step(data)
    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\training.py:1183 test_step
        y_pred = self(x, training=False)
    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\base_layer.py:998 __call__
        input_spec.assert_input_compatibility(self.input_spec, inputs, self.name)
    c:\program files\python37\lib\site-packages\tensorflow\python\keras\engine\input_spec.py:207 assert_input_compatibility
        ' input tensors. Inputs received: ' + str(inputs))

    ValueError: Layer sequential_20 expects 1 input(s), but it received 2 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 13) dtype=float32>, <tf.Tensor 'ExpandDims:0' shape=(None, 1) dtype=float32>]


## WRITE YOUR OWN TRAINING LOOP FUNCTION
Original code: cell 40 in '13_loading_and_preprocessing_data.ipynb'
NOTE: requires knowledge custom model

In [76]:
@tf.function
def train(model, n_epochs, train_filepaths, no_of_features, optimizer, loss_fn, 
          batch_size=32, no_files_1_read=5, num_threads=1, shuffle_buffer_size=10000):
    train_set = csv_reader_dataset(train_filepaths, no_of_features, no_files_1_read=no_files_1_read,
                                   num_threads=num_threads, shuffle_buffer_size=shuffle_buffer_size, batch_size=batch_size)
    for epoch in tf.range(n_epochs):
        iter = 0
        for X_batch, y_batch in train_set:
            iter += 1
            with tf.GradientTape() as tape:
                y_pred = model(X_batch)
                main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                loss = tf.add_n([main_loss] + model.losses)
                tf.print("\rEpoch ", epoch+1, "/", n_epochs,', iter ',iter,': loss = ',loss,sep='')
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            n_epochs=2
no_of_features = X_train.shape[-1] # = 8
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
train(model, n_epochs, train_filepaths, no_of_features, optimizer, loss_fn)

TypeError: in user code:

    C:\Users\84766\AppData\Local\Temp/ipykernel_3252/2213743708.py:7 train  *
        train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,

    TypeError: tf__csv_reader_dataset() got an unexpected keyword argument 'repeat'
