# Using TF's data API to shuflle the dataset, load and preprocess it.

## Importing libraries

In [60]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import os

## Shuffling the data

In [2]:
tf.random.set_seed(42)

In [3]:
dataset = tf.data.Dataset.range(10).repeat(3) # Creates the dataset

In [4]:
dataset = dataset.shuffle(buffer_size = 3, seed = 42).batch(7) # Shuffles the data

In [5]:
for item in dataset:
    print(item)

tf.Tensor([1 3 0 4 2 5 6], shape=(7,), dtype=int64)
tf.Tensor([8 7 1 0 3 2 5], shape=(7,), dtype=int64)
tf.Tensor([4 6 9 8 9 7 0], shape=(7,), dtype=int64)
tf.Tensor([3 1 4 5 2 8 7], shape=(7,), dtype=int64)
tf.Tensor([6 9], shape=(2,), dtype=int64)


- If we call `repeat()` on a shuffled dataset, it will by default create a new order at every iteration. If we prefer to use the same order in each iteration, set `reshuffle_each_iteration = False`.

## Shuffling the California housing dataset

- First we need to load the dataset, split them into train, validation and test sets and scale them.

In [6]:
housing = fetch_california_housing()

In [7]:
housing

{'data': array([[   8.3252    ,   41.        ,    6.98412698, ...,    2.55555556,
           37.88      , -122.23      ],
        [   8.3014    ,   21.        ,    6.23813708, ...,    2.10984183,
           37.86      , -122.22      ],
        [   7.2574    ,   52.        ,    8.28813559, ...,    2.80225989,
           37.85      , -122.24      ],
        ...,
        [   1.7       ,   17.        ,    5.20554273, ...,    2.3256351 ,
           39.43      , -121.22      ],
        [   1.8672    ,   18.        ,    5.32951289, ...,    2.12320917,
           39.43      , -121.32      ],
        [   2.3886    ,   16.        ,    5.25471698, ...,    2.61698113,
           39.37      , -121.24      ]]),
 'target': array([4.526, 3.585, 3.521, ..., 0.923, 0.847, 0.894]),
 'frame': None,
 'target_names': ['MedHouseVal'],
 'feature_names': ['MedInc',
  'HouseAge',
  'AveRooms',
  'AveBedrms',
  'Population',
  'AveOccup',
  'Latitude',
  'Longitude'],
 'DESCR': '.. _california_housing_dataset:\n

In [8]:
x = housing.data

In [9]:
y = housing.target.reshape(-1, 1)

In [10]:
x_train_full, x_test, y_train_full, y_test = train_test_split(x, y, random_state = 42)

In [11]:
x_train, x_valid, y_train, y_valid = train_test_split(x_train_full, y_train_full, random_state = 42)

In [12]:
scaler = StandardScaler()

In [13]:
scaler.fit(x_train)

StandardScaler()

In [14]:
x_mean = scaler.mean_
x_std = scaler.scale_

- Now we will be splitting the train, test and validation data and store them in multiple csv files.

In [15]:
for file_idx, row_indices in enumerate(np.array_split(np.arange(len(x_train)), 10)):
    print(file_idx, row_indices)

0 [   0    1    2 ... 1158 1159 1160]
1 [1161 1162 1163 ... 2319 2320 2321]
2 [2322 2323 2324 ... 3480 3481 3482]
3 [3483 3484 3485 ... 4641 4642 4643]
4 [4644 4645 4646 ... 5802 5803 5804]
5 [5805 5806 5807 ... 6963 6964 6965]
6 [6966 6967 6968 ... 8124 8125 8126]
7 [8127 8128 8129 ... 9285 9286 9287]
8 [ 9288  9289  9290 ... 10446 10447 10448]
9 [10449 10450 10451 ... 11607 11608 11609]


In [16]:
def save_to_mutiple_csv_files(data, name_prefix, header = None, n_parts = 10): 
    housing_dir = os.path.join('datasets', 'housing') 
    os.makedirs(housing_dir, exist_ok = True) # Creating base directories
    path_format = os.path.join(housing_dir, '{}_{:02d}.csv') # Path for each part of the set.
    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        #Dividing the entire data into required no. of parts and assign part no(file_idx) and index for each instance
        part_csv = path_format.format(name_prefix, file_idx) # Adding the name of the set and the file index to path                         
        filepaths.append(part_csv)
        with open(part_csv, 'wt', encoding = 'utf-8') as f: # Opening the created csv file for writing
            if header is not None:
                f.write(header) # Writing the header if available
                f.write('\n')
            for row_idx in row_indices:
                f.write(','.join(repr(col) for col in data[row_idx])) # Writing down the features of each instance seperated by commas.
                f.write('\n')
    return filepaths

In [17]:
x_train.shape

(11610, 8)

In [18]:
np.c_[x_train, y_train].shape

(11610, 9)

In [19]:
# Concatenating the train, validation and test data with the respective labels.
train_data = np.c_[x_train, y_train]
valid_data = np.c_[x_valid, y_valid]
test_data = np.c_[x_test, y_test]

In [20]:
housing.feature_names

['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude']

In [21]:
housing.target_names

['MedHouseVal']

In [22]:
header_cols = housing.feature_names + housing.target_names # Creating the header for the csv files

In [23]:
header = ','.join(header_cols)

In [24]:
# Splitting the datasets into multiple csv files
train_filepaths = save_to_mutiple_csv_files(train_data, 'train', header, n_parts = 20)
valid_filepaths = save_to_mutiple_csv_files(valid_data, 'validation', header, n_parts = 10)
test_fileaths = save_to_mutiple_csv_files(test_data, 'test', header, n_parts = 10)

In [25]:
pd.read_csv(train_filepaths[0]).head() # First 5 lines of the first part of training data

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedHouseVal
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [26]:
with open(train_filepaths[0]) as f:
    for i in range(5):
        print(f.readline())

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedHouseVal

3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442

5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687

3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621

7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621



In [27]:
train_filepaths

['datasets/housing/train_00.csv',
 'datasets/housing/train_01.csv',
 'datasets/housing/train_02.csv',
 'datasets/housing/train_03.csv',
 'datasets/housing/train_04.csv',
 'datasets/housing/train_05.csv',
 'datasets/housing/train_06.csv',
 'datasets/housing/train_07.csv',
 'datasets/housing/train_08.csv',
 'datasets/housing/train_09.csv',
 'datasets/housing/train_10.csv',
 'datasets/housing/train_11.csv',
 'datasets/housing/train_12.csv',
 'datasets/housing/train_13.csv',
 'datasets/housing/train_14.csv',
 'datasets/housing/train_15.csv',
 'datasets/housing/train_16.csv',
 'datasets/housing/train_17.csv',
 'datasets/housing/train_18.csv',
 'datasets/housing/train_19.csv']

## Building an input pipeline

In [28]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed = 42) # Returns a dataset that shuffles the file paths.

