**Chapter 13 – Loading and Preprocessing Data with TensorFlow**

_This notebook contains all the sample code and solutions to the exercises in chapter 13._

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/ageron/handson-ml2/blob/master/13_loading_and_preprocessing_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
  </td>
  <td>
    <a target="_blank" href="https://kaggle.com/kernels/welcome?src=https://github.com/ageron/handson-ml2/blob/master/13_loading_and_preprocessing_data.ipynb"><img src="https://kaggle.com/static/images/open-in-kaggle.svg" /></a>
  </td>
</table>

# Setup

First, let's import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0.

In [1]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Is this notebook running on Colab or Kaggle?
IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

# if IS_COLAB or IS_KAGGLE:
#     %pip install -q -U tfx
#     print("You can safely ignore the package incompatibility errors.")

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Datasets

## Basics

In [2]:
n_objects, n_features = int(1E3), 10
X = tf.random.normal(shape=(n_objects, n_features), stddev = 5)
y = tf.random.normal(shape=(n_objects, ), mean = -10, stddev = 10)

dataset = tf.data.Dataset.from_tensor_slices((X, y))

tf.nn.moments(X, axes = [0,1])

(<tf.Tensor: shape=(), dtype=float32, numpy=0.102768265>,
 <tf.Tensor: shape=(), dtype=float32, numpy=25.171745>)

In [3]:


