# **Data API in TensorFlow**

## Basics Dataset
Representan una sequencia de elementos, usualmente leidos gradualmente desde el disco. Por simplicidad crearemos un ejemplo que cabe en la memoria RAM.
 
En este ejemplo la funcion `from_tensor_slices()` toma un tensor  y crea un `tf.data.Dataset` con cada elemento de X(a lo largo de la primera dimension ).Entonces *dataset* contiene 10 elementos

In [31]:
import tensorflow as tf

X= tf.range(10) # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset
# dataset = tf.data.Dataset.range(10)

<TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [32]:
for tensor in dataset:
    print(tensor)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


tf.data.AUTOTUNE
tf.data.AUTOTUNE
Tenga en cuenta que cada operacion retorna un nuevo `dataset` y que se tiene el parametro **num_parallel_calls** para usar el paralelismo y se puede dejar que TF elija el numero de hilos óptimos disponibles mediante `tf.data.AUTOTUNE`

In [33]:
dataset = dataset.repeat(3).batch(7)
for item in dataset: 
    print(item)


tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


Como se puede observar, no los datos estan odenados(**no shuffle**). Además el metodo `repeat()` no copia los elementos en **memoria** tres veces,(de hecho si se llama repeat() sin parametros devuleve un iterador para siempre de los datos)
 
También se pueden realizar transfromaciones usando el metodo **map()**

In [34]:
dataset_square = dataset.map(lambda x: x * 2)


Minetras que  `map()` aplica trasformacion a cada elemento, `apply()` aplica una trasformacion al dataset entero

In [36]:
dataset = tf.data.Dataset.range(100)


def dataset_fn(ds):
    return ds.filter(lambda x: x < 5)


dataset = dataset.apply(dataset_fn)
list(dataset.as_numpy_iterator())

[0, 1, 2, 3, 4]

También se pued esolo filtrar 

In [38]:
dataset = dataset.filter(lambda x: x < 10)

Cuando necesita unos pocos elementos pued eusar el `take()` para iterar sobre el dataset

In [55]:
for item in dataset.take(3):
    print(item)

list(dataset.take(3).as_numpy_iterator())

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)


[0, 1, 2]

## Suffling the Data

el método `shuffle()` creará un nuevo conjunto de datos que comenzará llenando un **búfer** con los primeros elementos del conjunto de datos de origen. A
continuación, cada vez que se le pida un elemento, extraerá uno al azar del búfer y lo sustituirá por uno nuevo del conjunto de datos de origen, hasta que haya iterado por completo a través del conjunto de datos de origen.
 
Debes especificar el tamaño del **buffer**, y es importante que sea lo suficientemente grande

In [62]:
dataset = tf.data.Dataset.range(10).repeat(3)  # 0 to 9, three times
dataset = dataset.shuffle(buffer_size=10,seed=42).batch(10)
for item in dataset:
    print(item)


tf.Tensor([5 2 8 1 7 9 2 0 0 4], shape=(10,), dtype=int64)
tf.Tensor([6 3 1 0 9 3 7 6 7 8], shape=(10,), dtype=int64)
tf.Tensor([3 4 6 8 4 1 5 9 5 2], shape=(10,), dtype=int64)


Para un gran conjunto de datos que no cabe en la memoria, este simple enfoque de barajar el búfer puede no ser suficiente, ya que el búfer será pequeño en comparación con el conjunto de datos. Una solución puede ser: 

## Interleaving lines from multiple files


Let's start by loading and preparing the California housing dataset. We first load it, then split it into a training set, a validation set and a test set, and finally we scale it:

In [63]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

For a very large dataset that does not fit in memory, you will typically want to split it into many files first, then have TensorFlow read these files in parallel. To demonstrate this, let's start by splitting the housing dataset and save it to 20 CSV files:

In [66]:
import os
import numpy as np 


def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("resources", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths


In [67]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

Okay, now let's take a peek at the first few lines of one of these CSV files:

In [None]:
import pandas as pd

pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


Or in text mode:

In [69]:
with open(train_filepaths[0]) as f:
    for i in range(5):
        print(f.readline(), end="")

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


Now let’s create a dataset containing only these file paths
Alternatively, you could use file patterns; for example, train_filepaths = "resources/housing/my_train_*.csv"
 
By default, the `list_files()` function returns a dataset that **shuffles** the filepaths 

In [70]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)


In [140]:
for _ in filepath_dataset.take(5):
    print(_)

tf.Tensor(b'resources\\housing\\my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'resources\\housing\\my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'resources\\housing\\my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'resources\\housing\\my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'resources\\housing\\my_train_16.csv', shape=(), dtype=string)


El método `interleave()` creará un conjunto de datos que extraerá cinco rutas de archivo del conjunto de datos **filepath_dataset**, y para cada
una de ellas llamará a la función que le hayas dado (una lambda en
este ejemplo) para crear un nuevo conjunto de datos (en este caso un `TextLineDataset`).  (saltándose la primera línea de cada archivo , que es la fila de header, mediante el método `skip()`)
 
Para que quede claro, en esta fase habrá siete conjuntos de datos en total: el conjunto de datos filepath, el conjunto de datos interleave y los cinco TextLineDatasets creados internamente por el conjunto de datos interleave.
 
Cuando iteremos sobre el conjunto de datos de
intercalación, se desplazará a través de estos cinco conjuntos de datos TextLineDataset, leyendo una línea a la vez de cada uno hasta que **todos los conjuntos de datos** se queden sin elementos.

By default`interleave()`  does not use parallelism(you can set the num_parallel_calls argument to the number
of threads you want)

In [122]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers)


In [127]:
for line in dataset.take(5):
    print(line.numpy())

