 # üìå PipeLine para descarga y atomizaci√≥n de Datos

üîπEste documento describe la arquitectura utilizada para descargar datos crudos desde KoboToolbox, almacenarlos en Google Cloud Storage, y posteriormente atomizarlos y cargarlos en BigQuery mediante Cloud Functions.


-------------------
### üìÅ Perfiladores de Corriente 
- La estructura de Cloud Functions requiere de dos archivos: **main.py** y **requirement.txt**
- *main.py* - C√≥digo con la funci√≥n para acceder a los datos de la aplicaci√≥n KoboToolbox y guardar los registros (datos crudos) en un Bucket de *Cloud Storage*
- *requirement.txt* - Debe de contener:

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Perfiladores_de_Corriente

#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/akth74ZsstPt82WXhLUKmN/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_jf5li71" (Informaci√≥n del sensor)
            grupo_sensor = item.get("group_jf5li71", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n del sensor (grupo_jf5li71) ---
                "coordenadas_sensor": coordenadas,
                "frecuencia_operacion": grupo_sensor.get("Frecuencia_de_operaci_n"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Perfiladores_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Perfiladores_de_Corriente \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ Par√°metros de Agua
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Par√°metros_de_Agua
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aZcdmftcBtnpfW8HvV78as/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_jf5li71" (Informaci√≥n de calidad de agua)
            grupo_agua = item.get("group_kv0zx70", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de calidad de agua (grupo_kv0zx70) ---
                "Temperatura": grupo_agua.get("Temp_C"),  
                "Salinidad": grupo_agua.get("Salinidad_PSU"),
                "Conductividad": grupo_sensor.get("Conductividad_mS_cm"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Parametros_agua_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Par√°metros_de_Agua \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ C√°maras Terrestre
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= C√°maras_Terrestres
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aMQsvne8A7orHTdj6Yfh5x/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:

            # Extraer datos del grupo "group_kv0zx70" (Informaci√≥n de instalaci√≥n)
             grupo_intalaci√≥n = item.get("group_kv0zx70", {})

            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_rw9dd47/group_it8nx96" (Informaci√≥n de OP)
            grupo_op = item.get("group_kv0zx70", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de operaci√≥n y mantenimiento (group_rw9dd47/group_it8nx96) ---
                "Modo de Operaci√≥n": grupo_op.get("Modo_de_operaci_n_001"),  
                "Se√±al": grupo_op.get("Intensidad_de_la_se_al"),

                # ---- Informaci√≥n de Instalaci√≥n (group_kv0zx70)

                "Altura del sensor": grupo_instlaci√≥n.get("Altura_del_sensor_m_desde_el"),  
                "Coordenadas": grupo_op.get("Latitud_y_Longitud_del_sensor"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "C√°maras_terrestre_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource C√°maras_Terrestres \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ Zooplancton
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Zooplancton
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aojT2JZfgdqjTsxNszRdVz/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_kv0zx70" (Informaci√≥n de la muestra)
            grupo_muestra = item.get("group_kv0zx70", {})
        
            # Extraer datos del grupo "group_jf5li71" (Informaci√≥n de registro CTD)
            grupo_registro = item.get("group_zq1vg22", {})
            data_limpia.append({
                
                # --- Informaci√≥n de la muestras (group_kv0zx70) ---
                "ID Muestra": grupo_muestra.get("ID_Muestra"),
                "Estado de la Marea": grupo_muestra.get("Estado_de_la_marea"),
            
                
                # --- Informaci√≥n de registro de CTD (group_zq1vg22) ---
                "Temperatura": grupo_registro.get("Temp_C"),  
                "Salinidad": grupo_registro.get("Salinidad_ppt"),
                "Profundidad": grupo_registro.get("Profundidad_m"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Zooplancton_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Zooplancton \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ Vuelo de dron
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Vuelo_de_dron
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aBwshXwrJ77EyRLtrx2s8z/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_kv0zx70" (Informaci√≥n del vuelo de dron)
            grupo_vuelo = item.get("group_kv0zx70", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de calidad de agua (grupo_kv0zx70) ---
                "Tipo de sensor": item.get("Tipo_de_sensor"),  # Fuera de grupo
                "Indice": grupo_vuelo.get("_ndice"),
                "Conductividad": grupo_vuelo.get("Conductividad_mS_cm"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Vuelo_de_dron_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Vuelo_de_dron \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ Estaciones Meteorol√≥gicas
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Estaciones_Meteorol√≥gicas 
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/an57S6QY6v3RwTWDnM2ev4/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_jf5li71" (Informaci√≥n de calidad de agua)
            grupo_instalacion = item.get("group_kv0zx70", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de instalaci√≥n de sensor (grupo_kv0zx70) ---
                "Tipo de sensor": item.get("Tipo_de_sensor"),  # Fuera de grupo
                "Coordenadas": grupo_instalacion.get("Latitud_y_Longitud_del_sensor"),
                "Altura del sensor": grupo_instalacion.get("Altura_del_sensor_m_desde_el"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Estaciones_meteorol√≥gicas_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Estaciones_Meteorol√≥gicas \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ CTD's
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= CTDS
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aAjboEpfSEbFjTZBxF8CKR/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_wh77o68" (Informaci√≥n de OP)
            grupo_op = item.get("group_wh77o68", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de OP (group_wh77o68) ---
                "Tipo de sensor": item.get("Tipo_de_sensor"),  # Fuera de grupo
                "Mantenimiento a": grupo_op.get("Mantenimiento_a"),
                "¬øNecesita ser recuperado?": grupo_op.get("_El_sensor_necesita_ser_recupe"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "CTDS_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource CTDS \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````


## üìÅ Estructura Forestal 
--------

### ‚ö° Descarga de datos crudos
- *requirement.txt* 

 

1) requests ‚Üí Para hacer solicitudes HTTP a la API de KoBoToolbox.

2) google.cloud.storage ‚Üí Para conectarse al Bucket de Cloud Storage.

3) functions_framework ‚Üí Permite ejecutar la funci√≥n como Cloud Function.

Para configurar la funci√≥n en Google Cloud Functions (GCF), es necesario establecer las variables de entorno requeridas por el c√≥digo principal, lo que simplifica su despliegue y mantenimiento.

- KOBO_TOKEN= "d0e1084994983390bd6f50e3ee61e9c522aee152"
- BUCKET_NAME= Estructura_forestal
#### üöÄ C√≥digo *main.py*
```` python
import os
import json
import requests
from google.cloud import storage
from datetime import datetime
import functions_framework

@functions_framework.http # Permite que la funci√≥n se ejecute cuando recibe una petici√≥n web y convierte la funci√≥n en una Cloud Function HTTP
def main(request):
    
    # --- Configuraci√≥n ---
    KOBO_URL = "https://kf.kobotoolbox.org/api/v2/assets/aenA9VYsH68Csi8QkDCntr/data.json" #Endpoint espec√≠fico de la API de KoboToolbox para un asset/proyecto
    KOBO_TOKEN = os.getenv("KOBO_TOKEN")
    BUCKET_NAME = os.getenv("BUCKET_NAME")

    # --- Descargar datos desde KoboToolbox ---
    headers = {"Authorization": f"Token {KOBO_TOKEN}"} # Configura autenticaci√≥n Bearer Token para la API
    response = requests.get(KOBO_URL, headers=headers) # Ejecuta la petici√≥n GET y almacena la respuesta

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}", response.status_code

    data = response.json() # Convertir respuesta a JSON

    # --- Nombrar archivo por fecha ---
    fecha = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") # Se obtiene la fecha y se le da formato
    filename = f"kobo_raw_{fecha}.json" # Se nombra el archivo

    # --- Guardar archivo en Cloud Storage ---
    
    storage_client = storage.Client() # Crea una instancia del cliente oficial de Python para interactuar con Google Cloud Storage 
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(filename) # Representa el archivo que se crear√° en el bucket
    blob.upload_from_string(json.dumps(data), content_type="application/json")

    print(f"Archivo subido: gs://{BUCKET_NAME}/{filename}")
    return f"Datos guardados en {filename}", 200

````

üîπPara realizar la automatizaci√≥n de la descarga de datos se tiene que declarar un mensaje de activaci√≥n, el disparador (trigger) y por ultimo se declaran las condiciones para que se ejecute en automatico. 

1. Mensaje de activaci√≥n funciona como intermediario entre Scheduler y Cloud Function:\
   gcloud pubsub topics create kobo-extractor-trigger
   
2. Trrigger despliega una funci√≥n que se ejecuta autom√°ticamente cuando llega un mensaje al topic: \
   gcloud functions deploy extraer_datos_kobo \
  --runtime python311 \
  --trigger-topic kobo-extractor-trigger \
  --region=us-central1

   
4. Programar la tarea: \
   gcloud scheduler jobs create pubsub ejecutar-extraccion-kobo \
  --schedule="0 8 1 * *" \
  --topic=kobo-extractor-trigger \
  --message-body="Iniciar extracci√≥n de KoboToolbox" \
  --time-zone="America/Mexico_City"


----------------------


### üìã Atomizaci√≥n de datos
Una vez generada la dase de datos cruda en Cloud Storage se despliega otra funci√≥n donde a partir de los datos crudos se haga la atomizaci√≥n de los registros y se genere el Query. Su respectivo requirements.txt debe de contener: 
1) google-cloud-storage
2) google-cloud-bigquery


#### üöÄ C√≥digo *main.py*
```` python
    import json
    from google.cloud import storage, bigquery

    def atomizar_y_cargar(event, context):
        """Se ejecuta autom√°ticamente cuando se sube un nuevo archivo al bucket."""
        bucket_name = event['bucket']
        file_name = event['name']
        print(f"Nuevo archivo detectado: gs://{bucket_name}/{file_name}")

        # --- Leer el archivo desde el bucket ---
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name) 
        blob = bucket.blob(file_name) # Referencia al archivo espec√≠fico
        raw_data = blob.download_as_text()
        json_data = json.loads(raw_data) #Convierte el texto JSON a objeto Python

        # --- Extraer los registros del JSON ---
        registros = json_data.get("results", [])

        # --- Limpiar y estructurar los datos ---
        data_limpia = []
        
        for item in registros:
            # Extraer datos del grupo "group_dr3zv09" (Informaci√≥n general)
            grupo_principal = item.get("group_dr3zv09", {})
        
            # Extraer datos del grupo "group_rh5ho64/group_yh9gj88" (Medidas de Arbol)
            grupo_arbol = item.get("group_rh5ho64/group_yh9gj88", {})
            data_limpia.append({
                
                # --- Informaci√≥n general (grupo_dr3zv09) ---
                "nombre_completo": grupo_principal.get("Nombre_Completo"),
                "fecha_recoleccion": grupo_principal.get("Fecha_y_hora_de_recolecci_n"),
                "reserva": grupo_principal.get("reserva"),
                "sitio": grupo_principal.get("sitio"),
                "codigo_sitio": grupo_principal.get("codigo"),
                
                # --- Informaci√≥n de medidas de Arbol (group_wh77o68) ---
                "Tipo de encuesta": item.get("Tipo_de_encuesta"),  # Fuera de grupo
                "Especies Individuales": grupo_arbol.get("Especies_individuales"),
                "Altura del √°rbol": grupo_op.get("Altura_del_individuo_m"),

        })


        # --- Subir los datos a BigQuery ---
        bq_client = bigquery.Client()
        table_id = "Estructura_forestal_atomizado"

        job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",  # agrega datos nuevos sin borrar los anteriores
        autodetect=True # BigQuery infiere autom√°ticamente el esque de datos
        )

        job = bq_client.load_table_from_json(data_limpia, table_id, job_config=job_config)
        job.result()  # Esperar a que finalice la carga, dtiene la ejecuci√≥n en CloudFunctions

        print(f"{len(data_limpia)} registros cargados a {table_id}")

````

Desde Terminal se gener√° el ***Disparador*** de la funci√≥n para que se relice de forma automatica cada vez que se detecten nuevos datos en el bucket

````pyhton 
gcloud functions deploy atomizar_y_cargar \
  --runtime python311 
  --trigger-resource Estructura_forestal \
  --trigger-event google.storage.object.finalize \
  --region=us-central1
````
