# 🧪 Laboratorio: Inicio Rápido con Plantillas de Dataflow

**Google Cloud Dataflow** es un servicio serverless para procesar datos en tiempo real o por lotes, utilizando pipelines escalables. En este laboratorio, crearás un pipeline de streaming utilizando la plantilla **Pub/Sub to BigQuery**, que lee mensajes JSON desde un tema de **Pub/Sub** y los inserta en una tabla de **BigQuery**. Este enfoque es ideal para procesar datos en tiempo real, como transacciones financieras o registros de auditoría.

💡 **Beneficio empresarial**: **Dataflow** permite a las empresas procesar grandes volúmenes de datos financieros en tiempo real, como registros de transacciones, asegurando análisis rápidos, escalabilidad y cumplimiento normativo con mínima configuración manual.

Para más información, consulta la [Documentación de Google Cloud Dataflow](https://cloud.google.com/dataflow/docs) y la [Documentación de Google Cloud BigQuery](https://cloud.google.com/bigquery/docs).

## 🎯 Objetivo principal
- Crear un pipeline de streaming utilizando la plantilla **Pub/Sub to BigQuery**, que:
  1. Lee mensajes JSON desde un tema de **Pub/Sub** (mensajería en tiempo real).
  2. Inserta los mensajes en una tabla de **BigQuery** (base de datos analítica).

## 🧭 Actividades
- ✔️ Crear un dataset y tabla en **BigQuery**.
- ✔️ Crear un bucket en **Cloud Storage**.
- ✔️ Configurar y ejecutar un pipeline de streaming con **Dataflow**.

## 🧩 Tarea 1: Reiniciar la API de Dataflow

Para asegurar que la **API de Dataflow** esté habilitada y operativa:

1. En la barra superior de búsqueda de **Google Cloud Console**, escribe `Dataflow API`.
2. Haz clic en el resultado y selecciona **Manage**.
3. Haz clic en **Disable API** y confirma.
4. Luego, haz clic en **Enable API** para reactivarla.

**Explicación**:
- Reiniciar la API asegura que **Dataflow** esté correctamente habilitado en el proyecto.

💡 **Contexto empresarial**: Habilitar la API es como activar un módulo de software contable para procesar transacciones automáticamente.

✅ **Verificación**: Haz clic en **Check my progress** para validar.

## 🗂️ Tarea 2: Crear dataset, tabla en BigQuery y bucket con Cloud Shell

En esta tarea, prepararás los recursos necesarios usando comandos en **Cloud Shell**.

### Paso 2.1: Crear dataset en BigQuery

1. En **Cloud Shell**, ejecuta:

In [None]:
bq mk taxirides

**Desglose del comando**:
- `bq`: Herramienta de línea de comandos para **BigQuery**.
- `mk`: Comando para crear ("make").
- `taxirides`: Nombre del dataset.

**Explicación**:
- El dataset `taxirides` será el contenedor para la tabla que almacenará los datos del pipeline.

💡 **Contexto empresarial**: Un dataset es como una carpeta en un sistema contable donde se organizan los registros financieros.

### Paso 2.2: Crear tabla en BigQuery

1. Ejecuta:

In [None]:
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer -t taxirides.realtime

**Desglose del comando**:
| **Parte** | **Significado** |
|-----------|-----------------|
| `bq mk` | Crear una tabla en **BigQuery**. |
| `--time_partitioning_field timestamp` | Particiona los datos por el campo `timestamp` para optimizar consultas. |
| `--schema ...` | Define el esquema con columnas y tipos de datos: `ride_id:string`, `point_idx:integer`, `latitude:float`, `longitude:float`, `timestamp:timestamp`, `meter_reading:float`, `meter_increment:float`, `ride_status:string`, `passenger_count:integer`. |
| `-t taxirides.realtime` | Crea la tabla `realtime` dentro del dataset `taxirides`. |

**Ejemplo de campo**:
- `ride_id:string`: Identificador único del viaje (texto).
- `latitude:float`: Coordenada de latitud (decimal).

**Explicación**:
- La tabla `realtime` almacenará los datos de los viajes de taxi en tiempo real, con particionamiento por tiempo para mejorar el rendimiento de las consultas.

💡 **Contexto empresarial**: Crear una tabla es como definir un libro contable con columnas específicas para registrar transacciones financieras.

### Paso 2.3: Crear un bucket en Cloud Storage

1. Guarda el nombre del bucket como variable (reemplaza `your-project-id` con tu ID de proyecto):

In [None]:
export BUCKET_NAME="your-project-id"

2. Crea el bucket:

In [None]:
gsutil mb gs://$BUCKET_NAME/

**Desglose del comando**:
- `gsutil`: Herramienta de línea de comandos para **Cloud Storage**.
- `mb`: Comando para crear un bucket ("make bucket").
- `gs://$BUCKET_NAME/`: Ruta del bucket usando la variable definida.

**Explicación**:
- El bucket almacenará archivos temporales generados por el pipeline de **Dataflow**.

💡 **Contexto empresarial**: Un bucket es como un archivo digital donde se guardan documentos financieros temporales durante el procesamiento.

## 🖱️ Tarea 3 (Alternativa): Crear todo usando Google Cloud Console

Si prefieres usar la interfaz gráfica en lugar de comandos:

### BigQuery
1. Ve a **Navigation menu** > **BigQuery**.
2. Haz clic en **⋮** junto al nombre del proyecto > **Create dataset**.
3. Configura:
   - **ID**: `taxirides`
   - **Region**: `us`
4. Crea la tabla `realtime`:
   - Haz clic en el dataset `taxirides` > **Create table**.
   - Usa la opción **Edit as text** para ingresar el esquema:
     ```
     ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
     ```
   - Habilita **Time partitioning** > selecciona el campo `timestamp`.

### Cloud Storage
1. Ve a **Navigation menu** > **Storage** > **Buckets** > **Create bucket**.
2. Configura:
   - **Name**: Usa el mismo nombre que tu ID de proyecto.
   - Mantén la configuración por defecto.
3. Haz clic en **Create**.

**Explicación**:
- Esta alternativa es útil para usuarios que prefieren interfaces gráficas sobre comandos.

💡 **Contexto empresarial**: Usar la consola es como configurar un sistema contable mediante menús en lugar de scripts, facilitando la adopción para equipos no técnicos.

## 🚀 Tarea 4: Ejecutar el pipeline de Dataflow

En esta tarea, ejecutarás un pipeline de streaming utilizando la plantilla **Pub/Sub to BigQuery**.

1. En **Cloud Shell**, ejecuta:

In [None]:
gcloud dataflow jobs run iotflow \
--gcs-location gs://dataflow-templates-"Region"/latest/PubSub_to_BigQuery \
--region "Region" \
--worker-machine-type e2-medium \
--staging-location gs://$BUCKET_NAME/temp \
--parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec="PROJECT_ID:taxirides.realtime"

**Desglose del comando**:
| **Parte** | **Significado** |
|-----------|-----------------|
| `gcloud dataflow jobs run iotflow` | Crea un trabajo de **Dataflow** llamado `iotflow`. |
| `--gcs-location ...` | Ubicación de la plantilla pública **Pub/Sub to BigQuery**. Reemplaza `Region` con la región de tu proyecto (por ejemplo, `us-central1`). |
| `--region` | Región donde se ejecutará el pipeline (por ejemplo, `us-central1`). |
| `--worker-machine-type e2-medium` | Tipo de máquina para procesar los datos. |
| `--staging-location` | Ubicación temporal en el bucket para archivos intermedios. |
| `--parameters` | Especifica el tema de **Pub/Sub** (`inputTopic`) y la tabla de **BigQuery** (`outputTableSpec`). Reemplaza `PROJECT_ID` con tu ID de proyecto. |

**Explicación**:
- El pipeline lee datos de un tema público de **Pub/Sub** (`taxirides-realtime`) y los inserta en la tabla `taxirides.realtime` en **BigQuery**.

💡 **Contexto empresarial**: Este pipeline es como un sistema automatizado que registra transacciones financieras en tiempo real en un libro contable digital.

## 🔍 Tarea 5: Consultar los datos en BigQuery

En esta tarea, verificarás los datos insertados por el pipeline.

1. Ve a **BigQuery** en **Google Cloud Console**.
2. Ejecuta la siguiente consulta SQL:

In [None]:
SELECT * FROM `your-project-id.taxirides.realtime` LIMIT 1000

**Explicación**:
- Muestra los primeros 1000 registros de la tabla `realtime`, que contienen los datos de los viajes de taxi procesados por el pipeline.

💡 **Contexto empresarial**: Esta consulta es como revisar un informe financiero para confirmar que las transacciones se han registrado correctamente.

## ❓ Tarea 6: Preguntas de repaso

| **Pregunta** | **Respuesta** |
|--------------|---------------|
| ¿Google Cloud Dataflow permite procesamiento por lotes? | ✅ Verdadero |
| ¿Cuál fue la plantilla usada en el laboratorio? | ✅ Pub/Sub to BigQuery |

## 🚀 Resumen

| **Concepto** | **Explicación contable simplificada** |
|--------------|--------------------------------------|
| **Dataflow** | Sistema automatizado que procesa transacciones financieras en tiempo real o por lotes. |
| **Pub/Sub** | Cola de mensajes que recibe datos financieros en tiempo real, como transacciones. |
| **BigQuery** | Base de datos analítica que almacena registros financieros para análisis. |
| **Pipeline** | Proceso automatizado que traslada datos financieros de una fuente a un destino. |

💡 **Conclusión empresarial**: La plantilla **Pub/Sub to BigQuery** de **Dataflow** permite a las empresas procesar y almacenar datos en tiempo real, como registros de transacciones, de forma escalable y eficiente. Esto asegura análisis rápidos, cumplimiento normativo y optimización de costos operativos.

Para más información, consulta la [Documentación de Google Cloud Dataflow](https://cloud.google.com/dataflow/docs) y la [Documentación de Google Cloud BigQuery](https://cloud.google.com/bigquery/docs).