In [3]:
from IPython.display import Image

# Flujo de Información

Regularmente existen varios pasos de procesamiento para preparar los datos.
  - Extraer los datos (desde una carpeta, internet, una base de
    datos) e importarlos.
  - Validar los datos.
  - Transformarlos a un formato más adecuado.
  - Ejecutar agregaciones y generación de variables.
  - Modelos


## Pipeline
> Un pipeline consiste en un flujo de pasos separados de forma limpia, representados en un grafo dirigido. 

- Desacopla los pasos de procesamiento (como la limpieza, la agregación, la unión) del uso final de los datos. 
    - Facilita la reutilización
- Agnóstico de la tecnología.
- El linaje de datos es fácil de seguir.
- Funciona de forma coordinada

<font color=red> 
 Un pipeline hace algo, no es un repositorio

![title](img/data-science.png)

## Puntos clave:


1. Reproducible   
2. Consistente     
3. Adaptable  
4. **Modularizable**

***


## Antes:


### Opción 1

En bash se corren scripts de python
```
> python import.py   
> python tidy.py  
> python transform.py    
> python model.py

```


### Opcion 2
- Definir una interfaz de línea de comando para cada tarea
- Se vuelve una tarea larga e intensiva apilada

```
> if __name__ == '__main__':    
>     get_data()   
>     clean_data()    
>     join_data()    
>     run_model()    
```


En bash:    
`python do_everything.py`




## Ahora - Orquestador

Se busca que sea:
- Representado por un grafo dirigido acíclico (DAG).
- Modularizable
- Identificable (como funciones)
- Idempotente
- Que soporte puntos de control (checkpointing)

Al concepto de coordinar y gestionar programación se les conoce como orquestación.

  
<img src="img/dag1.png" alt="Drawing" style="width: 500px;"/>
<img src="img/dag2.jpg" alt="Drawing" style="width: 400px;"/>

###  Herramientas:

