<h1 style='font-size:40px'> Loading and Preprocessing Data with TensorFlow</h1>

<h2 style='font-size:30px'> The Data API</h2>
<div> 
    <ul style='font-size:20px'>
        <li> 
            A Data API é um módulo do Tensor Flow voltado ao tratamento de datasets volumosos. É interessante ser usado quando os dados não cabem na memória RAM ou placa de vídeo.
        </li>
        <li> 
            A classe `Dataset` é onde nossos dados são armazenados.
        </li>
    </ul>
</div>

In [9]:
import tensorflow as tf
import tensorflow.keras as keras

# Criando um `Dataset` a partir de um `tf.range`. 
X = tf.range(10)

# Cada número de `X` será encapsulado em um tensor.
dataset = tf.data.Dataset.from_tensor_slices(X)

for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [73]:
# O mesmo Dataset poderia ter sido montado da seguinte maneira:
dataset = tf.data.Dataset.range(10)

<h3 style='font-size:30px;font-style:italic'> Chaining Transformations</h3>
<div> 
    <ul style='font-size:20px'>
        <li> 
            As funções da classe `Dataset` nunca fazem transformações in-place; sempre retornam um novo objeto.
        </li>
    </ul>
</div>

In [74]:
#  `repeat` repetirá os dados do dataset, enquanto `batch` vai criar batches com n instâncias.
for item in dataset.repeat(3).batch(7):
    # Para evitar que o último batch com os elementos restantes do dataset seja formado, passe `drop_remainder`=True.
    print(item)
    
# Observe que `batch` não embaralha os dados na hora de sua separação.

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


In [39]:
# `map` aplica uma função por elemento.
# `apply` invoca uma transformação a todo o dataset.
list(dataset.map(lambda x: x**2))

[<tf.Tensor: shape=(), dtype=int64, numpy=0>,
 <tf.Tensor: shape=(), dtype=int64, numpy=1>,
 <tf.Tensor: shape=(), dtype=int64, numpy=16>,
 <tf.Tensor: shape=(), dtype=int64, numpy=81>,
 <tf.Tensor: shape=(), dtype=int64, numpy=256>,
 <tf.Tensor: shape=(), dtype=int64, numpy=625>,
 <tf.Tensor: shape=(), dtype=int64, numpy=1296>,
 <tf.Tensor: shape=(), dtype=int64, numpy=2401>,
 <tf.Tensor: shape=(), dtype=int64, numpy=4096>,
 <tf.Tensor: shape=(), dtype=int64, numpy=6561>]

In [47]:
# `filter`, é bastante eficaz em remover dados indesejados.
list(dataset.filter(lambda x: x<5))

[<tf.Tensor: shape=(), dtype=int64, numpy=0>,
 <tf.Tensor: shape=(), dtype=int64, numpy=1>,
 <tf.Tensor: shape=(), dtype=int64, numpy=2>,
 <tf.Tensor: shape=(), dtype=int64, numpy=3>,
 <tf.Tensor: shape=(), dtype=int64, numpy=4>]

<h3 style='font-size:30px;font-style:italic'> Shuffling the Data</h3>
<div> 
    <ul style='font-size:20px'>
        <li> 
            O método `shuffle` embaralha as instâncias com a seguinte lógica: escolhe as primeiras x instâncias do dataset e as agrupa em um buffer. Daí, as mistura e sorteia um dado, quando solicitado. Após a extração, o buffer fica com um espaço sobrando, que é preenchido com a próxima instância do dataset.
        </li>
        <li> 
            Por conta do algoritmo, é contraindicado usar `buffer_size`'s pequenos. A documentação do TF indica, até mesmo, designarmos um valor maior ou igual ao o dataset. Apenas cuidado com a memória RAM!            
        </li>
    </ul>
</div>

In [76]:
# Embaralhando um dataset artificial (`buffer_size`=10).

