#ETL con Apache Koalas usando Amazon Dataset 

Un **ETL pipeline** es un proceso común para almacenar y produce datos más confiables y enriquecidos y que consiste en tres pasos: **Extracción** de los datos, de una o múltiples fuentes; **Transformación** de los mismos; y **Carga (o Load)** de esos datos en un destino común (ie: Data Lake). 

En la siguiente demo, veremos cómo aplicar un ETL Pipeline básico en Databricks, usando Apache Koalas, y tomando como dataset un listado de Usuarios en Amazon.


#### Explorando datos
El Amazon Dataset, está almacenado en el Databricks File System (o DBFS) como uno de los datasets de ejemplo que ofrece Databricks. En el path `/databricks-datasets/amazon/users` hay un listado de usuarios (con sus IDs) en formato CSV.

In [2]:
print(dbutils.fs.ls("/databricks-datasets/amazon/users"))

## Paso 1: Extracción del Dataset
Comenzamos con la primer fase del ETL: la **Extracción** de los datos de los usuarios de Amazon, que almacenamos en memoria en un DataFrame llamado `users_df`.

In [4]:
import numpy as np
import databricks.koalas as ks

users_path = "/databricks-datasets/amazon/users/"

In [5]:
users_df = ks.read_csv(users_path)

In [6]:
users_df.head()

Unnamed: 0,user
0,84838498
1,424687383
2,474450171
3,1691900178
4,959087395


## Paso 2: Análisis y Transformación

La segunda fase del ETL es la **Transformación** del DataFrame de usuarios en el `unique_users_df` DataFrame, que no contiene duplicados.

Primero, verificamos el total de usuarios en `users_df`, y cuantos de ellos están repetidos o son null.

In [9]:
null_values = users_df.isnull().sum()['user']
unique_users = users_df.nunique()['user']
total_users = users_df.count()['user']

In [10]:
print("Null values: {0}".format(null_values))
print("Unique Users: {0}".format(unique_users))
print("Total Users: {0}".format(total_users))


Como el `total_users` es distinto al `unique_users`, eso significa que hay usuarios duplicados. 
Crearemos un nuevo DataFrame llamado `unique_users_df` para eliminar usuarios duplicados, y para eso usamos `unique()`.

In [12]:
unique_users_df = ks.DataFrame(users_df['user'].unique())
print("Total unique users: {0}".format(unique_users_df.count()['user']))

In [13]:
unique_users_df.head(10)

Unnamed: 0,user
0,785573682
1,1325428211
2,931960654
3,58723512
4,131984955
5,1229690137
6,893812680
7,890537209
8,491959511
9,1081196643


## Paso 3: Carga de los datos procesados

El tercer paso es la **Carga** de los datos generados, que almacenamos de forma permanente. En este caso, se guardarán en el path `/unique_users.parquet` en formato Parquet.

In [15]:
amazon_path = "/FileStore/tables/amazon"
unique_users_path = "{0}/unique_users.parquet".format(amazon_path)

In [16]:
unique_users_df.to_parquet(unique_users_path)

## Paso 4: Validación de los datos almacenados (Opcional)

Luego de generar los archivos finales de salida, podemos verificar que se hayan escrito correctamente.

In [18]:
amazon_path = "/FileStore/tables/amazon"
users_path = "{0}/unique_users.parquet".format(amazon_path)

In [19]:
unique_users_new_df = ks.read_parquet(users_path)
print("Total unique users: ", unique_users_new_df.count())

Como no tenemos duplicados, el nuevo dataset nos quedó con la cantidad exacta de usuarios.

## Paso 5: Conclusión
Finalmente, ¡el ETL está terminado! Ahora,e ste set de usuarios puede ser consumido o utilizado por otras notebooks o programas con distintos fines (como visualizar datos, construir modelos de Machine Learning, etc).