In [1]:
import gc
from pathlib import Path

# Directory layout. ROOT_DIR is the parent of the kernel's working directory
# (expected to be the chapter directory); the shared MLflow store lives two
# levels above ROOT_DIR.
ROOT_DIR = Path().absolute().parent
MLRUNS_DIR = ROOT_DIR.parents[1] / "mlruns"
DATA_DIR = ROOT_DIR / "dataset"
TFR_DIR = DATA_DIR / "tfrecords"
PROTO_DIR = ROOT_DIR / "protobufs"

# Create output directories up front; exist_ok=True makes re-runs a no-op.
for directory in (TFR_DIR, PROTO_DIR):
    directory.mkdir(parents=True, exist_ok=True)

print(f"{MLRUNS_DIR}\n{DATA_DIR}")

/home/twogoodap/Coding_Playground/Machine_Learning/Hands_on_Machine_Learning/handson-ml/mlruns
/home/twogoodap/Coding_Playground/Machine_Learning/Hands_on_Machine_Learning/handson-ml/handson_ml/chapter_13/dataset


In [2]:
import mlflow

# SQLite cannot create the database file when its parent directory is missing,
# so make sure the shared mlruns directory exists before pointing MLflow at it.
MLRUNS_DIR.mkdir(parents=True, exist_ok=True)
mlflow.set_tracking_uri(f"sqlite:///{MLRUNS_DIR}/mlflow.db")
# Creates the experiment if it does not exist yet, then makes it the active one.
# NOTE(review): the artifact location of a newly created experiment defaults to a
# path relative to the working directory, not MLRUNS_DIR — confirm this is intended.
mlflow.set_experiment("tf_data_api")

<Experiment: artifact_location='/home/twogoodap/Coding_Playground/Machine_Learning/Hands_on_Machine_Learning/handson-ml/handson_ml/chapter_13/mlruns/2', creation_time=1699089661167, experiment_id='2', last_update_time=1699089661167, lifecycle_stage='active', name='tf_data_api', tags={}>

## The `tensorflow.data` API

In [4]:
import tensorflow as tf
from tensorflow import data

In [None]:
# Every entry of the 1-D tensor becomes one dataset element.
ts = data.Dataset.from_tensor_slices(tf.range(10))

for element in ts:
    print(element)

In [None]:
# A tf.data.Dataset is an iterable, not a sequence: positional indexing is not
# supported and raises TypeError. Show the error instead of crashing the cell,
# then fetch the first element the supported way.
try:
    ts[0]
except TypeError as err:
    print(f"TypeError: {err}")

next(iter(ts.take(1)))

In [None]:
# Dataset.range builds the integers 0..9 directly, without an input tensor.
rs = data.Dataset.range(10)

for element in rs:
    print(element)

In [None]:
# Like any tf.data.Dataset, `rs` cannot be indexed positionally (TypeError).
# Show the error instead of crashing the cell, then take the first element.
try:
    rs[0]
except TypeError as err:
    print(f"TypeError: {err}")

next(iter(rs.take(1)))

In [None]:
# Tuples are kept as structure and each leaf is sliced along its first axis,
# so every element has the form {"a": (x, y), "b": z}.
X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}

nested_ds = data.Dataset.from_tensor_slices(X_nested)
for x in nested_ds:
    print(x)

In [None]:
# Nested lists become 2-D tensors; slicing yields one row per key per element.
X_nested = {"a": [[1, 2, 3], [4, 5, 6]], "b": [[7, 8, 9], [10, 11, 12]]}

nested_ds = data.Dataset.from_tensor_slices(X_nested)
for x in nested_ds:
    print(x)

## Chaining Transformations

In [None]:
import tensorflow as tf
from tensorflow import data

In [None]:
dataset = data.Dataset.range(10)

# 10 elements x 5 repeats = 50 elements -> six batches of 8 plus a final batch of 2.
repeated_batches = dataset.repeat(5).batch(8)
for rb in repeated_batches:
    print(rb)

In [None]:
# Seeded shuffle with a 5-element buffer, repeated 5 times: 50 elements -> 5 batches of 10.
shuffled_batches = dataset.shuffle(5, seed=42).repeat(5).batch(10)
for srb in shuffled_batches:
    print(srb)

In [None]:
# Transformations return new datasets, so `dataset` itself is left untouched.
for element in iter(dataset):
    print(element)

