<a href="https://colab.research.google.com/github/institutohumai/cursos-python/blob/master/GCP/05_Airflow/Airflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" data-canonical-src="https://colab.research.google.com/assets/colab-badge.svg"></a>

# 1. Airflow: desarrollo, scheduling y monitoring de trabajos.

<a href="https://airflow.apache.org/">
<img alt="Logo de Airflow" src="./images/AirflowLogo.png" height="150px"/>
</a>

## 1.1. Introducción a conceptos básicos

### ¿Qué es Airflow?

&nbsp;&nbsp;&nbsp;&nbsp;Son las 3 de la mañana y suena la alarma. Medio dormida, la persona a cargo del pipeline se levanta y se acerca su computadora. Con la mirada medio perdida, aprieta "enter" sobre el comando pre-escrito que tenía sobre su terminal y que va a procesar los datos del dia requeridos para el dia siguiente a primera hora. Se da vuelta y se dirije de nuevo hacia la cama preguntadose si no habrá otra manera de schedulear tareas... 

&nbsp;&nbsp;&nbsp;&nbsp;**Airflow** es una plataforma open-source que nos permite desarrollar, schedulear y monitorear trabajos, de manera que podemos organizarlas en flujos de tareas. Es un projecto comenzado por Airbnb haya por el 2014 para manejar los complejos flujos de trabajo que existia en una empresa de ese tamaño y que desde su concepción fue hecho [open-source](https://github.com/apache/airflow) y que al momento de escribir este material tiene tan solo 28K estrellas en github...

### ¿Pára que sirve?

&nbsp;&nbsp;&nbsp;&nbsp; Como ya mencionamos, Airflow sive para orquestar trabajos en batch en donde existe una interdependencia entre los mismos. Para esto se hace de operadores de diversos tipos que permiten conectar y utilizar una poco módica cantidad de frameworks y tecnologías.
Esto quiere decir que si tu flujo de tareas tiene un principio y un fin bien definidos y que siempre se deben correr en un intervalo constantes, entonces se pueden **programar** en Airflow como un DAG. Hago énfasis en la palabra programar, porque es así como se orquestan las tareas, a través de código python. Airflow es una herramienta que nos brinda una hermosa UI para monitoreo y gestión de los flujos de trabajo, así como de las tareas individuales que los compone, pero no está pensada para crear tus flujos de trabajo en la misma.

| <img alt="Grafo de Hello World" src="images/hello_world_graph_view.png" /> |
|:--:|
|*Ejemplo de un simple flujo de trabajo en Airflow*|


&nbsp;&nbsp;&nbsp;&nbsp; El hecho de que los flujos de trabajo sean código nos brinda la oportunidad de:
* Versionar como cualquier otro código con git.
* Diferentes personas pueden trabajar en simultaneo en un mismo flujo de trabajo en diferentes etapas y colaborativamente en determinar el correcto flujo del mismo.
* Se pueden escribir tests para las funcionalidad.
* Los componentes son extensibles. Si necesitás agregar más funcionalidad de la que ya brinda Airflow, podés crear tus propios operadores.
* El scheduling tambien es programable, por lo que si las reglas de "cuando lanzar el flujo" son extrañas pero programables, se pueden schedulear en Airflow.

&nbsp;&nbsp;&nbsp;&nbsp; El código que genera el flujo que se mostró en la imagen anterior es el siguiente y en donde se muestra 2 maneras diferentes de especificar una tarea:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# Un DAG representa un flujo de trabajos, al menos una coleccíon 
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Las tareas se representan con operadores
    hello = BashOperator(task_id="hello", bash_command="echo hola humai!")

    # Las tareas que se defininen con el decorador, son tareas de PythonOperator
    @task()
    def airflow():
        print("hola airflow!")

    # Se establece el orden en el que deben dar las tareas del flujo
    hello >> airflow()
```

---

### 1.2 Instalación

&nbsp;&nbsp;&nbsp;&nbsp; Para usar Airflow existen varias alternativas como usarlos adentro de Docker, adentro de Kubernetes o standalone de manera local.

&nbsp;&nbsp;&nbsp;&nbsp; Para instalar de manera local solamente el core (como mencionamos existe un buen número de plugins y addons para Airflow), podemos correr el siguiente comando:

```bash
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.4.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```

&nbsp;&nbsp;&nbsp;&nbsp; Podemos corrobarad que todo se instaló correctamente verificando la versión instalada:

```console
root@6ff3c3340c9b:/airflow# airflow version
2.4.2
```

&nbsp;&nbsp;&nbsp;&nbsp; Una vez verificado que está bien instalado, podemos levantar la plataforma con `airflow standalone`. Luego de inicializar el servidor web, podremos acceder a la dirección `localhost:8080` para acceder a la UI. En esa direccioó encontraremos una pantalla de login.

| <img alt="Login Airflow" src="images/AirflowLogIn.png" height="400px"/> |
|:--:|
|*Login de Airflow*|

&nbsp;&nbsp;&nbsp;&nbsp; Las credenciales para acceder se muestran en los logs de la terminal:

| <img alt="Credenciales de login Airflow" src="images/AirflowLonginCreds.png" height="400px"/> |
|:--:|
|*Logs de `airflow standalone`*|*

**(La contraseña de `admin` tambien se puede encontrar en `$AIRFLOW_HOME/standalone_admin_password.txt`. Si no se tocó `$AIRFLOW_HOME` -> `cat ~/airflow/standalone_admin_password.txt`)*

&nbsp;&nbsp;&nbsp;&nbsp; Y voilà! Estamos adentro:

| <img alt="UI de Airflow" src="images/AirflowUI.png"/> |
|:--:|
|*UI de Airflow*|


---

### 1.3 Primera ejecución

&nbsp;&nbsp;&nbsp;&nbsp; Para empezar a generar una pequeña intuición de como funciona Airflow, hagamos una primera ejecucion de un flujo de trabajo, en este caso `example_bash_operator`. Para esto tenemos 2 opciones:
* O bien lanzamos una ejecucion para un flujo desde la UI: 

| <img alt="Ejecutar un Airflow" src="images/ExecDag.png"/> |
|:--:|
|*Ejecución de un DAG de Airflow*|

* O mismo desde la consola:

```console
root@6ff3c3340c9b:/airflow# airflow dags trigger example_bash_operator
[2022-10-26 15:55:58,868] {__init__.py:42} INFO - Loaded API auth backend: airflow.api.auth.backend.session
Created <DagRun example_bash_operator @ 2022-10-26T13:55:58+00:00: manual__2022-10-26T13:55:58+00:00, state:queued, queued_at: 2022-10-26 13:55:58.927668+00:00. externally triggered: True>
```

&nbsp;&nbsp;&nbsp;&nbsp; Ahora si lo activamos desde la consola vamos a ver que la tarea está encolada para ejecución: 
| <img alt="Cola Airflow" src="images/AirflowQueue.png"/> |
|:--:|
|*Flujo de ejecución encolado*|

&nbsp;&nbsp;&nbsp;&nbsp; Ajam... ¿Y por qué no se ejecuta?. Si notamos a la izquierda del flujo de trabajo veremos que está desactivado, es decir, no está siendo tenido en cuenta por el scheduler. De modo tal que si queremos que corrar debemos prenderlo, ya sea haciendo click en la UI o corriendo el siguiente comando

```console
root@6ff3c3340c9b:/airflow#  airflow dags unpause example_bash_operator
Dag: example_bash_operator, paused: False
```
| <img alt="Ejecución exitosa" src="images/AirflowSuccess.png"/> |
|:--:|
|*Ejecuciones exitosas*|


&nbsp;&nbsp;&nbsp;&nbsp; Bien!! Ahora si pudimos correr nuestras... ¿2 ejecuciones?. ¿Por qué hay 2 ejecuciones? Si clickeamos en la UI, veremos que las 2 ejecuciones exitosas fueron:
* Una ejecución manual, la que invocamos nosotros ya sea por CLI o por Web.
* Una ejecución scheduleada. Esta última fue ejecutada por el **Airflow Scheduler** a la hora de activar un DAG. Este es un buen punto a tener en cuando a la hora de comenzar a trabajar con Airflow: siempre se va a realizar una ejecución en el momento de activar el DAG correpondiente a la que debería haber sido la última ejecución scheduleada. En este caso podemos ver que era la correspondiente al dia de ayer a las 00:00.

| <img alt="Doble ejecución" src="images/AirflowDouble.png"/> |
|:--:|
|*Diferentes tipos de ejecuciones*|

&nbsp;&nbsp;&nbsp;&nbsp; Y si clickeamos en alguna de las 2 ejecuciones, lo primero que nos mostrará será el grafo de ejecución junto con sus resultados:

| <img alt="Resultados del flujo" src="images/DagResults.png"/> |
|:--:|
|*Resultados del flujo*|
 
 *(El código del dag puede verse en la pestaña "Code" y [en este link](extras/example_bash_operator.py))*


---

### 1.4 Conceptos principales y arquitectura

&nbsp;&nbsp;&nbsp;&nbsp; Ahora que corrimos un flujo de trabajo y tuvimos un primer encuentro con como funciona Airflow, hagamos un paso para atrás y hablemos de la terminología y conceptos que conlleva la plataforma y que ya venimos usando.

&nbsp;&nbsp;&nbsp;&nbsp; Dentro de Airflow, un flujo de trabajo es representado por un grafo acíclico dirigido o DAG (del inglés Directed Acyclic Graph), lo que quiere decir que no hay forma de comenzar en un punto *p* y encontrar un camino que vuelva a *p*. Estos DAGs estan compuesto de unidades de trabajo llamadas tareas (Tasks) de manera tal de que entre ellas se establezca el orden de prioridad en terminos de dependencias o flujo de datos. Por ejemplo, que hacer si todo va bien, como si no:

| <img alt="Ejemplo de flujo" src="images/edge_label_example.png"/> |
|:--:|
|*Ejemplo de DAG*|

&nbsp;&nbsp;&nbsp;&nbsp; En términos de arquitectura, Airflow consiste de una serie de componentes que trabajan en conjunto:

* El scheduler que se encarga tanto de triggerear nuevos flujos como de enviarle al **executor** tareas a ejecutar (como hicimos en nuestro ejemplo).

* El mencionado executor que se encarga de efectivamente ejecutar tareas. Para el entorno en el que estamos trabajando (desarrollo), el executor está embebido en el scheduler, pero en un ambiente productivo se puede (y debe) usar un executor propiamente dicho (Por ejemplo: [CeleryExecutor](https://github.com/celery/celery) o [KubernetesExecutor](https://kubernetes.io/es/)) y enviar las tareas a workers.

* El webserver que nos presenta una herramienta visual para controlar el estado, triggerear y debugear nuestros DAGs y sus tareas.

* Una carpeta en donde vamos a guardar nuestros DAGs. Esta carpeta va a ser leida tanto por el scheduler como por el executor y todos sus workers (De otro modo no sabrían cual de nuestras obras maestras ejecutar).

* Una base de datos para guardar la metadata. Es usada por el scheduler, el executor y el webserver. En nuestro caso de `standalone`, Airflow usa una base de datos [SQlite](https://www.sqlite.org/index.html) que se puede encontrar en `$AIRFLOW_HOME/airflow.db`

| <img alt="Arquitectura de Airflow" src="images/arch-diag-basic.png"/> |
|:--:|
|*Arquitectura de Airflow*|

&nbsp;&nbsp;&nbsp;&nbsp; Como ya vimos, un DAG es una serie de tareas que normalmente es de alguno de estos 3 tipos:
* Operadores (Operators), tareas predefinidas que se pueden conectar facilmente para generar un flujo. Los más comunes son BashOperator (que ejecuta comandos o scripts de bash), PythonOperator (que ejecuta una función de Python) y EmailOperator (que manda un mail). Pero estos no son todos, uno puede crear sus propios operadores custom o usar alguno de 60+ paquetes creados por diferentes providers, como Google Calendar o AWS S3.
* Sensores (Sensors), una subclase de operadores que se basan en esperar eventos externos para comenzar.
* Tareas determinadas por el decorador `@task` que son funciones de Python, similares a las que se pueden obtener con un PythonOperator. Esta opción es similar a como se construyen flujos de datos en herramientas como [Metaflow](https://metaflow.org/) e internamente lleva el nombre de TaskFlow.

&nbsp;&nbsp;&nbsp;&nbsp; Dado que ya sabemos como crear tareas, ahora nos toca organizarlas. Para Airflow, los DAGs están diseñados para correrse muchas veces e incluso en paralelo, contemplando que la data que se va a procesar es la del intervalo que hay en su periodo de scheduling (Por ejemplo si se schedulea con `@daily`, la información a procesar debería ser aquella generada entre las 00:00 y las 24:00).

&nbsp;&nbsp;&nbsp;&nbsp; Para representar la dependencia de una tarea con otra (o otras) se pueden usar 2 sintaxis:

* A través de `>>` y `<<`:
```python
primera_tarea >> [segunda_tarea, tercera_tarea]
cuarta_tarea << tercera_tarea
```

* A través de `set_upstream` y `set_downstream`:
```python
primera_tarea.set_downstream([segunda_tarea, tercera_tarea])
cuarta_tarea.set_upstream(tercera_tarea)
```

&nbsp;&nbsp;&nbsp;&nbsp; En el contexto de los DAGs, las mencionadas dependencias representan los arcos. Por defecto, una tarea va a esperar a que todas las tareas que tiene debajo hayan terminados exitosamente antes de comenzar(*), pero eso se puede customizar para generar por ejemplo, diferentes ramas de tareas que tienen dependencias en común, que una tarea comience a solo si corresponde a la última ejecución de un scheduling o si queremos que dependa de una condicíon específica de como terminaron las tareas más abajo.

**(Si las tareas fallan, por defecto Airflow trataré nuevamente de ejecutarlo con un delay configurable entre pruebas. Si se alcanza el limité, fallará la tarea y, si no se tiene contemplado, el DAG)*

&nbsp;&nbsp;&nbsp;&nbsp; ¿Cómo se comunican las tareas? Para esto, de nuevo, las opciones son 3:
* XComs (“Cross-communications”) que es el sistema por el cual se envian pequeñas cantidades de información.
* A partir de subir y descargar archivos a un storage en la nube (Por ejemplo GCS). Pensemos que estos metodos no son excluyente; tranquilamente se puede subir un dataset en la tarea A y comunicarle a la tarea B, via Xcoms, que dataset es el que debe procesar.
* Si configuramos las tareas mediante la API de TaskFlow, la información se va a pasar automaticamente mediante Xcoms.

&nbsp;&nbsp;&nbsp;&nbsp; Como podemos ver, Airflow es una plataforma con una enorme cantidad de features y conceptos. Solo para que no queden cosas en el tintero, pasaré a listar un par que nos quedarón:
* Existen subDAGs que nos permiten definir un DAG adentro de otro. De la misma manera en la que codeamos una función porque tener código duplicado es una mala práctica, los subDAGs nos permiten instanciar varias veces una misma secuencia de pasos a seguir si así lo requeriese nuestro flujo.
* Similar al concepto anterior, podemos agrupar tareas dentro de un grupo (TaskGroup). Esto no tiene ningún efecto funcional, solo estético (💅). La idea es que si nuestro DAG es muy complejo, visualmente no nos sirve analizar los flujos si lo que estamos viendo es una maraña de tareas, por lo que visualmente podemos agruparlas (normalmente en terminos funcionales de conjunto) de modo tal de que nos sea más facil distinguir el flujo entre procesos.

| <img alt="Grupo de tareas" src="images/task_group.gif"/> |
|:--:|
|*Grupo de tareas*|

* Tambien contamos con un feature en donde se pueden guardar variables de entorno, accesibles desde las tareas y que a su vez tambien pueden ser escritas en las mismas. Un caso particularmente similar es el de las conexiones con bases de datos. Estas pueden ser configuradas desde el CLI o en la sección `Admin` de la UI.

---

### 1.5 Planteo de problema

&nbsp;&nbsp;&nbsp;&nbsp; Mucho ruido y pocas nueces! Hagamos nuestro propio DAG! Vimos que al iniciar nuestra UI, se nos brinda una serie de DAGs de ejemplo predefinidos. Estás más que invitad@ a ver que contienen ya que todos muestran como funcionan muchos de los conceptos que nombramos más arriba. Ahora, ¿Cómo agregamos los propios?

&nbsp;&nbsp;&nbsp;&nbsp; Como mencionamos en la sección de arquitectura, Airflow requiere que se especifique una carpeta de donde leer las definiciones de los DAGs. Dicha carpeta se especifica en el archivo de configuración `$AIRFLOW_HOME/airflow.cfg`, bajo la entrada `dags_folder`. (Uno sabe que una plataforma tiene un par de features cuando su archivo de configuración tiene más de 300 entradas...)

&nbsp;&nbsp;&nbsp;&nbsp; Para nuestro caso de estudio creemos el escenario. Vamos a crear un pequeño ETL para un random walker que solo puede avanzar(?) con las siguientes condiciones:
* Nuestro experimento debería haber comenzado en 2022-01-01 y deberá ejecutarse a cada minuto (Acá podemos ver que evidentemente no podemos tener una persona iniciando nuestro script cada 1 minuto).
* Primero vamos a tirar un dado y una moneda N veces: cara es avanzar y ceca es retrocedor, mientras que el dado nos dará la cantidad de pasos (Este paso simula la generación de datos, normalmente puede ser una condición externa al proceso).
* Luego de tener nuestras N muestras vamos a tomar una decision: si luego de las N veces, el random walker avanzó entonces registramos el avance; ahora, si el random walker retrocedió, simplemente lo dejamos en la posición en la que estaba al comienzo. (Está etapa simulará tanto la transformación de la data y que hacer en caso de "fallo").
* Por último registraremos la posición final del random walker.

| <img alt="Robot caminando" src="images/robotWalking.jpg"/> |
|:--:|
|*Robot caminando*|

---

### 1.6 Armado del esqueleto


&nbsp;&nbsp;&nbsp;&nbsp; Genial! Comencemos! Lo primero que vamos a hacer es crear el archivo en donde escribiremos el DAG:
```bash
mkdir $AIRFLOW_HOME/dags 
touch $AIRFLOW_HOME/dags/cuasi_random_walker.py
```

&nbsp;&nbsp;&nbsp;&nbsp; Abrimos el archivo con nuestro IDE preferido y creamos el contexto del DAG:

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='random_walker',
    schedule='* * * * *',
    start_date=datetime.datetime(2022, 1, 1),
    catchup=False,
    tags=['experimentos'],
    params={"N": "50"},
) as dag:
    ejemplo = EmptyOperator(
        task_id='prototipo',
    )

    ejemplo
```
&nbsp;&nbsp;&nbsp;&nbsp; En este caso estamos creando un DAG con una sola tarea de ejemplo. Es una buena práctica crear los DAGs con un contexto pero sepamos que se puden generar, por ejemplo como resultado de una función. Pasemos a explicar sus parámetros:
* **dag_id**: El nombre de nuestro DAG. Es el que veremos en nuestra UI.
* **schedule**: Con que frecuencia se debería ejecutar nuestro DAG si tal cosa existe, puede que solo querramos que se ejecute de manera externa o en base a otros DAGs. Este  parámetro soporta expresiones cron (Siempre se puede visitar [crontab.guru](https://crontab.guru/)) o valores preseteados como `@hourly` o `@daily`.
* **start_date**: Desde que momento este DAG es valido. Existe un analogó para establecer hasta que momento es valido.
* **catchup**: Este  parámetro establece si se debería hacer un fillback. Esto quiere decir que si estuviese activado lanzaría todos los DAGs que se deberían haber ejecutado entre 2022-01-01:00:00 y el momento en el que activemos el DAG. Es importante mencionar, como vimos en el ejemplo al comienzo, que indefectiblemente de si este parámetro está activado o no, el scheduler hará una ejecucion (DagRun) correspondiente al último schedule.
* **tags**: Lista de tags que nos ayuda visualmente a identificar DAGs que sirve a un fin común o area.
* **params**: Diccionario que podremos usar para acceder en nuestros templates.

&nbsp;&nbsp;&nbsp;&nbsp; Ahora que sabemos como generar DAGs y prototipos, borremos `ejemplo` y armemos el esqueleto de nuestra flujo:
```python
from airflow.operators.python import BranchPythonOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

    ...
    # Lanzamos los dados y la moneda
    tirar_dados = EmptyOperator(
        task_id='tirar_dados',
    )

    # Dependiendo del resultado vamos a avanzar o permanecer quietos
    resultados = ['avanzar', 'permanecer']

    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=lambda: random.choice(resultados),
    )

    # Vamos a tirar los dados antes de analizar sus resultados
    tirar_dados >> branching

    # No importa que camino se haya tomado, vamos a registrar en donde está el walker
    loggear_posicion = EmptyOperator(
        task_id='loggear_posición',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # En este caso, para ambos caminos vamos a calcularla la nuevo posición
    for option in resultados:
        t = EmptyOperator(
            task_id=option,
        )

        nueva_posicion = EmptyOperator(
            task_id='nueva_posicion_' + option,
        )

        # Las Label son opcionales pero nos sirven para entender el flujo
        branching >> Label(option) >> t >> nueva_posicion >> loggear_posicion
```
&nbsp;&nbsp;&nbsp;&nbsp; Si abrimos nuestro DAG en este momento en la UI, en la sección de grafo deberiamos ver lo siguiente:

| <img alt="BOO!" src="images/Skeleton.png"/> |
|:--:|
|*Spooky skeleton*|


---

### 1.7 Implementamos tareas

&nbsp;&nbsp;&nbsp;&nbsp; Para la implementación de los pasos instalemos numpy como dependencia:
```bash
pip install numpy
```

&nbsp;&nbsp;&nbsp;&nbsp; Creemos carpetas para guardar la data. En un escenario real esto podría ser un data lake o data warehouse.
```bash
mkdir -p /tmp/data/dados
mkdir -p /tmp/data/monedas
mkdir -p /tmp/data/posiciones
```

&nbsp;&nbsp;&nbsp;&nbsp; Guardemos la posicion inicial
```bash
echo "0" >> /tmp/data/posiciones/2022-01-01_00-00-00.txt
```

&nbsp;&nbsp;&nbsp;&nbsp; Y ahora sí, implementemos los step:

* ### `tirar_dados`: reemplazamos el EmptyOperator

```python
from airflow.operators.python import PythonOperator
import numpy as np

# Funciones
def tirar_dados_func(N: str):

    # Convertimos N a int
    N = int(N)

    # A partir de la fecha generamos una etiqueta
    time_name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Tiro dados
    dados = np.random.randint(1, 7, size=N)

    # Tiro monedas
    monedas = np.random.choice([-1, 1], size=N)

    # Guardamos valores
    with open(f"/tmp/data/dados/{time_name}.npy", "wb") as f:
        np.save(f, dados)

    with open(f"/tmp/data/monedas/{time_name}.npy", "wb") as f:
        np.save(f, monedas)

...
    # Lanzamos los dados y la moneda
    tirar_dados = PythonOperator(
        task_id="tirar_dados",
        op_args=[
            "{{ params.N }}",  # Acá usamos el formato de templating
        ],
        python_callable=tirar_dados_func,
    )
```

* ### `branching`:
```python
import glob
import os

def get_last_file(path: str) -> str:
    """Get the name of last created file in a given path

    Args:
        path (str): Path to the explore

    Returns:
        str: Name of the last created file
    """
    list_of_files = glob.glob(f"{path}*")
    latest_file = max(list_of_files, key=os.path.getctime)
    return latest_file


def branching_func(
    **context,
):  # Agregamos el contexto para poner mandar mensajes entre tareas

    # Obtengo los ultimos dados y monedas
    ultimos_dados = get_last_file("/tmp/data/dados/")
    ultimas_monedas = get_last_file("/tmp/data/monedas/")

    dados = np.load(ultimos_dados)
    monedas = np.load(ultimas_monedas)

    # Calculo la suma total de pasos en ambas direcciones
    suma = (dados * monedas).sum()

    # Mandamos el mensaje de cuanto fue la suma
    task_instance = context["task_instance"]
    task_instance.xcom_push(key="suma_de_pasos", value=str(suma))

    # Si el walker debe avanzar
    if suma >= 0:
        return "avanzar"

    # Caso contrario
    return "permanecer"

...
    # Dependiendo del resultado vamos a avanzar o permanecer quietos
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=branching_func,
        provide_context=True,  # Proveemos el contexto para enviar mensajes entre tareas
    )

    # Vamos a tirar los dados antes de analizar sus resultados
    tirar_dados >> branching
```





* ### Vamos a tener que separara el `avanzar` y `retroceder` ya que tienen diferente comportamiento:

```python
def avanzar_func(
    **context,
):

    # Recibimos el mensaje de `branching`
    task_instance = context["task_instance"]
    suma = task_instance.xcom_pull(task_ids="branching", key="suma_de_pasos")

    # "Procesamos" la nueva suma.
    print(f"El walker avanza {suma} pasos!")


def nueva_posicion_avanzar_func(
    **context,
):

    # Recibimos el mensaje de `branching`
    task_instance = context["task_instance"]
    suma = task_instance.xcom_pull(task_ids="branching", key="suma_de_pasos")

    # Ultima posicion
    ultima_posicion = get_last_file("/tmp/data/posiciones/")
    with open(ultima_posicion, "r+") as f:
        posicion = f.read()

    # Calculamos la nueva posicion
    nueva_posicion = int(suma) + int(posicion)

    print(f"{suma = }")
    print(f"{posicion = }")

    # Guardamos la ultima posicion
    time_name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    with open(f"/tmp/data/posiciones/{time_name}.txt", "w") as f:
        f.write(str(nueva_posicion))


def permanecer_func(
    **context,
):

    # Recibimos el mensaje de `branching`
    task_instance = context["task_instance"]
    suma = task_instance.xcom_pull(task_ids="branching", key="suma_de_pasos")

    # "Procesamos" la nueva suma.
    print(f"El walker no va a moverse porque la suma fue {suma}!")


def nueva_posicion_permanecer_func():

    # Ultima posicion
    ultima_posicion = get_last_file("/tmp/data/posiciones/")
    with open(ultima_posicion, "r") as f:
        posicion = f.read()

    # Calculamos la "nueva" posicion
    nueva_posicion = int(posicion)

    # Guardamos la "nueva" posicion
    time_name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    with open(f"/tmp/data/posiciones/{time_name}.txt", "w") as f:
        f.write(str(nueva_posicion))
...

    avanzar = PythonOperator(
        task_id="avanzar",
        python_callable=avanzar_func,
        provide_context=True,
    )

    # Brancheamos y luego avanzamos
    branching >> Label("avanzar") >> avanzar

    # Calculamos la nueva posicion
    nueva_posicion_avanzar = PythonOperator(
        task_id="nueva_posicion_avanzar",
        python_callable=nueva_posicion_avanzar_func,
        provide_context=True,
    )

    # Luego de procesar el avance, calculamos la nuva posicion
    avanzar >> nueva_posicion_avanzar

    # Permanecemos quietos
    permanecer = PythonOperator(
        task_id="permanecer",
        python_callable=permanecer_func,
        provide_context=True,
    )

    # Brancheamos y luego avanzamos
    branching >> Label("permanecer") >> permanecer

    # Calculamos la nueva posicion
    nueva_posicion_permanecer = PythonOperator(
        task_id="nueva_posicion_permanecer",
        python_callable=nueva_posicion_permanecer_func,
    )

    # Luego de procesar el avance, calculamos la nuva posicion
    permanecer >> nueva_posicion_permanecer
```

* ### Y por último `loggear_posicion`:

```python 
def loggear_posicion_func():

    # Obtengo la ultima posicion
    ultima_posicion = get_last_file("/tmp/data/posiciones/")
    with open(ultima_posicion, "r") as f:
        posicion = f.read()

    # "Proceso" la última posicion
    print(f"En este momento, el walker está en {posicion}!")

...

    # No importa que camino se haya tomado, vamos a registrar en donde está el walker
    loggear_posicion = PythonOperator(
        task_id="loggear_posicion",
        python_callable=loggear_posicion_func,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Si se avanzo o no, se loggea
    [nueva_posicion_avanzar, nueva_posicion_permanecer] >> loggear_posicion
```

&nbsp;&nbsp;&nbsp;&nbsp; Uff eso fue un montón, pero pudimos pasar por la mayoria de los conceptos que vimos anteriormente. El archivo final debería verse [asi]("extras/cuasi_random_walker.py").

&nbsp;&nbsp;&nbsp;&nbsp; Encendamos motores y prendamos nuestro walker! Luego de unos minutos veremos que el DAG se corrió varias veces; en algunos casos el walker avanzó y en otros permaneció quieto. Para ver la posición del walker siempre podemos ver los logs del step `loggear_posicion`. 

| <img alt="Walker on" src="images/Walker.png"/> |
|:--:|
|*Walker corriendo*|

&nbsp;&nbsp;&nbsp;&nbsp; Cómo podemos ver, los DAGs no fallan, aunque el walker "fallo" y no avanzó (los DAGs solo tienen 2 estados final: `success` o `failed`). Es importante mencionar que de la misma manera en donde en un programa no siempre todo va como querriamos, el programa finaliza bien pero con un comportamiento indeseado. Esto se puede traducir en que la excepcion estaba contemplada (otra muy distinta es que explote el programa...). De la misma manera, a la hora de generar nuestro pipelines o DAGs deberiamos tener en cuenta los casos "no tan bonitos" ,en donde se deben tomar acciones correspondientemente, y dejar que los `failed` de los pipelines nos digan que tenemos un nuevo caso no contemplado a trabajar.


### 1.8 Ambiente productivo

&nbsp;&nbsp;&nbsp;&nbsp; Ahora bien. Ya tenemos un ambiente de desarollo local en donde nuestro DAG funciona. Pero...
| <img alt="Delivery" src="images/delivery.png"/> |
|:--:|
|*Delivery de flujos*|

&nbsp;&nbsp;&nbsp;&nbsp; Por tanto, el paso siguiente es llevar nuestra solución a la nube. Y que mejor lugar que [Cloud Composer](https://cloud.google.com/composer), un servicio de Airflow gestionado por Google Cloud.

&nbsp;&nbsp;&nbsp;&nbsp; Teniendo nuesto proyecto seleccionado, para acceder a Composer podemos buscarlo y acceder al mismo

| <img alt="Composer" src="images/ComposerSearch.png"/> |
|:--:|
|*Composer en el buscador*|

&nbsp;&nbsp;&nbsp;&nbsp; Una vez adentro del servicio el paso siguiente es crear un ambiente, para eso vamos a llenar los siguientes campos y crear el ambiente:
* Nombre: develop
* Ubicación: indistinto (Ej: us-central-1)
* Cantidad de nodos: lo dejamos en 3
* Tamaño del disco: dado que no vamos a necesitar demasiado, podemos reducir el valor por defecto (100GB) a 30GB.
* Version de imagen: de nuevo, al ser experimental podemos seleccionar la última version disponible. En mi caso fue `composer-1.19.12-airflow-2.3.3`.
* Cantidad de schedulers: Con 1 solo estamos bien.

&nbsp;&nbsp;&nbsp;&nbsp; Ya ingresados todos los campos y clickeado el boton de `CREAR` nos queda esperar hasta que el ambiente se encuentre estable:
| <img alt="Composer estable" src="images/StableComposer.png"/> |
|:--:|
|*Composer estable*|

&nbsp;&nbsp;&nbsp;&nbsp; Genial! Ya tenemos un ambiente andando. 
&nbsp;&nbsp;&nbsp;&nbsp; Cuando me refería a las diferencias entre un ambiente de desarollo y uno productivo es que, por ejemplo, si tratasemos de subir nuestro DAG a la plataforma no funcionaría por incompatibilidades en la versión (en composer se usa Airflow 2.3.3 y en local 2.4.2), así como del sistema de archivos (que usamos para transferir información). Como solucion a esos problemas, podemos modificar nuestro DAG para:
* Reemplazar `schedule` por el deprecado  parámetro `schelude_interval` en la creación del DAG.
* Reemplazar la carpeta en donde se escribe la información de los DAGs de `/tmp/data` a `/home/airflow/data`, que en Composer convenientemente se sincroniza con el bucket que usa backend 👌.
* Agregar funciones de fallback para el cold-start de carpetas y archivos.
&nbsp;&nbsp;&nbsp;&nbsp; Una versión con esos problemas corregidos puede obtenerse [aca]("extras/cuasi_random_walker_composer_version.py").

&nbsp;&nbsp;&nbsp;&nbsp; Antes de subir nuestro DAG, debemos indicarle a Composer que el ambiente en el que vamos a trabajar requiere que instalemos dependecias (`numpy`). Para eso podremos agregarla en la siguiente opcion:

| <img alt="Agregar numpy" src="images/AddNumpy.png"/> |
|:--:|
|*Agregar numpy*|

&nbsp;&nbsp;&nbsp;&nbsp; Una vez que agreguemos la dependencias deberemos esperar a que los cambios se repliquen en todo el ambiente. Cuando eso termine solo nos quedará subir nuestro DAG al bucket de nuestro ambiente:
| <img alt="Carpeta de DAGS" src="images/UploadDag.png"/> |
|:--:|
|*Carpeta de DAGS*|

&nbsp;&nbsp;&nbsp;&nbsp; Luego de subir nuestro DAG, Composer automaticamente lo agregará al scheduler y lo habilitará. Esto signifca que si abrimos la UI y esperamos de unos minutos... Voilà! Tenemos nuestro DAG corriendo en la nube con Composer!

| <img alt="Walking Composer" src="images/ComposerWalking.png"/> |
|:--:|
|*Walking Composer*|


### 1.9 Conclusiones


&nbsp;&nbsp;&nbsp;&nbsp; Uff que paseo! Airflow es una gran plataforma que nos permite sincronizar y orquestar tareas de casi cualquier manera. 

&nbsp;&nbsp;&nbsp;&nbsp; Son un montón de conceptos y creame que no exploramos un montón de cosas (como otros executors u operadores custom), por lo que no te desanimes si ves abrumador la cantidad de conceptos. De a poco se avanza. Lo importante a saber es que siempre podemos contar con Airflow para que se ocupe de lanzar nuestros flujos de trabajo y poder dormir tranquilos. Al menos hasta que nos envíe un mail...💨📮