In [1]:
# Instalación de librerias necesarias
#!pip install sodapy # Utilidad para acceder a los datos publicados en datos.org
#!pip install dask # Utilidad para paralelizar el proceso de descarga

In [1]:
# Importar librerias que se necesitan para la ejecución del codigo
import pandas as pd
from sodapy import Socrata
from google.cloud import bigquery
from dask import dataframe as dd
from dask.diagnostics import ProgressBar
import dask
dask.config.set({'logging.distributed': 'error'})

<dask.config.set at 0x7f260de2a620>

In [4]:
# Consulta SQL para obtener los códigos de las estaciones
query_stations = """ 
    SELECT LPAD(codigo, 10, '0') AS codigo_estacion
    FROM `ideam.stations_catalog`
"""

# URL base del servicio de datos
url = "www.datos.gov.co"

# Endpoints para diferentes datos climáticos
# - "sbwg-7ju4" para temperatura
# - "s54a-sgyg" para precipitación
# Nota: La variable global 'endpoint' varía según el dato climático a descargar.
endpoint = "s54a-sgyg"

# Ruta del bucket en Google Cloud Storage donde se almacenarán los datos descargados
bucket_store = "gs://datalake-ideam/zone=landing/source=datos-org/"

# Tipo de variable climática a descargar
variable = "precipitacion"

In [5]:
# Funciones para descargaar los datos

def get_query_results(sql: str) -> pd.DataFrame:
    """
    Ejecuta una consulta SQL en BigQuery y devuelve los resultados como un DataFrame de pandas.

    Args:
        sql (str): La consulta SQL a ejecutar.

    Returns:
        pd.DataFrame: Un DataFrame que contiene los resultados de la consulta.
    """
    # Crea un cliente BigQuery
    client = bigquery.Client()

    # Ejecuta la consulta y convierte los resultados en un DataFrame
    query = client.query(sql)
    return query.result().to_dataframe()

def get_soap_data(station_code: str, bucket_store: str):
    """
    Obtiene datos de una API SOAP usando el código de una estación y los guarda como un archivo Parquet en un bucket de almacenamiento.

    Args:
        station_code (str): Código de la estación para filtrar los datos.
        bucket_store (str): Ruta al bucket donde se almacenará el archivo Parquet.

    Returns:
        None
    """
    try:
        # Crea un cliente para la API SOAP
        client = Socrata(url, None, timeout=180)
        
        # Obtiene los resultados de la API filtrados por el código de la estación
        results = client.get(endpoint, limit=20000, where=f"codigoestacion = '{station_code}'")
        
        # Convierte los resultados a un DataFrame de pandas
        results_df = pd.DataFrame.from_records(results)
        
        # Si hay datos disponibles, guárdalos como un archivo Parquet
        if results_df.shape[0] > 0:
            results_df.to_parquet(bucket_store + f'{variable}_{station_code}.parquet')
    except Exception as e:
        # Maneja cualquier excepción y muestra un mensaje de error
        print(f"Algo ha ocurrido: {e}")

In [6]:
stations_df = get_query_results(query_stations)
dask_stations_df = dd.from_pandas(stations_df
                                  , npartitions = 200)

In [None]:

with ProgressBar():
    dask_stations_df.map_partitions(lambda df: df.codigo_estacion.apply(lambda x:
                                                 get_soap_data(x
                                                               ,bucket_store))       
                                                       ,meta = {'name':object}
                                                       ).compute(scheduler='processes')