# `reshuffle_each_iteration` garante que um novo embaralhamento dos dados ocorrerá, caso usemos `repeat`, por exemplo.
dataset = tf.data.Dataset.range(5)
list(dataset.shuffle(5, seed=42, reshuffle_each_iteration=True).repeat(2))

[<tf.Tensor: shape=(), dtype=int64, numpy=0>,
 <tf.Tensor: shape=(), dtype=int64, numpy=4>,
 <tf.Tensor: shape=(), dtype=int64, numpy=1>,
 <tf.Tensor: shape=(), dtype=int64, numpy=3>,
 <tf.Tensor: shape=(), dtype=int64, numpy=2>,
 <tf.Tensor: shape=(), dtype=int64, numpy=3>,
 <tf.Tensor: shape=(), dtype=int64, numpy=4>,
 <tf.Tensor: shape=(), dtype=int64, numpy=1>,
 <tf.Tensor: shape=(), dtype=int64, numpy=0>,
 <tf.Tensor: shape=(), dtype=int64, numpy=2>]

<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> Interleaving lines from multiple files</h4>
<div> 
    <ul style='font-size:20px'>
        <li> 
            A fim de garantirmos a ordem aleatória dos dados no abastecimento de modelos, talvez seja interessante quebrarmos o set de treino em múltiplos arquivos. Assim, os lemos simultaneamente, intercalando as suas linhas. No final, conseguimos ainda aplicar a função `shuffle` para misturar todo o produto da leitura.
        </li>
    </ul>
</div>

In [120]:
# Diretório onde vão ser armazenados os dados.
from os import mkdir
mkdir('data')

In [121]:
from sklearn.datasets import fetch_california_housing
import pandas as pd
X,y = fetch_california_housing(return_X_y=True,as_frame=True)
data = pd.merge(X,y, left_index=True, right_index=True)

for i in range(0, len(data), 1000): # Forma rápida de gerar os arquivos, mesmo sabendo que vou perder algumas linhas.
    data.iloc[i:i+1000].to_csv(f'data/housing_{int(i/1000)}.csv')

In [122]:
# O método `list_files` localiza um conjunto de arquivos baseado em certo padrão nas strings.
# Retorna um dataset com os paths de todos os arquivos que estão de acordo com a string oferecida.
filepath_dataset = tf.data.Dataset.list_files('data/housing_*.csv', shuffle=True)
list(filepath_dataset)

[<tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_4.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_14.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_5.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_1.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_19.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_2.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_20.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_11.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_10.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_12.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_7.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_0.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_8.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housing_18.csv'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'data/housi

In [136]:
# A leitura dinâmica dos arquivos pode ser feita com o método `interleave`.

# Esse aplica uma função sobre todo o `Dataset` e intercala as suas linhas.
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers,
    # Por padrão, não há uso de paralelismo. Uma linha de cada dataset é lida por vez.
    # Use `num_parallel_calls` se quiser fazer esse processo por multithreading.
    num_parallel_calls=-1
)

In [142]:
# Mas veja: os valores das features estão todos contidos dentro de uma string bytes.

# É necessário ainda fazermos mais um tratamento, coletando os números individualizados.
list(dataset.take(5))

[<tf.Tensor: shape=(), dtype=string, numpy=b'12000,7.5408,3.0,8.493150684931507,1.0342465753424657,519.0,3.5547945205479454,33.93,-117.57,2.719'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'16000,4.0846,52.0,4.821316614420063,0.9561128526645768,819.0,2.5673981191222572,37.74,-122.47,3.336'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'3000,4.1484,10.0,4.791907514450867,0.8439306358381503,447.0,2.5838150289017343,35.3,-119.03,1.029'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'11000,13.466,26.0,8.874233128834355,1.0582822085889572,983.0,3.0153374233128836,33.75,-117.79,5.00001'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'9000,4.0,49.0,6.866295264623956,1.0362116991643453,1018.0,2.8356545961002784,34.0,-118.34,2.968'>]