- [Google Cloud Workflow](https://cloud.google.com/dataflow/?hl=es-419)
- [Apache Beam](https://beam.apache.org/)
- [Pegasus](https://pegasus.isi.edu/)
- [Luigi](http://luigi.readthedocs.io/en/stable/workflows.html)
- [DataPipeline (Java)](https://northconcepts.com/docs/what-is-data-pipeline/)

<img src="img/luigi.png" alt="Drawing" style="width: 80px;"/>

***

<img src="img/luigi_logo.png" alt="Drawing" style="width: 200px;" align="left"/>

Flujo de trabajo por lotes desarrollado por Erik Bernhardsson, Elias Freider y otros en Spotify como orquestador en 2012.

## ¿Por qué Luigi?

- Software libre
- Gratuito
- Hace que sea fácil escribir código modular y crear dependencias entre tareas.
- Escrito en _python_ y se pueden ejecutar tasks de: R, bash, Spark, SQL, etc
- Sintaxis consistente y tareas especializadas
- Soporta herramientas externas: _postgresql_, _s3_, _Hadoop_
- Idempotente
- Identificable, lo que evita tareas duplicadas
- Puntos de Control / Checkpoints
- Tiene una interfaz gráfica
- ¡Manda correos electrónicos!
- Servidor central que distribuye tareas automáticamente optimizando tiempo y recursos.


***


## ¿Cómo construir el flujo?

Los flujos de trabajo consisten en dos tipos de clases **Tasks** y **Tagets**.

### Tasks
- Son definidos como clases the python que heredan de una clase de luigi 
- Los tasks son end donde se lleva a cabo la ejecución 
- Dependen unas de otras y producen checkpoints


***



Cada  tarea contiene métodos que se deben especificar:


- **`requires()`** Es el método que se usa para especificar las dependencias con otros Tasks, incluso pueden ser de la misma clase.

- **`output()`**  Regresa los objetivos o checkpoints, tipicamente son archivos o cualquier Target definida.

- **`run()`**  Aquí va todo el código que la tarea debe ejecutar como su trabajo.



### Targets 
Es una clase destino que corresponde a un archivo en un disco, un archivo en HDFS o algún tipo de punto de control, como una entrada en una base de datos. En realidad, el único método que los Targets deben implementar es el método exists, que devuelve True si y solo si el objetivo existe.

En la práctica, rara vez se necesita implementar subclases de Target. Luigi viene con una caja de herramientas de varios objetivos útiles. En particular, LocalTarget.

### Tipos de tareas:

- **luigi.Task**
    - Requiere especificar los tres métodos: *requieres*, *run* y *output*

- **luigi.WrapperTask**
  - Sirve para disparar varias clases, 
  - Sólo hay que  especificar el método *requires*
  
  
- **luigi.ExternalTask**
  - Representa un objeto externo al pipeline, por ejemplo un archivo.
  - Sólo hay que codificar el método *output*
  
  
### Otros componentes:

- **luigi server**
  - Servidor encargado de la orquestación remota de los Task
  
  
- **Parámetros**
  - Son como los argumentos de los Task, deben de ser diferentes si queremos
    varias ejecuciones de los mismos. Un ejemplo puede ser la fecha o los
    hiperparámetros de un modelo.
 
 
- **Archivo luigi.cfg**
  - Aquí se configura varios de los parámetros de los Task y del servidor de Luigi
  
  
***


### Ejemplo 1

Correr con alguno de los dos comandos:
```
python luigi_helloworld.py --local-scheduler HelloWorld   
PYTHONPATH='.' luigi --module luigi_helloworld --local-scheduler HelloWorld
```

In [1]:
import luigi

class HelloWorld(luigi.Task):
    
    def requires(self):
        return None
    
    def output(self):
        return luigi.LocalTarget('../outputs/helloworld.txt')
    
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

### Ejemplo 2

Generar Tasks a partir de la siguiente estructura:

<img src="img/dag1.png" alt="Drawing" style="width: 400px;"/>

Correr con:
```
PYTHONPATH='.' luigi --module luigi_example --local-scheduler AllTasks
```

In [2]:
import luigi
import time


# Meta Task
class TaskExample(luigi.Task):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outputs/{}.txt'.format(self.filename))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('DONE!')

# DAG
class TaskA(TaskExample):
    def requires(self):
        return None

class TaskG(TaskExample):
    def requires(self):
        return None

class TaskB(TaskExample):
    def requires(self):
        return TaskA('A')

class TaskC(TaskExample):
    def requires(self):
        return TaskB('B')

class TaskD(TaskExample):
    def requires(self):
        return [TaskB('B'), TaskG('G')]

class TaskE(TaskExample):
    def requires(self):
        return [TaskC('C'), TaskB('B'), TaskD('D')]

class TaskF(TaskExample):
    def requires(self):
        return TaskE('E')

class AllTasks(luigi.WrapperTask):
    def requires(self):
        return TaskF('F')

## Planificador local y global


1. En una terminal por separado:
```
luigid
```

2. Abrir en el navegador: http://localhost:8082

3. En una terminal, correr el ejemplo anterior definiendo "localhost" como el "--scheduler-host":
```
PYTHONPATH='.' luigi --module luigi_example --scheduler-host localhost AllTasks
```


### Visualizador Gráfico

<img src="img/luigi-ex-graph.png" alt="Drawing" style="width: 600px;"/>

***


## Ejemplo Iris

### Objetivo: 
Clasificar flores a partir de caractértisticas del sépalo y sus pétalos.
- Los datos se encuentran en `data/iris.csv`


### Tasks:
- `IrisData()` : Revisa que existan los datos
- `TrainTestSplit()`: Separa los datos en entrenamiento y prueba y los guarda en csv
- `TrainModel()`: Entrena el modelo
- `IrisPipeline()`: Manda a llamar a las diferentes modelos

### Parámetros
- `models`: los tipos de modelos a correr
- `features`: variables del modelo
- Hiperparámetros de cada modelo


*** 

### Interfaz gráfica

<img src="img/luigi-interface.png" alt="Drawing" style="width: 800px;"/>

# En resumen

- Pipeline: flujo de pasos separados de forma limpia, representados en un grafo dirigido.
- No es secuencial
- Se puede retomar en cualquier parte
- Se tiene mayor control 
- Es más reproducible