for i, (x_s, y_s) in enumerate(dataset.take(5), start = 1):
    print(f"""
    X[{i}] = {x_s}
    y[{i}] = {y_s}
    {'-'*54}
    """)


    X[1] = [ -2.3791492   3.6895828  -5.841405  -11.443262    4.8080134   6.582841
  -6.316092    8.652361    1.5965953  -4.547207 ]
    y[1] = -22.000883102416992
    ------------------------------------------------------
    

    X[2] = [ 6.1332855  6.7402654  5.127617   1.4579009  3.9343169 -2.7634237
 -3.2014904 -3.8324695  7.109708  -5.5299864]
    y[2] = -5.111372947692871
    ------------------------------------------------------
    

    X[3] = [ -1.7277592    7.442224     1.5269917   -0.4040202    0.01922429
  -1.2068157   -1.3811892    1.7028779    2.743732   -12.549468  ]
    y[3] = -16.89586639404297
    ------------------------------------------------------
    

    X[4] = [-0.7257024  -5.289981    3.660913    0.6807263   4.6829324   2.269279
 -2.257893   10.022546    0.7001651  -0.35701635]
    y[4] = -21.507286071777344
    ------------------------------------------------------
    

    X[5] = [ 7.624774  -2.2251675  1.6859418  1.7380327  3.3757062  5.754802
  5.075

In [4]:
len(dataset.batch(7)) # 1000 // 7 = 142

143

In [5]:
print(*dataset.batch(27).unbatch().take(1))

(<tf.Tensor: shape=(10,), dtype=float32, numpy=
array([ -2.3791492,   3.6895828,  -5.841405 , -11.443262 ,   4.8080134,
         6.582841 ,  -6.316092 ,   8.652361 ,   1.5965953,  -4.547207 ],
      dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-22.000883>)


In [6]:
len(dataset.repeat(3)) # 3*1000

3000

In [7]:
dataset = dataset.shuffle(buffer_size=10, seed = 54) # buffer_size = 10

In [8]:
mapped_dataset = dataset.map(lambda x, y: (0.5 * (x[:5] + x[5:])**2, y))
print(*mapped_dataset.take(5), sep=f"\n{'-'*54}\n")

(<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([20.351957 ,  6.025712 ,  1.0152572,  4.0992417, 29.434898 ],
      dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-10.023413>)
------------------------------------------------------
(<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([15.544623,  7.786911, 37.28845 , 49.805374,  9.604953],
      dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-13.613569>)
------------------------------------------------------
(<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([8.8355112e+00, 3.4492753e+00, 3.9507368e+00, 4.8478420e+01,
       3.4010030e-02], dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-22.000883>)
------------------------------------------------------
(<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([47.950874, 84.39883 , 13.6026  ,  5.155099, 21.067839],
      dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-19.719894>)
-----------------------------------------------------

In [9]:
print(*dataset.filter(lambda x, y: tf.norm(x) < 10).take(1))

(<tf.Tensor: shape=(10,), dtype=float32, numpy=
array([-0.38563824,  3.7454274 , -1.625139  ,  3.3938072 ,  0.10597518,
        3.705729  ,  2.893445  , -3.7528381 , -1.2541567 ,  0.77914983],
      dtype=float32)>, <tf.Tensor: shape=(), dtype=float32, numpy=-10.03923>)


In [10]:
batched_ds = dataset.batch(54)
for batch in batched_ds.take(5):
    for i in batch:
        print(f"I = {i.shape}")

I = (54, 10)
I = (54,)
I = (54, 10)
I = (54,)
I = (54, 10)
I = (54,)
I = (54, 10)
I = (54,)
I = (54, 10)
I = (54,)


In [20]:
len(batched_ds.repeat(1000000))

19000000

## Split the California dataset to multiple CSV files

## Loading data to multiple csv

In [11]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import pandas as pd

In [176]:
housing = fetch_california_housing()

X, y = dict(), dict()

X['train'], X['test'], y['train'], y['test'] = train_test_split(
    housing['data'], housing['target'].reshape(-1,1),
    random_state = 54
)

X['train'], X['val'], y['train'], y['val'] = train_test_split(
    X['train'], y['train'],
    random_state = 54
)

split = 'train'
scaler = StandardScaler()
scaler.fit(X[split])

X_mean_var = {split: tf.nn.moments(X[split], axes = [0]) for split in X.keys()}

for split in X.keys():
    X_mean_var[split] = (tf.cast(X_mean_var[split][0], tf.float32),
                         tf.cast(X_mean_var[split][1], tf.float32))

In [177]:
X_mean_var['test']

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 3.8953583e+00,  2.8606588e+01,  5.4673748e+00,  1.1041431e+00,
         1.4298213e+03,  2.9509528e+00,  3.5653744e+01, -1.1958934e+02],
       dtype=float32)>, <tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([3.6592014e+00, 1.5707469e+02, 7.8098898e+00, 3.6089209e-01,
        1.2627025e+06, 1.2997230e+00, 4.5875440e+00, 4.0762849e+00],
       dtype=float32)>)

In [108]:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

In [15]:
data = {split : np.c_[X[split], y[split]] for split in ('train', 'val', 'test')}
header = ",".join(housing.feature_names + ["MedianHouseValue"])

filepaths = {split: save_to_multiple_csv_files(data[split], split, header, n_parts = 20) for split in data.keys()
             }
filepaths['val'][:5]

['datasets/housing/my_val_00.csv',
 'datasets/housing/my_val_01.csv',
 'datasets/housing/my_val_02.csv',
 'datasets/housing/my_val_03.csv',
 'datasets/housing/my_val_04.csv']

## Building an Input Pipeline

In [40]:
filepath_dataset = {
    split: tf.data.Dataset.list_files(filepaths[split], shuffle= True) # by default shuffle = True
    for split in data.keys()
}
print(*filepath_dataset['train'].take(5), sep='\n')

tf.Tensor(b'datasets/housing/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_07.csv', shape=(), dtype=string)


In [None]:
# alternative for list_files
aboba_ds = tf.data.Dataset.from_tensor_slices(filepaths['val']).shuffle(10).take(5)
print(*aboba_ds, sep='\n')

In [88]:
dataset = {split: filepath_dataset[split].interleave(
    lambda path: tf.data.TextLineDataset(path).skip(1), # skip first row (header)
    cycle_length = tf.data.AUTOTUNE,
    #block_length = 16,
    num_parallel_calls = tf.data.AUTOTUNE,
    deterministic = False
) for split in data.keys()}

print(*dataset['val'].take(5), sep='\n\n')

tf.Tensor(b'10.7958,25.0,7.950089126559715,0.9857397504456328,1608.0,2.8663101604278074,37.27,-122.03,5.00001', shape=(), dtype=string)

tf.Tensor(b'5.8652,20.0,7.116022099447513,0.988950276243094,539.0,2.977900552486188,34.18,-118.9,3.026', shape=(), dtype=string)

tf.Tensor(b'2.0057,17.0,5.100806451612903,1.060483870967742,1024.0,2.064516129032258,37.97,-120.33,1.189', shape=(), dtype=string)

tf.Tensor(b'2.7344,35.0,5.735955056179775,1.0617977528089888,953.0,2.6769662921348316,37.31,-120.46,0.878', shape=(), dtype=string)

tf.Tensor(b'5.1985,30.0,6.334128878281623,1.0190930787589498,2114.0,2.522673031026253,34.0,-118.04,2.792', shape=(), dtype=string)


In [180]:
n_features = X['train'].shape[-1] #8
assert n_features == 8, 'aboba'

@tf.function
def preprocess(line, split = 'train'):
    fields = tf.io.decode_csv(line,
                              record_defaults=[0.]*n_features + [tf.constant([], dtype=tf.float32)])
    x, y = tf.stack(fields[:-1]), tf.stack(fields[-1:])
    mean, var = X_mean_var[split]
    return (x - mean) / var, y

In [181]:
preprocess(b'10.7958,25.0,7.950089126559715,0.9857397504456328,1608.0,2.8663101604278074,37.27,-122.03,5.00001')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 1.9781709e+00, -2.2624265e-02,  5.9477699e-01, -7.7682143e-01,
         1.4654540e-04, -1.5605444e-03,  3.6099887e-01, -6.1708879e-01],
       dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.00001], dtype=float32)>)

