# 🎲 Pré Processamento de dados com **TENSORFLOW**


Aqui está descrito como dividir o processamento de dados nos casos em que não é possível carregar todo o dataset em **memória**.

### **Data API**

In [1]:
import tensorflow as tf

Caso tenha caído aqui sem antes olhar os outros arquivos, há um código em python explicando (por cima) o que é Tensor, tipos de dados, semelhanças e diferenças com a biblioteca **numpy** etc. Caso não tenha visto ainda, acesse, nesse meso repositório, o arquivo [TensorFlowExplanation.ipynb](TensorFlowExplanation.ipynb)

O código abaixo cria um tensor de range **10** e o **divide** em pedaços.

In [2]:
X = tf.range(10) # Criando qualquer Tensor de dados
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

Veja como é simples iterar por cada **conjunto** de dados:

In [3]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


### Transformação de encadeamento

repeat duplica o *dataset*, enquanto o *batch* cria grupos, nesse caso, de 7.

In [4]:
dataset = dataset.repeat(3).batch(7)

for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


podemos rodar funções também (a função deve ser convertida em TF Function antes de ser passada aqui):

In [5]:
dataset = dataset.map(lambda x: x * 2)
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


o unbatch() separa os elementos dos lotes e os retorna individualmente:

In [6]:
dataset = dataset.unbatch()
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, sh

In [7]:
dataset = dataset.filter(lambda x: x < 10)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)


para examinar apenas alguns elementos:

In [8]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)


# 🎲 Embaralhando os dados (Shuffle)

In [9]:
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset.take(3):
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)


Gerando arquivos para serem lidos de forma **otimizada**:

In [10]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import os

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)

# Média e desvio padrão, serão usados posteriomente
X_mean = scaler.mean_
X_std = scaler.scale_

### O código abaixo foi retirado inteiramente do livro "**Mãos à Obra: Aprendizado de Máquina com Scikit-Learn, Keras & TensorFlow**", com acréscimos de minha autoria.

In [11]:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("ArquivosGeradosPelosCodigos/datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([str(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

In [12]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

referenciando o filepath

Criando conjunto de dados contendo os arquivos de treinamento que estão em uma pasta:

> Por padrão, **list_files()** retorna um conjunto de dados que **embaralha** os caminhos dos arquivos.

In [13]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
filepath_dataset

<_ShuffleDataset element_spec=TensorSpec(shape=(), dtype=tf.string, name=None)>

In [14]:
n_readers = 5
dataset = filepath_dataset.interleave( # Cria um conjunto de dados que puxará cinco caminhos de arquivos do filepath_dataset e, para cada um deles, ele chamará a função que será passada, nese caso, uma lambda funcion.
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers, #
    num_parallel_calls=tf.data.experimental.AUTOTUNE # Ler o dataset em paralelo, o *AUTOTUNE* escolhe o número de threads dinamicamente com base na CPU.
)

printando 5 exemplos

In [15]:
for line in dataset.take(5):
    print(line.numpy())

b'4.2083,44.0,5.323204419889502,0.9171270718232044,846.0,2.3370165745856353,37.47,-122.2,2.782'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442'


# 🎲 Pré-processamento dos dados

### Função **preprocess**
A função **preprocess** processa uma linha de dados no formato CSV, separando as características (features) do rótulo (label), e normaliza as características subtraindo a média e dividindo pelo desvio padrão. Ela retorna as características normalizadas e o rótulo.

**Passos**:
1. **Definição dos valores padrão**: A função define um valor padrão para cada campo da linha CSV, sendo 0. para as características e um tensor vazio para o rótulo.

```CMD
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
```

2. **Decodificação da linha CSV**: A linha é convertida em uma lista de tensores usando a função tf.io.decode_csv. Cada valor no CSV é transformado em um tensor.

```
fields = tf.io.decode_csv(line, record_defaults=defs)
```

3. **Separação das características e do rótulo**: As características (primeiras colunas) são separadas do rótulo (última coluna), criando dois tensores: x para as características e y para o rótulo.

```
x = tf.stack(fields[:-1])  # Características
y = tf.stack(fields[-1:])  # Rótulo
```

4. **Normalização das características**: As características são normalizadas subtraindo a média (X_mean) e dividindo pelo desvio padrão (X_std), retornando as características normalizadas e o rótulo.

```
return (x - X_mean) / X_std, y
```

In [16]:
n_inputs = 8

@tf.function
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215985, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

# 🚀 Juntando a **Leitura** com o **Pré-Processamento**

In [17]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5, n_read_threads=None, shuffle_buffer_size=10000, n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = filepath_dataset.interleave( # Cria um conjunto de dados que puxará cinco caminhos de arquivos do filepath_dataset e, para cada um deles, ele chamará a função que será passada, nese caso, uma lambda funcion.
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers, #
        num_parallel_calls=n_read_threads # Ler o dataset em paralelo, o *AUTOTUNE* escolhe o número de threads dinamicamente com base na CPU.
    )
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    return dataset.batch(batch_size).prefetch(1)

![Minha Imagem](images/pipeline.jpg)

# 🤖 Treinando um modelo com **KERAS** usando o **PIPELINE** que trabalhamos até agora!

In [18]:
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [19]:
from tensorflow import keras

In [20]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=X_train.shape[1:]),
    keras.layers.Dense(1),
])
model.compile(loss='MSE', optimizer=keras.optimizers.SGD(learning_rate=1e-3))
batch_size = 32
history = model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10, validation_data=valid_set)

