Dataset if the core element of tf.data API

In [1]:
import tensorflow as tf

In [2]:
X = tf.range(10)
X

<tf.Tensor: shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])>

# Read data from tensor

In [3]:
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [4]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [6]:
# other way
for item in tf.data.Dataset.range(10): # opposite to above example, here datatype will be int64
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [7]:
X_nested = {
    'a': ([1,2,3],[4,5,6]),
    'b': ([7,8,9])
}
dataset = tf.data.Dataset.from_tensor_slices(X_nested)

for item in dataset:
    print(item)

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


# Chaining Transformations

In [1]:
import tensorflow as tf
from tensorflow.data import Dataset
dataset = Dataset.from_tensor_slices(tf.range(10))

In [2]:
dataset_repeat = dataset.repeat(3).batch(7)
for item in dataset_repeat:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [4]:
dataset_aquare = dataset.map( lambda x: x ** 2).repeat(3).batch(5)

for item in dataset_aquare:
    print(item)

tf.Tensor([ 0  1  4  9 16], shape=(5,), dtype=int32)
tf.Tensor([25 36 49 64 81], shape=(5,), dtype=int32)
tf.Tensor([ 0  1  4  9 16], shape=(5,), dtype=int32)
tf.Tensor([25 36 49 64 81], shape=(5,), dtype=int32)
tf.Tensor([ 0  1  4  9 16], shape=(5,), dtype=int32)
tf.Tensor([25 36 49 64 81], shape=(5,), dtype=int32)


In [6]:
dataset_aquare = dataset.map( lambda x: x ** 2).filter(lambda x: (x%2 == 0)).repeat(3).batch(5)

for item in dataset_aquare:
    print(item)

Cause: could not parse the source code of <function <lambda> at 0x0000027D101C7BE0>: found multiple definitions with identical signatures at the location. This error may be avoided by defining each lambda on a single line and with unique argument names. The matching definitions were:
Match 0:
lambda x: x % 2 == 0

Match 1:
lambda x: x ** 2

Cause: could not parse the source code of <function <lambda> at 0x0000027D101C7BE0>: found multiple definitions with identical signatures at the location. This error may be avoided by defining each lambda on a single line and with unique argument names. The matching definitions were:
Match 0:
lambda x: x % 2 == 0

Match 1:
lambda x: x ** 2

tf.Tensor([ 0  4 16 36 64], shape=(5,), dtype=int32)
tf.Tensor([ 0  4 16 36 64], shape=(5,), dtype=int32)
tf.Tensor([ 0  4 16 36 64], shape=(5,), dtype=int32)


In [8]:
dataset_fltr = dataset.map( lambda x: x ** 2).repeat(3).filter(lambda x: tf.reduce_sum(x) > 25) # only contains the batchs whose sum is greater than 50
for item in dataset_fltr:
    print(item)

tf.Tensor(36, shape=(), dtype=int32)
tf.Tensor(49, shape=(), dtype=int32)
tf.Tensor(64, shape=(), dtype=int32)
tf.Tensor(81, shape=(), dtype=int32)
tf.Tensor(36, shape=(), dtype=int32)
tf.Tensor(49, shape=(), dtype=int32)
tf.Tensor(64, shape=(), dtype=int32)
tf.Tensor(81, shape=(), dtype=int32)
tf.Tensor(36, shape=(), dtype=int32)
tf.Tensor(49, shape=(), dtype=int32)
tf.Tensor(64, shape=(), dtype=int32)
tf.Tensor(81, shape=(), dtype=int32)


In [9]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)


# Shuffling the Data

In [12]:
dataset = tf.data.Dataset.range(10).repeat(2)
dataset = dataset.shuffle(buffer_size=4, seed=2024).batch(7)
for item in dataset:
    print(item)

tf.Tensor([2 3 4 6 7 1 8], shape=(7,), dtype=int64)
tf.Tensor([5 0 0 1 2 4 5], shape=(7,), dtype=int64)
tf.Tensor([3 7 8 9 9 6], shape=(6,), dtype=int64)


# Interleaving Lines from Multiple Files

move record from one file to other at load time

In [13]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

## Split dataset and write into files

In [14]:
import numpy as np
from pathlib import Path

def save_to_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = Path('data') / "datasets" / "housing"
    housing_dir.mkdir(parents=True, exist_ok=True)
    filename_format = "my_{}_{:02d}.csv"

    filepaths = []
    m = len(data)
    chunks = np.array_split(np.arange(m), n_parts)
    for file_idx, row_indices in enumerate(chunks):
        part_csv = housing_dir / filename_format.format(name_prefix, file_idx)
        filepaths.append(str(part_csv))
        with open(part_csv, "w") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

In [15]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

In [16]:
train_filepaths = save_to_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_csv_files(test_data, "test", header, n_parts=10)

In [17]:
# sample lines
print("".join(open(train_filepaths[0]).readlines()[:4]))

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621



In [18]:
train_filepaths