In [None]:
# `map` applies the function per element; AUTOTUNE lets tf.data choose the parallelism.
mapped = dataset.map(lambda x: 2 * x + 4, num_parallel_calls=data.AUTOTUNE)
for md in mapped:
    print(md)

In [None]:
# Map each element, repeat and batch, then keep only batches whose sum exceeds 80.
transformed = dataset.map(
    lambda x: (4 * x + 5) // 2, num_parallel_calls=data.AUTOTUNE
)
batched = transformed.repeat(5).batch(8)
for fd in batched.filter(lambda x: tf.reduce_sum(x) > 80):
    print(fd)

In [None]:
# `repeat()` without a count is infinite; `take(5)` keeps only the first 5 batches.
# The loop variable is named `batch` so it does not shadow the built-in `id`.
for batch in dataset.repeat().batch(7).take(5):
    print(batch)

## Shuffling the data

In [None]:
from tensorflow import data

dataset = data.Dataset.range(10)

# A buffer as large as the dataset gives a full shuffle; by default the order is
# reshuffled on each of the 10 repeats before grouping into batches of 16.
shuffled = dataset.shuffle(buffer_size=10, seed=42).repeat(10)
for srd in shuffled.batch(16):
    print(srd)

## Preprocessing using `tf.data`

### Writing the dataset as split CSV files

In [None]:
import cudf as cd
import dask_cudf as dcd
from sklearn.datasets import fetch_california_housing

# Housing features + target as one pandas frame, moved to cuDF through the
# dataframe interchange protocol, then written as 5 CSV part files.
housing_frame = fetch_california_housing(as_frame=True)["frame"]

dcd.from_cudf(
    cd.from_dataframe(housing_frame, allow_copy=True), npartitions=5
).to_csv(str(DATA_DIR / "california_housing"), index=False)

### Reading CSV files using `tf.data`

In [None]:
import tensorflow as tf
from tensorflow import data, io

In [None]:
N_INPUTS = 8