Epoch 1/10


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 2.7951 - val_loss: 0.8699
Epoch 2/10
[1m137/362[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m0s[0m 1ms/step - loss: 0.8370



[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.8001 - val_loss: 0.6891
Epoch 3/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.6684 - val_loss: 0.6239
Epoch 4/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.6167 - val_loss: 0.5788
Epoch 5/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 0.5781 - val_loss: 0.5443
Epoch 6/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.5312 - val_loss: 0.5177
Epoch 7/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 0.5211 - val_loss: 0.4978
Epoch 8/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.5041 - val_loss: 0.4812
Epoch 9/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 0.4936 - val_loss: 0.4686
Epoch 10/10
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━

In [21]:
model.evaluate(test_set)

[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 992us/step - loss: 0.4624


0.45811715722084045

In [22]:
new_set = test_set.take(3).map(lambda X, y: X) # Três novas instâncias
model.predict(new_set)

[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step


array([[2.194584  ],
       [2.4585843 ],
       [1.6384693 ],
       [1.6487656 ],
       [2.9252386 ],
       [2.5406444 ],
       [1.8640559 ],
       [1.8613362 ],
       [2.1053681 ],
       [0.6044264 ],
       [1.221794  ],
       [0.41859362],
       [2.242443  ],
       [1.4441857 ],
       [2.321509  ],
       [2.3609056 ],
       [1.1015701 ],
       [3.54688   ],
       [2.240097  ],
       [2.4807873 ],
       [3.0862296 ],
       [2.8283174 ],
       [2.4194024 ],
       [1.908144  ],
       [2.1460903 ],
       [3.198835  ],
       [1.6646818 ],
       [1.6228166 ],
       [2.0827374 ],
       [1.5109441 ],
       [1.219913  ],
       [3.4387276 ],
       [2.1047635 ],
       [2.6320863 ],
       [3.5839622 ],
       [0.9674235 ],
       [2.877403  ],
       [1.1428621 ],
       [2.7800267 ],
       [1.9673657 ],
       [2.657668  ],
       [2.3161569 ],
       [2.3518004 ],
       [2.5178075 ],
       [2.6081023 ],
       [3.0369234 ],
       [1.8887975 ],
       [2.578

# 🎲 TFRecord

é o **formato** de dados preferido do tensorflow. Apesar do CSV ser conveniente, para dados enormes não é o indicado, ai que entre o ***TFRecord***

In [None]:
with tf.io.TFRecordWriter("ArquivosGeradosPelosCodigos/meu_tfrecord.tfrecord") as f:
    f.write(b"Primeiro registro no tfrecord!")
    f.write(b"Segundo registro!")

In [24]:
filepaths = ["ArquivosGeradosPelosCodigos/meu_tfrecord.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)

tf.Tensor(b'Primeiro registro no tfrecord!', shape=(), dtype=string)
tf.Tensor(b'Segundo registro!', shape=(), dtype=string)


é possível criar um arquivo *TFRecord* compactado:

In [25]:
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("ArquivosGeradosPelosCodigos/meu_tfrecord_compactado.tfrecord", options) as f:
    f.write(b"Primeiro registro no tfrecord!")
    f.write(b"Segundo registro!")

In [27]:
dataset = tf.data.TFRecordDataset(["ArquivosGeradosPelosCodigos/meu_tfrecord_compactado.tfrecord"], compression_type="GZIP")
for item in dataset:
    print(item)

tf.Tensor(b'Primeiro registro no tfrecord!', shape=(), dtype=string)
tf.Tensor(b'Segundo registro!', shape=(), dtype=string)