In [29]:
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'datasets/housing/train_15.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_08.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_03.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_04.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/train_17.csv', shap

- We can set `shuffle = False` in `list_files()` method if we dont want the shuffling of items to happen.

In [30]:
n_readers = 5
dataset = filepath_dataset.interleave(lambda filepath : tf.data.TextLineDataset(filepath).skip(1), cycle_length = n_readers)

- The `interleave()` method will create a dataset pulling out 5 filepaths from the `filepath_dataset` and to each one it will apply the lambda function given to create a new dataset using `TextLineDataset()` method.
- At this stage there will be 7 datasets in total; the `filepath_dataset`, the `interleave()` dataset, and 5 `TextLineDatasets()`.
- When we iterate over the interleave dataset, it will iterate thorugh these 5 TextLineDatasets and read 1 line from each of them until all datasets are out of items. Then it will take 5 another 5 filepaths and repeat the same until it runs out of filepaths.
- For interleave to work properly it is good to have files of identical length, otherwise the ends of the longest files will not be interleaved.
- By default `interleave()` does not use parallellism. If we want it to read multiple files in parallel, set `num_parallel_calls` to the no. of threads we want.

In [31]:
for line in dataset.take(5):
    print(line.numpy())

b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504'
b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159'
b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'


- These are just byte strings. We need to parse them and scale the data.

- We need to replace the missing values in all the 8 features if any by the default values. This can be done using the `tf.io.decode_csv` func.

In [35]:
record_defaults = [0, np.nan, tf.constant(np.nan, dtype = tf.float64), 'Hello', tf.constant([])] # The default values

In [37]:
parsed_fields = tf.io.decode_csv('1, 2, 3, 4, 5',  record_defaults) # Parsing the csv line

In [38]:
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b' 4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

- Since there are no missing values in the above line, the default values were not used.

In [41]:
parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)

In [42]:
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

- The missing values in the line was replaced by the default values.

