[![cloudevel](img/cloudevel.png)](https://www.cloudevel.com)

# Apache Beam y Cloud Dataflow.

## Apache Beam

Apache Beam es un modelo de programación de código abierto que te permite definir y ejecutar pipelines (tuberías) de procesamiento de datos. Ofrece una abstracción que simplifica las tareas de manejar datos tanto en procesamiento por lotes (batch) como en tiempo real (streaming).

Proporciona SDKs para varios lenguajes de programación, incluyendo Java, Python, Go, entre otros. Esto te da flexibilidad para trabajar con el lenguaje que prefieras.

Los pipelines creados con Apache Beam se pueden ejecutar en distintos entornos de procesamiento distribuido (runners) como Apache Flink, Apache Spark, Google Cloud Dataflow, y otros. Esto significa que es posible desarrollar pipelines una vez y luego desplegarlos en la plataforma que mejor convenga.

**Características**

* **Batch y Streaming en Uno:** Unifica el desarrollo de pipelines para procesar datos históricos (por lotes) y datos en tiempo real (streaming) con una sola API.
* **Extensible:** Permite crear transformaciones personalizadas e integraciones con otras tecnologías, expandiendo sus posibilidades.
* **Escalabilidad**:  Los runners distribuidos se encargan de la ejecución paralela y a gran escala de los pipelines, haciendo que Apache Beam pueda procesar enormes cantidades de datos.
* **Tolerancia a fallos:** Gestiona automáticamente la recuperación ante fallos de nodos o problemas durante el procesamiento, garantizando la consistencia de tus datos.

**¿Por qué usar Apache Beam?**

* **Simplifica el desarrollo:** Al unificar los dos paradigmas principales de procesamiento de datos, ayuda a reducir la complejidad asociada a la gestión de tecnologías por separado.
* **Código independiente de la plataforma:** Facilita la migración a diferentes entornos de ejecución si fuese necesario.
* **Versátil y potente:** Gracias a sus SDKs multi-lenguaje y su extensibilidad, se adapta a una amplia gama de casos de uso en procesamiento de datos.
* **Comunidad activa:** Siendo un proyecto de Apache Software Foundation, cuenta con una gran comunidad de desarrolladores y constante actualización.

[https://beam.apache.org/](https://beam.apache.org/) 

[https://beam.apache.org/releases/pydoc/2.55.1/](https://beam.apache.org/releases/pydoc/2.55.1/)

## Elementos para construir DAGs.

Los DAGs (Directed Acyclic Graphs o Gráficos Acíclicos Dirigidos) en Apache Beam se construyen utilizando principalmente dos tipos de componentes:

**1. PCollections (Colecciones de canalización)**

* Representan conjuntos de datos que fluyen a través de la canalización.
* Se pueden crear a partir de diversas fuentes, como archivos, bases de datos o mensajes de cola.
* Las PCollections son inmutables, lo que significa que no se pueden modificar después de crearse.

**2. Transformaciones**

* Son las operaciones que se aplican a las PCollections para procesar los datos.
* Existen dos tipos principales de transformaciones:
    * **Transformaciones de un solo elemento:** Procesan cada elemento de la PCollection de forma independiente.
    * **Transformaciones de ventana:** Agrupan los elementos de la PCollection en ventanas temporales y luego aplican una transformación a cada ventana.

**Componentes adicionales**

Además de PCollections y transformaciones, los DAGs en Apache Beam también pueden incluir:

* **Fuentes:** Leen datos de un origen externo, como un archivo o una base de datos.
* **Receptores:** Escriben datos en un destino externo, como un archivo o una base de datos.
* **Triggers:** Controlan cuándo se ejecutan las transformaciones.
* **Flujos laterales:** Permiten que los datos fluyan entre diferentes partes de la canalización.

**Ejemplo de un DAG**

Un DAG simple que lee datos de un archivo, los filtra y luego los escribe en otro archivo podría verse así:

```
datos_sin_filtrar = (
    ReadFromText("data.txt")
    | FlatMap(lambda linea: linea.split())
)

datos_filtrados = (
    datos_sin_filtrar
    | Filter(lambda palabra: len(palabra) > 5)
)

WriteToFile(datos_filtrados, "data_filtrada.txt")
```

En este ejemplo:

* `ReadFromText` es una fuente que lee datos del archivo "data.txt".
* `FlatMap` es una transformación que divide cada línea del archivo en palabras individuales.
* `Filter` es una transformación que filtra las palabras que tienen más de 5 caracteres.
* `WriteToFile` es un receptor que escribe los datos filtrados en el archivo "data_filtrada.txt".

## Flujos de stream.

## Procesamiento basado en streams en Apache Beam

Apache Beam ofrece un modelo de programación unificado para procesar datos tanto en **batch** (por lotes) como en **streaming**. El procesamiento en streaming permite procesar datos en tiempo real, a medida que llegan, lo que es ideal para aplicaciones como el análisis de datos en tiempo real, la detección de fraudes o la monitorización de sistemas.

**Ventanas en Apache Beam**

Para procesar datos en streaming, Apache Beam utiliza el concepto de **ventanas**. Las ventanas son bloques de tiempo que agrupan los elementos de un stream. Esto permite aplicar transformaciones a los datos de forma agregada, como calcular promedios, sumas o conteos.

**Tipos de ventanas en Apache Beam**

Apache Beam proporciona varios tipos de ventanas para diferentes casos de uso:

* **Ventanas fijas:** Dividen el tiempo en intervalos de tamaño fijo, como 1 minuto, 5 minutos o 1 hora.
* **Ventanas deslizantes:** Se superponen entre sí, lo que permite procesar los datos de forma continua.
* **Ventanas por sesión:** Agrupan los elementos del stream en función de la actividad del usuario o de la máquina.
* **Ventanas personalizadas:** Permiten crear ventanas personalizadas utilizando funciones lambda.

**Ejemplo de procesamiento en streaming**

El siguiente ejemplo muestra cómo calcular la media de los valores de temperatura cada minuto a partir de un stream de datos de sensores:

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
  # Leer datos del stream de sensores
  datos_sensores = (
      pipeline
      | "LeerDatosSensores" >> beam.io.ReadFromPubSub("topic-sensores")
      | "DecodificarDatos" >> beam.Map(lambda dato: parsear_dato_sensor(dato))
  )

  # Calcular la media de la temperatura por minuto
  medias_temperatura = (
      datos_sensores
      | "VentanasFijas" >> beam.WindowInto(beam.window.FixedWindows(60))
      | "CalcularMediaTemperatura" >> beam.CombinePerKey(beam.combiners.MeanCombine())
  )

  # Escribir las medias de temperatura en un archivo
  medias_temperatura | "EscribirAArchivo" >> beam.io.WriteToText("medias_temperatura.txt")
```

En este ejemplo:

* `ReadFromPubSub` lee datos del stream de sensores en un tema de Pub/Sub.
* `DecodificarDatos` decodifica los datos del sensor en un formato utilizable.
* `FixedWindows` divide el stream en ventanas de un minuto.
* `CalcularMediaTemperatura` calcula la media de la temperatura para cada ventana.
* `WriteToText` escribe las medias de temperatura en un archivo.

**Procesamiento de eventos fuera de orden**

Apache Beam también admite el procesamiento de eventos fuera de orden. Esto significa que los eventos del stream pueden llegar en un orden diferente al que fueron generados. Apache Beam utiliza técnicas como **watermarking** para identificar el tiempo real de cada evento y procesarlo correctamente.

## Dataflow.

Aquí te explico todo sobre Cloud Dataflow:

**¿Qué es Cloud Dataflow?**

* Es un servicio totalmente gestionado de Google Cloud Platform (GCP) diseñado para la ejecución de pipelines de datos escalables y de alto rendimiento.
* Está basado en el modelo de programación y el framework de código abierto Apache Beam.
* Esencialmente, Dataflow toma tus pipelines de Apache Beam y los ejecuta de manera optimizada en la infraestructura de Google Cloud.

**Características clave**

* **Procesamiento unificado:** Maneja tanto datos en lotes (batch) como en tiempo real (streaming) con una misma programación simplificada.
* **Totalmente gestionado:** Reduce la carga operativa al encargarse del aprovisionamiento automático de recursos, escalado, monitoreo y tolerancia a fallas. No tienes que preocuparte por la administración de clusters. 
* **Escalabilidad sin esfuerzo:** Se adapta a las demandas de datos, aumentando o disminuyendo recursos de procesamiento de forma automática. Esto garantiza el rendimiento y te permite procesar desde pequeños flujos hasta cargas de trabajo masivas.
* **Integración nativa con GCP:** Se conecta fácilmente con otros servicios de Google Cloud como BigQuery, Pub/Sub, Cloud Storage, etc., formando un ecosistema de procesamiento de datos eficiente.
* **Rentable:** Modelo de precios basado en el uso, lo que significa que solo pagas por los recursos que efectivamente consumes.

**¿Cuándo usar Cloud Dataflow?**

* **Procesamiento complejo de datos:** Si tus necesidades involucran transformaciones de datos desafiantes a gran escala, Dataflow proporciona la potencia y escalabilidad requeridas.
* **Agilidad ETL/ELT:** Ideales para ejecutar tareas de extracción, transformación y carga (ETL) o su variante de transformación primero (ELT) en la nube.
* **Análisis de streaming:** Para procesar datos en tiempo real y obtener información valiosa al instante.
* **Migración de cargas de trabajo locales:** Simplifica migrar tus pipelines de Apache Beam existentes desde entornos on-premise a la nube.

**¿Cómo funciona?**

1. **Desarrollo de pipelines:** Creas tus pipelines utilizando los SDK de Apache Beam (Java, Python, Go, entre otros).
2. **Ejecución en Dataflow:** Envías tu pipeline al servicio Cloud Dataflow.
3. **Aprovisionamiento de recursos:** Dataflow administra y aprovisiona de forma automática los recursos necesarios en Google Cloud (normalmente, instancias de Compute Engine).
4. **Procesamiento distribuido:** Dataflow ejecuta tu pipeline de forma paralela y escalable en los recursos asignados.
5. **Monitoreo y Optimización:** Puedes supervisar el funcionamiento del pipeline y Dataflow optimiza la ejecución del mismo según su desempeño.

[https://cloud.google.com/dataflow](https://cloud.google.com/dataflow)


## Conectores.

Apache Beam ofrece una amplia gama de conectores para leer y escribir datos desde y hacia diversos sistemas, tanto de Google Cloud Platform (GCP) como de terceros. Estos conectores facilitan la integración de Apache Beam con diferentes fuentes y destinos de datos, lo que lo convierte en una herramienta versátil para el procesamiento de datos a gran escala.

**Categorías principales de conectores:**

1. **Conectores I/O:** Sirven para leer y escribir datos en diferentes formatos y ubicaciones, como archivos locales, bases de datos, sistemas de mensajería y servicios en la nube. Algunos ejemplos:
    * **Leer desde texto:** `ReadFromText`, `ReadFromParquet`, `ReadFromPubSub`
    * **Escribir en texto:** `WriteToText`, `WriteToParquet`, `WriteToPubSub`
    * **Bases de datos:** `ReadFromJdbc`, `WriteToJdbc`
    * **Almacenamiento en la nube:** `ReadFromBigtable`, `WriteToBigtable`, `ReadFromGcs`, `WriteToGcs`
2. **Conectores de transformación:** Permiten aplicar transformaciones personalizadas a los datos durante el procesamiento. Entre ellos:
    * **Combiners:** Combinan elementos de un PCollection en un valor único.
    * **Windowing:** Agrupan elementos de un PCollection en ventanas temporales.
    * **Coding:** Serializan y deserializan los datos para su transmisión y almacenamiento.
3. **Conectores de monitoreo:** Facilitan el seguimiento del rendimiento y la ejecución de los pipelines de Apache Beam. Algunos ejemplos:
    * **Cloud Monitoring:** Envía métricas al sistema de monitoreo de GCP.
    * **Logback:** Registra eventos en archivos de registro.
4. **Conectores de SDK:** Proporcionan interfaces para interactuar con SDKs de terceros y ampliar las capacidades de Apache Beam.

**Conectores destacados:**

* **Google Cloud Pub/Sub:** Para leer y escribir mensajes en un tema de Pub/Sub.
* **Apache Kafka:** Para interactuar con un broker de Kafka y procesar streams de datos.
* **Cloud Spanner:** Para leer y escribir datos en una base de datos Cloud Spanner.
* **Cloud Bigtable:** Para leer y escribir datos en una base de datos NoSQL Cloud Bigtable.
* **Cloud Datastore:** Para leer y escribir datos en una base de datos NoSQL Cloud Datastore.
* **AWS S3:** Para leer y escribir datos en un bucket de Amazon S3.
* **Hadoop HDFS:** Para leer y escribir datos en un sistema de archivos distribuido HDFS.

**Nota:** La lista completa de conectores disponibles para Apache Beam es extensa y se actualiza constantemente. Puedes consultar la documentación oficial para ver la lista actualizada: [https://beam.apache.org/documentation/io/connectors/](https://beam.apache.org/documentation/io/connectors/

## Ejemplo.

Este ejemplo simula un proceso ETL (Extracción, Transformación y Carga) utilizando Apache Beam en Python. Los datos se obtienen desde un stream de Kafka, se limpian, se agrega un promedio y se almacenan en buckets de Cloud Storage.

**Pasos:**

1. **Importaciones:**
   Se importan las librerías necesarias para trabajar con Apache Beam, Kafka y Cloud Storage.

```python
import apache_beam as beam
from apache_beam.io.kafka import KafkaIO
from apache_beam.io.gcp.gcs import GCSFileSystem
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.aggregation import CombinePerKey
```

2. **Definición de la ventana:**
   Se establece la ventana de tiempo fija que se utilizará para agrupar los datos. En este caso, la ventana será de 1 minuto.

```python
window_size = 60  # Segundos
ventana = FixedWindows(window_size)
```

3. **Lectura de datos desde Kafka:**
   Se define un lector de Kafka para consumir los datos del stream. Se especifica el tema de Kafka y el formato de los mensajes (JSON).

```python
kafka_source = (
    KafkaIO.read_from_topic("topic-entrada")
    | "DecodificarMensaje" >> beam.Map(lambda mensaje: json.loads(mensaje))
)
```

4. **Limpieza de datos:**
   Se elimina cualquier registro que contenga valores nulos en el campo "valor".

```python
datos_sin_nulos = (
    kafka_source
    | "FiltrarValoresNulos" >> beam.Filter(lambda dato: dato["valor"] is not None)
)
```

5. **Cálculo del promedio:**
   Se agrupa la información por clave ("sensor") y se calcula el promedio del valor dentro de cada ventana.

```python
promedio_por_sensor = (
    datos_sin_nulos
    | "Ventanas" >> beam.WindowInto(ventana)
    | "CalcularPromedio" >> beam.CombinePerKey(CombinePerKey(mean))
)
```

6. **Formato de salida:**
   Se transforma la información en un formato adecuado para guardarla en Cloud Storage (JSON).

```python
datos_formato_salida = (
    promedio_por_sensor
    | "PrepararSalida" >> beam.Map(lambda dato: json.dumps(dato))
)
```

7. **Escritura en Cloud Storage:**
   Se utiliza un escritor de Cloud Storage para almacenar los datos en buckets. Se especifica el bucket y el nombre del archivo.

```python
gcs_writer = GCSFileSystem.create(bucket="bucket-destino")
destino = gcs_writer.join("datos_procesados/fecha={}.json".format(str(datetime.datetime.now())))

datos_formato_salida | "EscribirACloudStorage" >> beam.io.WriteToFile(destino)
```

**Ejecución del pipeline:**

Para ejecutar este ejemplo, necesitarás tener Apache Beam instalado y configurado para acceder a tu entorno de Kafka y Cloud Storage. Una vez hecho esto, puedes ejecutar el siguiente comando:

```
beam.Pipeline(options=PipelineOptions()).run(pipeline)
```

**Explicación:**

El código anterior define un pipeline de Apache Beam que realiza las siguientes tareas:

1. **Lee datos desde un stream de Kafka:** El lector de Kafka consume los mensajes del tema especificado y los convierte en objetos Python.
2. **Limpia los datos:** Se eliminan los registros que contienen valores nulos en el campo "valor".
3. **Calcula el promedio:** Se agrupa la información por clave ("sensor") y se calcula el promedio del valor dentro de cada ventana de 1 minuto.
4. **Formatea la salida:** Se transforma la información en un formato JSON para que sea fácil de almacenar.
5. **Escribe en Cloud Storage:** Se escribe la información formateada en un archivo JSON en un bucket de Cloud Storage.

Este es un ejemplo básico de un ETL con Apache Beam. Puedes modificarlo para adaptarlo a tus necesidades específicas, como cambiar la ventana de tiempo, agregar más transformaciones o usar otros conectores.


<p style="text-align: center"><a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Licencia Creative Commons" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/80x15.png" /></a><br />Esta obra está bajo una <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Licencia Creative Commons Atribución 4.0 Internacional</a>.</p>
<p style="text-align: center">&copy; José Luis Chiquete Valdivieso. 2024.</p>