## Complete fn

In [182]:
def csv_reader_dataset(filepaths, repeat_num = 1, n_readers = tf.data.AUTOTUNE,
                       n_read_threads = tf.data.AUTOTUNE, shuffle_buffer_size = 10000,
                       n_parse_threads = 5, batch_size = 32):
    
    return tf.data.Dataset.list_files(filepaths, shuffle = True).repeat(repeat_num).interleave(
    lambda path: tf.data.TextLineDataset(path).skip(1),
    cycle_length = n_readers,
    num_parallel_calls = n_read_threads,
    deterministic = False).shuffle(shuffle_buffer_size).map(preprocess, num_parallel_calls = n_parse_threads).batch(batch_size).prefetch(1)

In [183]:
train_set = csv_reader_dataset(filepaths['train'], batch_size=3)
for X_batch, y_batch in train_set.take(2):
    print("X =", X_batch)
    print("y =", y_batch)
    print()

X = tf.Tensor(
[[ 2.7219290e-01 -2.2624265e-02 -2.4003375e-01 -3.3482653e-01
   8.2998414e-04 -2.4873824e-04  3.8725787e-01 -6.0709274e-01]
 [ 3.3990380e-01  5.8546226e-02  2.6861336e-02 -8.1814337e-01
  -4.9866445e-04 -2.9836772e-03  4.5728242e-01 -6.2958431e-01]
 [-3.6715299e-01  6.4790107e-02 -4.7858879e-01 -8.6362475e-01
  -4.6745720e-04  5.9427796e-03 -3.6331865e-01  3.3001697e-01]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[2.307]
 [2.27 ]
 [0.973]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[ 5.7708580e-02  1.4839037e-02  2.2341978e-02 -1.8193322e-01
  -5.0060262e-05  3.3784460e-03 -3.9395431e-01  3.8749194e-01]
 [-3.8158506e-01  3.9814573e-02 -9.3437485e-02 -6.7295682e-01
  -3.8007690e-04 -2.8853829e-03  2.5377330e-01 -6.7318536e-02]
 [ 4.3704927e-01  1.3971671e-01 -9.4473727e-02 -1.0433053e+00
  -5.2285008e-04 -3.2577531e-03 -3.1079984e-01  3.3251455e-01]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[2.259]
 [0.662]
 [3.179]], shape=(3, 1), dtype=float32)



## Training model with this dataset

In [188]:
keras.backend.clear_session()
batch_size = 32

In [192]:
train_set = csv_reader_dataset(filepaths['train'], repeat_num=None)

In [193]:
model = keras.Sequential(
    [keras.layers.Dense(30, activation = 'relu', input_shape = X['train'].shape[1:]),
     keras.layers.Dense(1)]
)

model.compile(loss = 'mse', optimizer = keras.optimizers.Adam(learning_rate=1e-3))
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_2 (Dense)             (None, 30)                270       
                                                                 
 dense_3 (Dense)             (None, 1)                 31        
                                                                 
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________


In [194]:
model.fit(train_set, steps_per_epoch = len(X['train'] // batch_size) ,epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
  318/11610 [..............................] - ETA: 25s - loss: 0.2978

KeyboardInterrupt: ignored

## Custom training loop

In [195]:
keras.backend.clear_session()

optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error


In [202]:
def train(model, n_epochs, train_filepaths, **kwargs):
    train_set = csv_reader_dataset(train_filepaths, repeat_num = n_epochs, **kwargs)

    steps_per_epoch = len(X['train']) // kwargs['batch_size']
    total_steps = n_epochs * steps_per_epoch

    for global_step, (X_batch, y_batch) in enumerate(train_set.take(total_steps), start = 1):
        if tf.equal(global_step % 100, 0):
            tf.print(f'\rGlobal step: {global_step} / {total_steps}')
            #tf.print(f'Train loss = {loss}')
        
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses) # accumulate losses
            

        gradients = tape.gradient(loss, model.trainable_variables) # compute grad
        optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # optimizer's step


In [203]:
train_config = {
    'batch_size': 32
}

train(model, 10, filepaths['train'], **train_config)

Global step: 100 / 3620
Train loss = 0.15878109633922577
Global step: 200 / 3620
Train loss = 0.20291303098201752
Global step: 300 / 3620
Train loss = 0.35004815459251404
Global step: 400 / 3620
Train loss = 0.24711014330387115
Global step: 500 / 3620
Train loss = 0.3956557512283325
Global step: 600 / 3620
Train loss = 0.8426235318183899
Global step: 700 / 3620
Train loss = 0.23693214356899261


KeyboardInterrupt: ignored