list(dataset.take(3).as_numpy_iterator())


b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159'
b'4.2083,44.0,5.323204419889502,0.9171270718232044,846.0,2.3370165745856353,37.47,-122.2,2.782'
b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504'
b'3.6641,17.0,5.577142857142857,1.1542857142857144,511.0,2.92,40.85,-121.07,0.808'


[b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205',
 b'3.9688,41.0,5.259786476868327,0.9715302491103203,916.0,3.2597864768683276,33.98,-118.07,1.698',
 b'3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442']

## Preprocessing the Data
* En primer lugar, el código asume que hemos calculado
    previamente la media y la desviación estándar de cada
    característica del conjunto de entrenamiento. X_mean y X_std son     tensores 1D (o matrices NumPy) que contienen ocho valores  flotantes, uno por característica de entrada.

* La función `preprocess()` toma una línea CSV y comienza
analizándola. Para ello utiliza la función `tf.io.decode_csv()`, que toma dos argumentos: el primero es la línea a analizar, y el
segundo es una matriz que contiene el valor por defecto para
cada columna en el archivo CSV. Esta matriz le dice a
TensorFlow no sólo el valor por defecto para cada columna, sino
también el número de columnas y sus tipos. En este ejemplo, le
decimos que todas las columnas de características son **float** y
que si faltan deberían ser por defecto 0, pero proporcionamos un array vacío de tipo **tf.float32** como valor por defecto para la última columna (the target): el array indica a TensorFlow que esta columna contiene float, pero que levante una excepcion si falta ese valor.
* `decode_csv()` function returns a list of scalar tensors (one
per column), but we need to return 1D tensor arrays. So we call
`tf.stack()` on all tensors except for the last one (the target): this will stack these tensors into a 1D array. We then do the same for the target value (this makes it a 1D tensor array with a single
value, rather than a scalar tensor)

In [135]:

X_mean, X_std = [0.]*8, [1.]*8  # mean and scale of each feature in the training
set
n_inputs = 8


def preprocess(line):

    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y


In [137]:
for line in dataset.take(1):
    print(preprocess(line))

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
array([ 1.6571000e+00,  3.4000000e+01,  4.4549761e+00,  1.0876777e+00,
        1.3580000e+03,  3.2180095e+00,  3.7939999e+01, -1.2235000e+02],
      dtype=float32)>, <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.052], dtype=float32)>)


## Putting everything together
Para que el código sea reutilizable, vamos a reunir todo lo que hemos visto hasta ahora en una pequeña función de ayuda: creará y devolverá un conjunto de datos que cargará eficazmente los datos de housing in California desde varios archivos CSV, los preprocesará, los mezclará, los
repetirá opcionalmente y los procesará por lotes 

In [147]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):

    # obtener un generador infnito default:shuffle de de listas de archivos csv
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)

    # obtener de manera intercruzada los datos en cada linea de los archivos csv
    dataset = dataset.interleave(lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
                                 cycle_length=n_readers, num_parallel_calls=n_read_threads)

    # volver abarajar los datos
    dataset = dataset.shuffle(shuffle_buffer_size)

    # aplicar el preprocesamineto a cada linea
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)

    # obtener el conjunto de datos en batches
    dataset = dataset.batch(batch_size)

    # obtener un dataset que hará todo lo posible para estar siempre 1 batch por delante
    return dataset.prefetch(1)


`prefetch(tf.data.AUTOTUNE)`  posibilita q mientras el algoritmo de entrenamiento está trabajando en
un lote, el conjunto de datos ya estará trabajando en paralelo para preparar
el siguiente lote (por ejemplo, leyendo los datos del disco y
preprocesándolos)

Si además nos aseguramos de que la carga y
el preprocesamiento son multihilo (estableciendo `num_parallel_calls` al
llamar a `interleave()` y `map()`), podemos explotar múltiples núcleos de la
CPU y, con suerte, hacer que la preparación de un lote de datos sea más
corta que la ejecución de un paso de entrenamiento en la GPU: de esta
forma, la GPU se utilizará casi al 100% (excepto por el tiempo de
transferencia de datos de la CPU a la GPU3), y el entrenamiento se
ejecutará mucho más rápido
![prefetch](resources/prefetch.png)

If the dataset is small enough to fit in memory, you can significantly speed
up training by using the dataset’s `cache()` method to cache its content to
RAM. You should generally do this after loading and preprocessing the
data, but before **shuffling, repeating, batching, and prefetching**. This way,
each instance will only be read and preprocessed once (instead of once per
epoch), but the data will still be shuffled differently at each epoch, and the
next batch will still be prepared in advance

In [1]:
for m in dir(tf.data.Dataset):
    if not (m.startswith("_") or m.endswith("_")):
        func = getattr(tf.data.Dataset, m)
        if hasattr(func, "__doc__"):
            print("● {:21s}{}".format(m + "()", func.__doc__.split("\n")[0]))


● apply()              Applies a transformation function to this dataset.
● as_numpy_iterator()  Returns an iterator which converts all elements of the dataset to numpy.
● batch()              Combines consecutive elements of this dataset into batches.
● bucket_by_sequence_length()A transformation that buckets elements in a `Dataset` by length.
● cache()              Caches the elements in this dataset.
● cardinality()        Returns the cardinality of the dataset, if known.
● choose_from_datasets()Creates a dataset that deterministically chooses elements from `datasets`.
● concatenate()        Creates a `Dataset` by concatenating the given dataset with this dataset.
● element_spec()       The type specification of an element of this dataset.
● enumerate()          Enumerates the elements of this dataset.
● filter()             Filters this dataset according to `predicate`.
● flat_map()           Maps `map_func` across this dataset and flattens the result.
● from_generator()     Create