In [74]:
import tensorflow as tf
import numpy as np

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pathlib import Path

In [19]:
X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X)
[print(item) for item in dataset]

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


[None, None, None, None, None, None, None, None, None, None]

In [20]:
X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}

dataset = tf.data.Dataset.from_tensor_slices(X_nested)
[print(item) for item in dataset]

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


[None, None, None]

In [None]:
# chaining Tansformation

dataset = tf.data.Dataset.from_tensor_slices(tf.range(10))
dataset = dataset.repeat(3).batch(7)  # we can do drop remainder = True

In [42]:
for i in dataset:
    print(i)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [None]:
dataset = dataset.map(
    lambda x: x * 2
)  # we can spawn multiple threads to speed things up by setting num_parallel_calls to number of therads
[print(item) for item in dataset]

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


[None, None, None, None, None]

In [None]:
dataset = dataset.filter(lambda x: tf.reduce_sum(x) < 50)
[print(item) for item in dataset]

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


[None, None]

In [None]:
[print(item) for item in dataset.take(2)]  # taking only a sample

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)


[None, None]

In [47]:
# shuffling data
dataset = tf.data.Dataset.range(10).repeat(2)
dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7)
[print(item) for item in dataset]

tf.Tensor([1 4 2 3 5 0 6], shape=(7,), dtype=int64)
tf.Tensor([9 8 2 0 3 1 4], shape=(7,), dtype=int64)
tf.Tensor([5 7 9 6 7 8], shape=(6,), dtype=int64)


2025-03-18 21:03:37.861531: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


[None, None, None]

In [None]:
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42
)

In [69]:
def save_to_csv_files(data, name_prefix, header=None, n_parts=10):

    housing_dir = Path() / "datasets" / "housing"
    housing_dir.mkdir(exist_ok=True, parents=True)
    filename_format = "my_{}_{:02d}.csv"

    filepaths = []
    m = len(data)
    chunks = np.array_split(np.arange(m), n_parts)
    for file_idx, row_indices in enumerate(chunks):
        part_csv = housing_dir / filename_format.format(name_prefix, file_idx)
        filepaths.append(part_csv.as_posix())

        with open(part_csv, "w") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths


train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_csv_files(test_data, "test", header, n_parts=10)

In [None]:
filepath_datasets = tf.data.Dataset.list_files(train_filepaths, seed=42)
# for filepath in filepath_datasets:
#     print(filepath)

n_readers = 5
dataset = filepath_datasets.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers,
    num_parallel_calls=2,
)

In [124]:
# Preprocessing data

scaler = StandardScaler()
scaler.fit(X_train)

X_mean, X_std = scaler.mean_, scaler.scale_
n_inputs = 8  # number of columns


def parse_csv_line(line):
    defs = [0.0] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    return tf.stack(fields[:-1]), tf.stack(fields[-1:])


def preprocess(line):
    x, y = parse_csv_line(line)
    return (x - X_mean) / X_std, y


# Putting everything together + prefetching


def csv_reader_dataset(
    filepaths,
    n_readers=5,
    n_read_threads=None,
    n_parse_threads=5,
    shuffle_buffer_size=10_000,
    seed=42,
    batch_size=32,
):
    dataset = tf.data.Dataset.list_files(filepaths, seed=seed)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads,
    )
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size, seed=seed)
    return dataset.batch(batch_size=batch_size).prefetch(1), dataset

In [99]:
example_set = csv_reader_dataset(train_filepaths, batch_size=3)

In [None]:
# Using the dataset with keras

train_set, dtrain = csv_reader_dataset(train_filepaths)
valid_set, dvalid = csv_reader_dataset(valid_filepaths)
test_set, dtest = csv_reader_dataset(test_filepaths)

tf.keras.backend.clear_session()
tf.keras.utils.set_random_seed(42)

model = tf.keras.Sequential(
    [
        tf.keras.layers.Input(shape=X_train.shape[1:]),
        tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal"),
        tf.keras.layers.Dense(1),
    ]
)

model.compile(loss="mse", optimizer="sgd")
model.fit(train_set, validation_data=valid_set, epochs=5)

Epoch 1/5
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 883us/step - loss: 3.1663 - val_loss: 8.3868
Epoch 2/5




[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 721us/step - loss: 0.5241 - val_loss: 45.4512
Epoch 3/5
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 720us/step - loss: 0.5398 - val_loss: 9.6166
Epoch 4/5
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 718us/step - loss: 0.5096 - val_loss: 0.9846
Epoch 5/5
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 717us/step - loss: 0.4298 - val_loss: 0.3954


<keras.src.callbacks.history.History at 0x30e8e3bf0>

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 8), dtype=tf.float32, name=None), TensorSpec(shape=(None, 1), dtype=tf.float32, name=None))>