def parse_csv_line(line: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
    """Parse one CSV row of the housing data into a (features, target) pair.

    Args:
        line: Scalar string tensor holding one CSV row, as yielded by
            ``TextLineDataset``. It has ``N_INPUTS`` feature columns followed
            by the target column.

    Returns:
        ``(features, target)``: a float32 tensor of shape ``(N_INPUTS,)`` and a
        float32 tensor of shape ``(1,)``.
    """
    # Missing feature fields default to 0.0. The target's default is an empty
    # tensor, which marks it as required, so decode_csv fails if it is absent.
    record_defaults = [0.0] * N_INPUTS + [tf.constant([], dtype=tf.float32)]
    fields = io.decode_csv(line, record_defaults=record_defaults)

    return tf.stack(fields[:-1]), tf.stack(fields[-1:])

In [None]:
N_READERS, N_THREADS, SEED = 5, 5, 42

# Read the CSV part files in a seeded random order. Interleave lines from
# N_READERS files at a time, skipping each file's header row. Then parse,
# shuffle, batch and prefetch.
file_pattern = str(DATA_DIR / "california_housing" / "*.part")
dataset = (
    data.Dataset.list_files(file_pattern, seed=SEED)
    .interleave(
        lambda f: data.TextLineDataset(f).skip(1),
        cycle_length=N_READERS,
        num_parallel_calls=N_THREADS,
    )
    .map(parse_csv_line, num_parallel_calls=N_THREADS)
    # Seed the shuffle too, so the batch order is reproducible across runs.
    .shuffle(buffer_size=10_000, seed=SEED)
    .batch(32)
    .prefetch(data.AUTOTUNE)
)

In [None]:
for batch in dataset.take(1):  # a single (features, targets) batch
    print(batch)

## TFRecord Format

In [None]:
import tensorflow as tf
from tensorflow import data
from tensorflow.io import TFRecordOptions, TFRecordWriter

In [None]:
# Write three small uncompressed TFRecord files of three records each, e.g.
# first.tfrecord holds b"First Record: 01", b"First Record: 02", b"First Record: 03".
for file_stem in ("first", "second", "third"):
    with TFRecordWriter(str(TFR_DIR / f"{file_stem}.tfrecord")) as writer:
        for record_no in range(1, 4):
            writer.write(f"{file_stem.capitalize()} Record: {record_no:02d}".encode())

In [None]:
import glob

# List the files explicitly and in a fixed order instead of globbing "*.tfrecord".
# Glob order is arbitrary, and the pattern can also match other TFRecord files in
# TFR_DIR (e.g. GZIP-compressed ones) that this uncompressed reader cannot decode.
record_files = [
    str(TFR_DIR / f"{stem}.tfrecord") for stem in ("first", "second", "third")
]
rec_ds = data.TFRecordDataset(record_files, num_parallel_reads=len(record_files))

for r in rec_ds:
    print(r)

## Protocol Buffers

In [None]:
import tensorflow as tf
from tensorflow import data
from tensorflow.io import TFRecordOptions, TFRecordWriter

In [None]:
# NOTE(review): person_pb2 is presumably generated by protoc from a person.proto;
# the `protobufs` package must be importable (on sys.path) — confirm.
from protobufs.person_pb2 import Person

# Build the message with keyword arguments, then extend its repeated `emails` field.
person = Person(name="Al", id=22, emails=["al@alexandro.com"])
person.emails.extend(["john@richard.org"])

In [None]:
for value in (person, person.name, person.id):
    print(value)

# Scalar fields are plain attributes and can be reassigned directly.
person.name = "Alexandro"
print(person.name)

In [None]:
# Serialize to the protobuf binary wire format (a bytes object).
serialized = person.SerializeToString()
print(serialized)

In [None]:
# ParseFromString fills `person2` in place. Comparing its return value with the
# payload length checks that the whole payload was consumed.
person2 = Person()
bytes_parsed = person2.ParseFromString(serialized)
bytes_parsed == len(serialized)

In [None]:
# Messages compare by field values, so the round-tripped copy should equal `person`.
person2 == person

### Decoding a Custom Protobuf with a TensorFlow Op

In [None]:
# IPython help syntax: shows the docstring of tf.io.decode_proto (exploration aid).
tf.io.decode_proto?

In [None]:
# Decode the serialized Person with a TensorFlow op. The message type is resolved
# from a descriptor file in PROTO_DIR rather than from the generated Python class.
# NOTE(review): person.desc is presumably produced with protoc --descriptor_set_out.
person_tf = tf.io.decode_proto(
    bytes=serialized,
    message_type="Person",
    field_names=["name", "id", "emails"],
    output_types=[tf.string, tf.int32, tf.string],
    descriptor_source=str(PROTO_DIR / "person.desc"),
)
person_tf

In [None]:
# decode_proto returns (sizes, values); `values` holds one tensor per requested field.
person_tf.values

### TensorFlow Protobufs

```proto
syntax = "proto3";

message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
    oneof kind {
        BytesList bytes_list = 1;
        FloatList float_list = 2;
        Int64List int64_list = 3;
    }
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
```

In [None]:
from tensorflow.train import BytesList, Example, Feature, Features, Int64List

# The same person expressed with TensorFlow's generic Example protobuf: a map from
# feature name to a typed list (bytes, float or int64).
person_features = {
    "name": Feature(bytes_list=BytesList(value=[b"Alejandro"])),
    "id": Feature(int64_list=Int64List(value=[1])),
    "emails": Feature(
        bytes_list=BytesList(value=[b"al@alejandro.com", b"bal@balkrishna.org"])
    ),
}
person_example = Example(features=Features(feature=person_features))
person_example

In [None]:
# Compression options shared by the GZIP TFRecord writers/readers in this notebook.
tfr_options = {"compression_type": "GZIP"}

# Write to the ".gz" path that the person reader opens. The ".gz" suffix also keeps
# this compressed file out of the "*.tfrecord" pattern for uncompressed files.
with TFRecordWriter(
    str(TFR_DIR / "person_example.tfrecord.gz"),
    options=TFRecordOptions(**tfr_options),
) as f:
    f.write(person_example.SerializeToString())

# The in-memory Example is no longer needed once it is on disk.
del person_example

In [None]:
# IPython help syntax: shows the docstring of tf.train.Example (exploration aid).
Example?

In [None]:
from tensorflow.io import FixedLenFeature, VarLenFeature

# Parsing spec: VarLen features come back as SparseTensors, `id` as a scalar int64.
person_feature_spec = {
    "name": VarLenFeature(dtype=tf.string),
    "id": FixedLenFeature(shape=(), dtype=tf.int64),
    "emails": VarLenFeature(dtype=tf.string),
}

person_ds = data.TFRecordDataset(
    str(TFR_DIR / "person_example.tfrecord.gz"), **tfr_options
).map(
    lambda ex: tf.io.parse_example(ex, features=person_feature_spec),
    num_parallel_calls=data.AUTOTUNE,
)

In [None]:
for parsed in person_ds:
    # VarLen features are SparseTensors: densify `name`, or read `.values` directly.
    print(tf.sparse.to_dense(parsed["name"]))
    print(parsed["id"])
    print(parsed["emails"].values)

### Serializing Images and Tensors

In [None]:
from matplotlib import pyplot as plt
from sklearn.datasets import load_sample_images

# First of scikit-learn's bundled sample images, as a decoded pixel array.
img = load_sample_images()["images"][0]

fig, ax = plt.subplots()
ax.imshow(img)
ax.set_title("Original Image")
ax.axis(False);

In [None]:
# `img` is a decoded pixel array, so it must be JPEG-encoded before it can be
# stored as bytes (files that are already JPEG could be stored as-is).
# Display only the encoded size instead of dumping the whole byte string.
len(tf.io.encode_jpeg(img).numpy())

In [None]:
# JPEG-encode the pixels and store the bytes as a single-element BytesList feature.
jpeg_bytes = tf.io.encode_jpeg(img).numpy()
img_feature = Feature(bytes_list=BytesList(value=[jpeg_bytes]))
img_example = Example(features=Features(feature={"image": img_feature}))

In [None]:
image_tfr_path = str(TFR_DIR / "image_example.tfrecord.gz")
with TFRecordWriter(image_tfr_path, options=TFRecordOptions(**tfr_options)) as f:
    f.write(img_example.SerializeToString())

# Free the in-memory Example once it has been written.
del img_example

In [None]:
def _decode_image_example(ex):
    """Parse one serialized image Example and decode its JPEG payload to pixels."""
    parsed = tf.io.parse_single_example(
        ex, features={"image": VarLenFeature(dtype=tf.string)}
    )
    # VarLen -> SparseTensor; its first (only) value is the JPEG byte string.
    return tf.io.decode_jpeg(parsed["image"].values[0])


img_ds = data.TFRecordDataset(
    str(TFR_DIR / "image_example.tfrecord.gz"), **tfr_options
).map(_decode_image_example, num_parallel_calls=data.AUTOTUNE)

In [None]:
# One figure per decoded image, so multiple records do not draw over one another.
for decoded_img in img_ds:
    fig, ax = plt.subplots()
    ax.imshow(decoded_img)
    ax.set_title("Decoded Image")
    ax.axis(False)

## `SequenceExample` Protobuf

```proto
syntax = "proto3";

message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
    Features context = 1;
    FeatureLists feature_lists = 2;
};
```

In [None]:
from typing import Iterable

from tensorflow.train import FeatureList, FeatureLists, SequenceExample

# Context: document-level features stored once (author, title tokens, and the
# publication date as three integers).
context_feature = {
    "author_id": Feature(int64_list=Int64List(value=[123])),
    "title": Feature(bytes_list=BytesList(value=[b"A", b"Desert", b"Place", b"."])),
    "pub_date": Feature(int64_list=Int64List(value=[1623, 12, 25])),
}
context = Features(feature=context_feature)

# Sequences: each inner token list becomes one Feature inside a FeatureList.
content = [
    ["When", "shall", "we", "three", "meet", "again", "?"],
    ["In", "thunder", ",", "lightning", ",", "or", "in", "rain", "?"],
]
comments = [
    ["When", "the", "hurlyburly", "'s", "done", "."],
    ["When", "the", "battle", "'s", "lost", "and", "won", "."],
]


def words_to_feature(words: Iterable[str]) -> Feature:
    """Wrap a sequence of words as a bytes-list Feature, UTF-8 encoding each word."""
    encoded_words = [bytes(w, encoding="utf8") for w in words]
    return Feature(bytes_list=BytesList(value=encoded_words))


# Shared context plus two named feature lists, with one Feature (a list of
# word bytes) per sentence.
feature_lists = FeatureLists(
    feature_list={
        name: FeatureList(feature=[words_to_feature(words) for words in sentences])
        for name, sentences in (("content", content), ("comments", comments))
    }
)
seq_example = SequenceExample(context=context, feature_lists=feature_lists)
seq_example

In [None]:
seq_tfr_path = str(TFR_DIR / "sequence_example.tfrecord.gz")
with TFRecordWriter(seq_tfr_path, options=TFRecordOptions(**tfr_options)) as f:
    f.write(seq_example.SerializeToString())

# Free the in-memory SequenceExample once it has been written.
del seq_example

In [None]:
# Each record parses to a (context, sequences) pair of dicts. Sequence VarLen
# features come back as SparseTensors with one row per sentence.
seq_ds = data.TFRecordDataset(
    str(TFR_DIR / "sequence_example.tfrecord.gz"), **tfr_options
).map(
    lambda ex: tf.io.parse_single_sequence_example(
        ex,
        context_features={
            "author_id": FixedLenFeature(shape=(), dtype=tf.int64),
            "title": VarLenFeature(dtype=tf.string),
            "pub_date": FixedLenFeature(shape=(3,), dtype=tf.int64),
        },
        sequence_features={
            "content": VarLenFeature(dtype=tf.string),
            "comments": VarLenFeature(dtype=tf.string),
        },
    ),
    # Parse in parallel, consistent with the other map() calls in this notebook.
    num_parallel_calls=data.AUTOTUNE,
)

In [None]:
for parsed_context, raw_sequences in seq_ds:
    # Ragged tensors keep one row per sentence, each with its own length.
    parsed_sequences = {
        name: tf.RaggedTensor.from_sparse(sparse)
        for name, sparse in raw_sequences.items()
    }

In [None]:
# Ragged word tensors for "content" and "comments" (from the last record iterated).
parsed_sequences

## Keras Preprocessing Layers

In [4]:
import tensorflow as tf
from tensorflow import data, keras
from tensorflow.keras import layers

In [5]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

# Hold out 20% for testing, then 20% of the remainder for validation.
split_kwargs = {"test_size": 0.2, "random_state": 42}

housing = fetch_california_housing(as_frame=True)
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target, **split_kwargs
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full, y_train_full, **split_kwargs
)