In [46]:
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex :
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [47]:
try:
    parsed_fields = tf.io.decode_csv('1, 2, 3, 4, 5, 6, 7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


- The no. of fields should match the no. of fields in `record_default`

In [48]:
n_inputs = x_train.shape[-1]

In [51]:
@tf.function
def preprocess(line): # Func to parse and scale the csv files
    defs = [0.] * n_inputs + [tf.constant([], dtype = tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults = defs)
    x = tf.stack(fields[:-1]) # Stacking all the feature 1D tensors
    y = tf.stack(fields[-1:])
    return (x - x_mean) / x_std, y # Scaling the values

In [52]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [53]:
def csv_reader_dataset(filepaths, repeat = 1, n_readers = 5, n_read_threads = None, shuffle_buffer_size = 10000, n_parse_threads = 5, batch_size = 32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(lambda filepath : tf.data.TextLineDataset(filepath).skip(1), cycle_length = n_readers, 
                                 num_parallel_calls = n_read_threads)
    dataset = dataset.shuffle(buffer_size = shuffle_buffer_size)
    dataset = dataset.map(preprocess, num_parallel_calls = n_parse_threads)
    dataset = dataset.batch(batch_size = batch_size)
    return dataset.prefetch(1)

In [54]:
tf.random.set_seed(42)

In [55]:
train_set = csv_reader_dataset(train_filepaths, batch_size = 3) # Getting the sample training dataset

In [57]:
for x_batch, y_batch in train_set.take(2):
    print(x_batch, y_batch)

tf.Tensor(
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]], shape=(3, 8), dtype=float32) tf.Tensor(
[[1.752]
 [1.313]
 [1.535]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]], shape=(3, 8), dtype=float32) tf.Tensor(
[[0.919]
 [1.028]
 [2.182]], shape=(3, 1), dtype=float32)


In [58]:
train_set = csv_reader_dataset(train_filepaths, repeat = None) # Repeat is set to None as this will be taken care by tf.keras
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_fileaths)

- Now that we have loaded and preprocessed the train, validation and test data, we can use it to train and evaluate the model.

In [62]:
def model_reset():
    keras.backend.clear_session()
    tf.random.set_seed(42)
    np.random.seed(42)

In [63]:
model_reset()

In [65]:
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation = 'relu', input_shape = x_train.shape[1:]))
model.add(keras.layers.Dense(1))

In [66]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 30)                270       
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 31        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________


In [67]:
model.compile(loss = 'mse', optimizer = keras.optimizers.SGD(lr = 1e-3))

In [68]:
batch_size = 32
history = model.fit(train_set, steps_per_epoch = len(x_train) // batch_size, epochs = 10, validation_data = valid_set)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [69]:
model.evaluate(test_set, steps = len(x_test) // batch_size)



0.4787752032279968

- We can also use a custom training loop to train the model using the data.

In [75]:
optimizer = keras.optimizers.Nadam(lr = 0.01)
loss_fn = keras.losses.mean_squared_error

In [80]:
n_epochs = 5
batch_size = 32
n_steps = len(x_train) // batch_size
total_steps = n_epochs * n_steps
global_step = 0
for x_batch, y_batch in train_set.take(total_steps): # Taking a batch of x and y
    global_step += 1
    print('\rSteps {}/{}'.format(global_step, total_steps), end = '') # Printing the no. of steps
    with tf.GradientTape() as tape:
        y_pred = model(x_batch)
        main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
        loss = tf.add_n([main_loss] + model.losses)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

Steps 1810/1810

- We can include all the loading and preprocessing steps inside the custom train func itself.

In [101]:
model_reset()

In [102]:
@tf.function
def train(model, n_epochs, batch_size = 32, n_readers = 5, n_read_threads = 5, shuffle_buffer_size = 10000, n_parse_threads = 5):
    train_set = csv_reader_dataset(train_filepaths, repeat = n_epochs,  n_readers = n_readers, n_read_threads = n_read_threads, shuffle_buffer_size = shuffle_buffer_size, 
                                   n_parse_threads = n_parse_threads, batch_size = batch_size)
    # Getting the train dataset from mutiple csv files. The repeat param is set to the no.of epochs to get the required instances.
    n_steps_per_epoch = len(x_train) // batch_size
    total_steps = n_steps_per_epoch * n_epochs
    global_step = 0
    for x_batch, y_batch in train_set.take(total_steps): # Taking a batch of x and y
        global_step += 1
        if tf.equal(global_step % 100, 0):
            tf.print('\rGlobal step', global_step, '/', total_steps) # Printing the no. of steps
        # Using tf funcs instead of python funcs as it is needed for autograph and tracing.
        with tf.GradientTape() as tape:
            y_pred = model(x_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [103]:
train(model, 5)

Global step 100 / 1810
Global step 200 / 1810
Global step 300 / 1810
Global step 400 / 1810
Global step 500 / 1810
Global step 600 / 1810
Global step 700 / 1810
Global step 800 / 1810
Global step 900 / 1810
Global step 1000 / 1810
Global step 1100 / 1810
Global step 1200 / 1810
Global step 1300 / 1810
Global step 1400 / 1810
Global step 1500 / 1810
Global step 1600 / 1810
Global step 1700 / 1810
Global step 1800 / 1810


- The training time is much less as we used TF function which got converted to TF graph in the first step of first epoch and further the same graph is used for further steps and epochs.