<h3 style='font-size:30px;font-style:italic'> Preprocessing the Data</h3>
<div> 
    <ul style='font-size:20px'>
        <li> 
            Criaremos uma função que recebe uma linha do dataset e a retorna devidamente tratada.
        </li>
    </ul>
</div>

In [277]:
# Médias e desvios-padrão das colunas para padronização.
X_mean, X_std = X.mean().to_numpy(), X.std().to_numpy()

# Função de tratamento dos dados.
def preprocess(line):
    n_inputs = 8 # Quantidade de colunas presentes.
    # Lista com os valores-padrão a serem imputados nas células caso haja NaN's. Um tensor vazio será gerado caso 
    # o nulo ocorra em uma target-variable, acarretando em um erro.
    default_values = [0.]*n_inputs + [tf.constant([], dtype=tf.float32)]
    # Decodificando a linha. 'default_values' será usado na ocorrência de vazios.
    fields = tf.io.decode_csv(line, record_defaults=default_values) # `fields é uma lista de tensores que contêm um único número.
    # A função `stack` empilha os valores dos tensores em um único tensor unidimensional.
    x, y = tf.stack(fields[:-1]), tf.stack(fields[-1:])
    # Retornando as variáveis independentes padronizadas juntamente com as dependentes.
    return (x-X_mean)/X_std, y

In [244]:
preprocess(b'2.23,1.73,10.98,4.56,9.01,0.01,0.03,20.31,5.41')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ -0.8635921 ,  -2.1381242 ,   2.2435777 ,   7.3079667 ,
         -1.250785  ,  -0.29468906, -16.66791   ,  69.81657   ],
       dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.41], dtype=float32)>)

<h3 style='font-size:30px;font-style:italic'> Putting Everything Together</h3>
<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> Prefetching</h4>
<div> 
    <ul style='font-size:20px'>
        <li> 
            O método `prefetch` contribui para a performance de nosso programa. Enquanto um dado é processado, o método faz com que a próxima instância (ou batch) passe pelo tratamento em paralelo, economizando tempo.
        </li>
    </ul>
</div>

<center>
    <img src='prefetch.png'>
</center>

In [273]:
range_dataset = tf.data.Dataset.range(10)

# Preparando a próxima instância proveniente de 'range_dataset'.
range_dataset.prefetch(1)

# Preparando o próximo batch criado a partir de 'range_dataset'.
range_dataset.repeat(3).prefetch(1)

<PrefetchDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>

In [298]:
# Finalizando a seção com uma função  que realiza todos os passos demonstrados.
def csv_reader_dataset(filepath:str, cycle_length:int=5, buffer_size:int=1000, repeat:int=3, batch_size:int=32):
    filepaths = tf.data.Dataset.list_files(filepath) # Coletando todos os arquivos correspondentes ao padrão `filepath`.
    # Lendo `cycle_length` arquivos, pulando as suas primeiras linhas. 
    dataset = filepaths.interleave(lambda x: tf.data.TextLineDataset(x).skip(1), 
                                   cycle_length=cycle_length, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).cache() # Tratando os dados e os armazenando em cache.
    dataset = dataset.shuffle(buffer_size).repeat(repeat) # Embaralhando e repetindo os dados.
    # 
    return dataset.batch(batch_size, num_parallel_calls=tf.data.AUTOTUNE).prefetch(1)

<div> 
    <ul style='font-size:20px'>
        <li> 
            Usamos `cache` após o tratamento, mas antes do shuffling e batching. Isso garantirá que a mistura e batches serão distintos a cada iteração. 
        </li>
    </ul>
</div>

In [303]:
# Teste da função.
csv_reader_dataset('data/housing_*.csv')

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 8), dtype=tf.float32, name=None), TensorSpec(shape=(None, 1), dtype=tf.float32, name=None))>

<p style='color:red'> Using the Dataset with tf.kerasUsing the Dataset with tf.keras
</p>