# Drop the intermediates and reclaim their memory.
del housing, X_train_full, y_train_full
gc.collect()

0

In [7]:
# Linear model with a Normalization layer in front.
# NOTE(review): Normalization learns its statistics via `adapt()`, which is not
# called in this cell — confirm `norm.adapt(...)` runs on training data before fitting.
norm = layers.Normalization()
model = keras.Sequential([norm, layers.Dense(1)])

In [33]:
# Toy integer category IDs in [0, NUM_TOKENS): `inp` has one feature per row,
# `minp` has two features per row.
NUM_TOKENS = 5
inp = tf.constant([[1], [0], [2], [0], [3], [2], [4]])
minp = tf.constant(
    [[1, 2], [0, 1], [2, 3], [0, 0], [3, 4], [2, 1], [4, 0]]
)

### `CategoryEncoding`

In [34]:
# Shift the second feature's IDs by NUM_TOKENS so both features share one
# multi-hot vector of width 2 * NUM_TOKENS without colliding.
cat_enc = layers.CategoryEncoding(num_tokens=2 * NUM_TOKENS, output_mode="multi_hot")
offset_minp = minp + [0, NUM_TOKENS]
cat_enc(offset_minp)

<tf.Tensor: shape=(7, 10), dtype=float32, numpy=
array([[0., 1., 0., 0., 0., 0., 0., 1., 0., 0.],
       [1., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 1., 0., 0., 0., 0.]], dtype=float32)>

In [39]:
print(f"minp.T:\n{tf.transpose(minp)}\n")

# One-hot encode each feature column on its own, then join the results side by side.
oh_enc = layers.CategoryEncoding(num_tokens=NUM_TOKENS, output_mode="one_hot")
feature_columns = tf.transpose(minp)
layers.Concatenate()([oh_enc(column) for column in feature_columns])

minp.T:
[[1 0 2 0 3 2 4]
 [2 1 3 0 4 1 0]]



<tf.Tensor: shape=(7, 10), dtype=float32, numpy=
array([[0., 1., 0., 0., 0., 0., 0., 1., 0., 0.],
       [1., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 1., 0., 0., 0., 0.]], dtype=float32)>

In [43]:
# Same encoding via tf.one_hot: (7, 2, NUM_TOKENS) flattened to (7, 2 * NUM_TOKENS).
layers.Flatten()(tf.one_hot(minp, depth=NUM_TOKENS))

<tf.Tensor: shape=(7, 10), dtype=float32, numpy=
array([[0., 1., 0., 0., 0., 0., 0., 1., 0., 0.],
       [1., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 1., 0., 0., 0., 0.]], dtype=float32)>