['data\\datasets\\housing\\my_train_00.csv',
 'data\\datasets\\housing\\my_train_01.csv',
 'data\\datasets\\housing\\my_train_02.csv',
 'data\\datasets\\housing\\my_train_03.csv',
 'data\\datasets\\housing\\my_train_04.csv',
 'data\\datasets\\housing\\my_train_05.csv',
 'data\\datasets\\housing\\my_train_06.csv',
 'data\\datasets\\housing\\my_train_07.csv',
 'data\\datasets\\housing\\my_train_08.csv',
 'data\\datasets\\housing\\my_train_09.csv',
 'data\\datasets\\housing\\my_train_10.csv',
 'data\\datasets\\housing\\my_train_11.csv',
 'data\\datasets\\housing\\my_train_12.csv',
 'data\\datasets\\housing\\my_train_13.csv',
 'data\\datasets\\housing\\my_train_14.csv',
 'data\\datasets\\housing\\my_train_15.csv',
 'data\\datasets\\housing\\my_train_16.csv',
 'data\\datasets\\housing\\my_train_17.csv',
 'data\\datasets\\housing\\my_train_18.csv',
 'data\\datasets\\housing\\my_train_19.csv']

In [19]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=2024)

for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'data\\datasets\\housing\\my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_04.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_18.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_06.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'data\\datasets\\housing\\my_train_03.csv

In [22]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers,
    num_parallel_calls  = 2
)

In [23]:
for line in dataset.take(5):
    print(line)

tf.Tensor(b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504', shape=(), dtype=string)
tf.Tensor(b'3.6641,17.0,5.577142857142857,1.1542857142857144,511.0,2.92,40.85,-121.07,0.808', shape=(), dtype=string)
tf.Tensor(b'5.9522,26.0,6.196521739130435,1.0069565217391305,1479.0,2.5721739130434784,34.5,-119.75,4.384', shape=(), dtype=string)
tf.Tensor(b'1.6571,34.0,4.454976303317536,1.0876777251184835,1358.0,3.2180094786729856,37.94,-122.35,1.052', shape=(), dtype=string)
tf.Tensor(b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418', shape=(), dtype=string)


# Preprocessing the data

In [24]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaler.fit(X_train)

In [25]:
X_mean, X_std = scaler.mean_, scaler.scale_  # extra code
n_inputs = 8

def parse_csv_line(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    return tf.stack(fields[:-1]), tf.stack(fields[-1:])

def preprocess(line):
    x, y = parse_csv_line(line)
    return (x - X_mean) / X_std, y


In [26]:
preprocess(b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.36618188, -0.998705  ,  0.00781878, -0.00675364, -0.06140145,
         0.0072037 , -0.94465536,  0.9367464 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.418], dtype=float32)>)

# Common code

In [29]:
def csv_reader_dataset(filepaths, 
                       n_readers = 5,
                       n_read_threads = None,
                       n_parse_threads = 5,
                       shuffle_buffer_size = 10000,
                       seed = 2024,
                       batch_size = 32):
    filepaths_dataset = tf.data.Dataset.list_files(filepaths)
    dataset = filepaths_dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers,
        num_parallel_calls= n_read_threads
    )

    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size, seed =seed)
    return dataset.batch(batch_size).prefetch(1)


In [30]:
train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
    print("X =", X_batch)
    print("y =", y_batch)
    print()

X = tf.Tensor(
[[ 3.8949592  -0.60316414  0.2525362  -0.42889774 -1.1337166  -0.15197325
   0.7465546  -1.2472363 ]
 [-0.69191706 -0.91959685 -0.74102616 -0.10083511  1.3987724  -0.4382982
  -1.3428637   1.2765881 ]
 [-1.0656718   0.1879177  -0.77928287 -0.1753464  -0.31510663 -0.22388874
   1.3977404  -0.9073946 ]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[5.00001]
 [1.713  ]
 [0.987  ]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-0.42343786 -0.36583957 -1.0228724  -0.25668368  1.5940706   1.4113777
  -0.8931223   0.8417895 ]
 [-0.31365788 -0.12851503 -0.15183015 -0.02689418  0.5272311  -0.21276747
   0.49825948 -1.037335  ]
 [ 0.27813795 -0.44494775 -0.36923787 -0.18608692  1.2983854  -0.27202266
  -0.7760026   0.5819124 ]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[1.517]
 [1.575]
 [3.669]], shape=(3, 1), dtype=float32)



# Using dataset with Keras for model training

In [31]:
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [32]:
tf.keras.backend.clear_session()
tf.random.set_seed(2024)

In [35]:
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal",
                          input_shape=X_train.shape[1:]),
    tf.keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001))
model.fit(train_set, validation_data=valid_set, epochs=5) # notice the way we pass the data

Epoch 1/5


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x27d13dd5cf0>

In [36]:
test_mse = model.evaluate(test_set)
new_set = test_set.take(3) 
y_pred = model.predict(new_set)
print(test_mse)

1.141301155090332


In [38]:
y_pred[0:2,:]

array([[1.3985403],
       [1.4472278]], dtype=float32)