## Sorted temperature folder

In [17]:
import os
import re
from collections import defaultdict
from datetime import datetime, timedelta

In [18]:
def analizar_nombres_archivos(directorio):
    # Modificamos el patrón para aceptar versiones con uno o más dígitos después del punto
    patron = re.compile(r'^(Temperature-Air-2m-(?:Min|Mean|Max)-24h)_(C3S-glob-agric)_(AgERA5)_(\d{8})_(final-v\d+\.\d+(?:\.\d+)?)\.nc$')
    
    archivos = [f for f in os.listdir(directorio) if f.endswith('.nc')]
    
    resultados = {
        'total_archivos': len(archivos),
        'archivos_validos': 0,
        'archivos_invalidos': [],
        'tipos_temperatura': defaultdict(int),
        'versiones': defaultdict(int),
        'fechas': defaultdict(lambda: {'count': 0, 'tipos': set()}),
        'fechas_faltantes': set(),
        'fechas_incompletas': [],
        'fechas_duplicadas': [],
        'fecha_inicio': None,
        'fecha_fin': None
    }
    
    todas_las_fechas = set()
    
    for archivo in archivos:
        match = patron.match(archivo)
        if match:
            resultados['archivos_validos'] += 1
            tipo_temp, _, _, fecha, version = match.groups()
            
            fecha_dt = datetime.strptime(fecha, "%Y%m%d")
            if resultados['fecha_inicio'] is None or fecha_dt < resultados['fecha_inicio']:
                resultados['fecha_inicio'] = fecha_dt
            if resultados['fecha_fin'] is None or fecha_dt > resultados['fecha_fin']:
                resultados['fecha_fin'] = fecha_dt
            
            resultados['tipos_temperatura'][tipo_temp] += 1
            resultados['versiones'][version] += 1
            resultados['fechas'][fecha]['count'] += 1
            resultados['fechas'][fecha]['tipos'].add(tipo_temp)
            
            todas_las_fechas.add(fecha)
            
            if resultados['fechas'][fecha]['count'] > 3:
                resultados['fechas_duplicadas'].append(fecha)
        else:
            resultados['archivos_invalidos'].append(archivo)
    
    # Verificar fechas faltantes e incompletas
    if resultados['fecha_inicio'] and resultados['fecha_fin']:
        fecha_actual = resultados['fecha_inicio']
        while fecha_actual <= resultados['fecha_fin']:
            fecha_str = fecha_actual.strftime("%Y%m%d")
            if fecha_str not in todas_las_fechas:
                resultados['fechas_faltantes'].add(fecha_str)
            elif len(resultados['fechas'][fecha_str]['tipos']) < 3:
                resultados['fechas_incompletas'].append(fecha_str)
            fecha_actual += timedelta(days=1)
    
    return resultados

In [19]:
def imprimir_informe(resultados):
    print(f"Informe de análisis de nombres de archivos:")
    print(f"Total de archivos analizados: {resultados['total_archivos']}")
    print(f"Archivos con formato válido: {resultados['archivos_validos']}")
    print(f"Archivos con formato inválido: {len(resultados['archivos_invalidos'])}")
    
    if resultados['fecha_inicio'] and resultados['fecha_fin']:
        print(f"\nFecha de inicio: {resultados['fecha_inicio'].strftime('%Y-%m-%d')}")
        print(f"Fecha de fin: {resultados['fecha_fin'].strftime('%Y-%m-%d')}")
        rango_dias = (resultados['fecha_fin'] - resultados['fecha_inicio']).days + 1
        print(f"Rango total: {rango_dias} días")
    
    if resultados['archivos_invalidos']:
        print("\nArchivos con formato inválido:")
        for archivo in resultados['archivos_invalidos']:
            print(f"  - {archivo}")
    
    print("\nDistribución de tipos de temperatura:")
    for tipo, cantidad in resultados['tipos_temperatura'].items():
        print(f"  - {tipo}: {cantidad}")
    
    print("\nVersiones encontradas:")
    for version, cantidad in resultados['versiones'].items():
        print(f"  - {version}: {cantidad}")
    
    if resultados['fechas_incompletas']:
        print("\nFechas con archivos incompletos (menos de 3 tipos):")
        for fecha in sorted(resultados['fechas_incompletas']):
            tipos = resultados['fechas'][fecha]['tipos']
            print(f"  - {fecha}: {len(tipos)} tipos - {', '.join(tipos)}")
    
    if resultados['fechas_faltantes']:
        print("\nFechas faltantes:")
        for fecha in sorted(resultados['fechas_faltantes']):
            print(f"  - {fecha}")
    
    if resultados['fechas_duplicadas']:
        print("\nFechas con más de 3 archivos:")
        for fecha in resultados['fechas_duplicadas']:
            print(f"  - {fecha}: {resultados['fechas'][fecha]['count']} archivos")

In [20]:
if __name__ == "__main__":
    directorio = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/landing'
    resultados = analizar_nombres_archivos(directorio)
    imprimir_informe(resultados)

Informe de análisis de nombres de archivos:
Total de archivos analizados: 48786
Archivos con formato válido: 48786
Archivos con formato inválido: 0

Fecha de inicio: 1980-01-01
Fecha de fin: 2024-07-09
Rango total: 16262 días

Distribución de tipos de temperatura:
  - Temperature-Air-2m-Min-24h: 16262
  - Temperature-Air-2m-Mean-24h: 16262
  - Temperature-Air-2m-Max-24h: 16262

Versiones encontradas:
  - final-v1.1: 48774
  - final-v1.1.1: 12


In [5]:
def analizar_nombres_archivos2(directorio):
    # Modificamos el patrón para aceptar versiones con uno o más dígitos después del punto
    patron = re.compile(r'^(Temperature-Air-2m-(?:Min|Mean|Max)-24h)_(C3S-glob-agric)_(AgERA5)_(\d{8})_(final-v\d+\.\d+(?:\.\d+)?)\.nc$')
    
    archivos = [f for f in os.listdir(directorio) if f.endswith('.nc')]
    
    resultados = {
        'total_archivos': len(archivos),
        'archivos_validos': 0,
        'archivos_invalidos': [],
        'tipos_temperatura': defaultdict(int),
        'versiones': defaultdict(int),
        'fechas': defaultdict(lambda: {'count': 0, 'tipos': set()}),
        'fechas_faltantes': set(),
        'fechas_incompletas': [],
        'fechas_duplicadas': [],
        'fecha_inicio': None,
        'fecha_fin': None
    }
    
    todas_las_fechas = set()
    
    for archivo in archivos:
        match = patron.match(archivo)
        if match:
            resultados['archivos_validos'] += 1
            tipo_temp, _, _, fecha, version = match.groups()
            
            fecha_dt = datetime.strptime(fecha, "%Y%m%d")
            if resultados['fecha_inicio'] is None or fecha_dt < resultados['fecha_inicio']:
                resultados['fecha_inicio'] = fecha_dt
            if resultados['fecha_fin'] is None or fecha_dt > resultados['fecha_fin']:
                resultados['fecha_fin'] = fecha_dt
            
            resultados['tipos_temperatura'][tipo_temp] += 1
            resultados['versiones'][version] += 1
            resultados['fechas'][fecha]['count'] += 1
            resultados['fechas'][fecha]['tipos'].add(tipo_temp)
            
            todas_las_fechas.add(fecha)
            
            if resultados['fechas'][fecha]['count'] > 3:
                resultados['fechas_duplicadas'].append(fecha)
        else:
            resultados['archivos_invalidos'].append(archivo)
    
    # Verificar fechas faltantes e incompletas
    if resultados['fecha_inicio'] and resultados['fecha_fin']:
        fecha_actual = resultados['fecha_inicio']
        while fecha_actual <= resultados['fecha_fin']:
            fecha_str = fecha_actual.strftime("%Y%m%d")
            if fecha_str not in todas_las_fechas:
                resultados['fechas_faltantes'].add(fecha_str)
            elif len(resultados['fechas'][fecha_str]['tipos']) < 3:
                resultados['fechas_incompletas'].append(fecha_str)
            fecha_actual += timedelta(days=1)
    
    return resultados

In [6]:
if __name__ == "__main__":
    directorio = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/landing'
    resultados = analizar_nombres_archivos2(directorio)
    imprimir_informe(resultados)

Informe de análisis de nombres de archivos:
Total de archivos analizados: 48786
Archivos con formato válido: 48786
Archivos con formato inválido: 0

Fecha de inicio: 1980-01-01
Fecha de fin: 2024-07-09
Rango total: 16262 días

Distribución de tipos de temperatura:
  - Temperature-Air-2m-Min-24h: 16262
  - Temperature-Air-2m-Mean-24h: 16262
  - Temperature-Air-2m-Max-24h: 16262

Versiones encontradas:
  - final-v1.1: 48774
  - final-v1.1.1: 12


## Sorted by dates

In [7]:
def extraer_fecha_y_tipo(nombre_archivo):
    match = re.search(r'Temperature-Air-2m-(Max|Mean|Min)-24h.*_(\d{8})_', nombre_archivo)
    if match:
        tipo, fecha = match.groups()
        return fecha, tipo
    return '00000000', ''

def ordenar_archivos(directorio):
    archivos = [f for f in os.listdir(directorio) if f.endswith('.nc')]
    archivos_por_fecha = defaultdict(dict)
    for archivo in archivos:
        fecha, tipo = extraer_fecha_y_tipo(archivo)
        archivos_por_fecha[fecha][tipo] = archivo
    fechas_ordenadas = sorted(archivos_por_fecha.keys())
    archivos_ordenados = []
    for fecha in fechas_ordenadas:
        for tipo in ['Max', 'Mean', 'Min']:
            if tipo in archivos_por_fecha[fecha]:
                archivos_ordenados.append(archivos_por_fecha[fecha][tipo])
    return archivos_ordenados

In [8]:
archivo = 'Temperature-Air-2m-Min-24h_C3S-glob-agric_AgERA5_20240131_final-v1.1.nc'

In [9]:
landing = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/landing'
test = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/test'

In [10]:
ordenar_archivos(test)[0]

'Temperature-Air-2m-Max-24h_C3S-glob-agric_AgERA5_20240101_final-v1.1.nc'

In [6]:
import os
import re
from collections import defaultdict

In [7]:
def extraer_fecha_y_tipo(nombre_archivo):
    match = re.search(r'Temperature-Air-2m-(Max|Mean|Min)-24h.*_(\d{8})_', nombre_archivo)
    if match:
        tipo, fecha = match.groups()
        return fecha, tipo
    return '00000000', ''

def ordenar_archivos(directorio):
    archivos = [f for f in os.listdir(directorio) if f.endswith('.nc')]
    archivos_por_fecha = defaultdict(lambda: {'Max': None, 'Mean': None, 'Min': None})
    
    for archivo in archivos:
        fecha, tipo = extraer_fecha_y_tipo(archivo)
        if fecha != '00000000' and tipo:
            ruta_completa = os.path.join(directorio, archivo)
            archivos_por_fecha[fecha][tipo] = ruta_completa
    
    # Convertir el defaultdict en un diccionario normal
    archivos_por_fecha = dict(archivos_por_fecha)
    
    return archivos_por_fecha

In [None]:
ordenar_archivos(ruta_local_entrada)

In [14]:
ruta_local_entrada = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/test'
bucket_salida = 'climate-action-datalake'
ruta_salida_s3 = 'zone=raw/source=agera5-v1-1/variable=Temperature_Air_2m_24h.zarr'

In [13]:
import os
from glob import glob

In [15]:
todos_archivos = glob(os.path.join(ruta_local_entrada, '*.nc'))

## Desarrollo de Automatización

In [16]:
%%capture
!pip install luigi netCDF4 h5netcdf scipy dask[complete] boto3 s3fs numpy zarr pyyaml tqdm

In [18]:
%%capture
!pip install "xarray[complete]"==2023.8.0 s3fs --user

In [4]:
%%capture
!pip install python-blosc

In [2]:
import xarray as xr
import s3fs
import numpy as np
import pandas as pd
import boto3
import os
from glob import glob
from botocore.exceptions import NoCredentialsError
import zarr
from datetime import datetime, timedelta

In [None]:
def kelvin_a_celsius(temp_k):
    return xr.where(np.isnan(temp_k), temp_k, temp_k - 273.15)

def procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3):
    # Patrones para los archivos de cada tipo de temperatura
    patrones = {
        'min': '*Min*.nc',
        'mean': '*Mean*.nc',
        'max': '*Max*.nc'
    }
    
    datasets = []
    
    for tipo, patron in patrones.items():
        archivos = glob(os.path.join(ruta_local_entrada, patron))
        archivos.sort()  # Asegurar que los archivos estén en orden
        
        # Abrir todos los archivos de este tipo como un único dataset
        ds = xr.open_mfdataset(archivos, combine='by_coords')
        
        # Convertir de Kelvin a Celsius
        var_name = f'Temperature_Air_2m_{tipo.capitalize()}_24h'
        ds[var_name] = kelvin_a_celsius(ds[var_name])
        ds[var_name].attrs['units'] = 'C'
        
        datasets.append(ds)
    
    # Combinar todos los datasets
    ds_final = xr.merge(datasets)
    
    # Configurar el sistema de archivos S3
    s3 = s3fs.S3FileSystem(anon=False)
    
    # Escribir a Zarr en S3
    zarr_store = s3fs.S3Map(root=f's3://{bucket_salida}/{ruta_salida_s3}', s3=s3)
    ds_final.to_zarr(store=zarr_store, mode='a')
    
    print(f"Datos procesados y guardados en S3: s3://{bucket_salida}/{ruta_salida_s3}")

In [None]:
# Arturo
    compressor = zarr.Blosc(cname='lz4', clevel= 1, shuffle=False)
    blosc.set_nthreads(8) 
    encoding = {vname: {'compressor': compressor,'chunks': (1,1,2000,7200)} for vname in nc.data_vars}
    with ProgressBar():
        #print("Hellooooo")
        nc.to_zarr(ageraS3,  mode='a', append_dim='time', consolidated=True)

In [4]:
import zarr
from dask.diagnostics import ProgressBar
import s3fs
import xarray as xr
import os
from glob import glob
from numcodecs import blosc

def kelvin_a_celsius(temp_kelvin):
    return temp_kelvin - 273.15

def procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3):
    # Patrones para los archivos de cada tipo de temperatura
    patrones = {
        'min': '*Min*.nc',
        'mean': '*Mean*.nc',
        'max': '*Max*.nc'
    }
    
    datasets = []
    
    for tipo, patron in patrones.items():
        archivos = glob(os.path.join(ruta_local_entrada, patron))
        archivos.sort()  # Asegurar que los archivos estén en orden temporal
        
        # Abrir todos los archivos de este tipo como un único dataset
        ds = xr.open_mfdataset(archivos, combine='by_coords')
        
        # Convertir de Kelvin a Celsius
        var_name = f'Temperature_Air_2m_{tipo.capitalize()}_24h'
        ds[var_name] = kelvin_a_celsius(ds[var_name])
        ds[var_name].attrs['units'] = 'C'
        
        datasets.append(ds)
    
    # Combinar todos los datasets
    ds_final = xr.merge(datasets)
    
    # Configuración de la compresión y chunks para Zarr
    compressor = zarr.Blosc(cname='lz4', clevel=1, shuffle=False)
    blosc.set_nthreads(8)
    
    # Configuración de encoding para cada variable de datos
    encoding = {var: {'compressor': compressor, 'chunks': (1, 1, 2000, 7200)} for var in ds_final.data_vars}
    
    # Configurar el sistema de archivos S3
    s3 = s3fs.S3FileSystem(anon=False)
    zarr_store = s3fs.S3Map(root=f's3://{bucket_salida}/{ruta_salida_s3}', s3=s3)
    
    # Escribir a Zarr en S3 con compresión y manejo de la dimensión temporal
    with ProgressBar():
        ds_final.to_zarr(zarr_store, mode='a', append_dim='time', consolidated=True, encoding=encoding)
    
    print(f"Datos procesados y guardados en S3: s3://{bucket_salida}/{ruta_salida_s3}")

In [None]:
ruta_local_entrada = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/test'
bucket_salida = 'climate-action-datalake'
ruta_salida_s3 = 'zone=raw/source=agera5-v1-1/variable=Temperature_Air_2m_24h.zarr'

procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3)

In [None]:
import zarr
from dask.diagnostics import ProgressBar
import s3fs
import xarray as xr
import os
import re
from collections import defaultdict
from numcodecs import blosc

def kelvin_a_celsius(temp_kelvin):
    return temp_kelvin - 273.15

def extraer_fecha_y_tipo(nombre_archivo):
    match = re.search(r'Temperature-Air-2m-(Max|Mean|Min)-24h.*_(\d{8})_', nombre_archivo)
    if match:
        tipo, fecha = match.groups()
        return fecha, tipo
    return '00000000', ''

def ordenar_archivos(directorio):
    archivos = [f for f in os.listdir(directorio) if f.endswith('.nc')]
    archivos_por_fecha = defaultdict(lambda: {'Max': None, 'Mean': None, 'Min': None})
    
    for archivo in archivos:
        fecha, tipo = extraer_fecha_y_tipo(archivo)
        if fecha != '00000000' and tipo:
            ruta_completa = os.path.join(directorio, archivo)
            archivos_por_fecha[fecha][tipo] = ruta_completa
    
    return dict(archivos_por_fecha)

def procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3, fecha_inicio, fecha_fin):
    # Convert fecha_inicio and fecha_fin to strings if they are integers
    fecha_inicio = str(fecha_inicio)
    fecha_fin = str(fecha_fin)
    
    archivos_por_fecha = ordenar_archivos(ruta_local_entrada)
    
    # Filtrar fechas dentro del rango especificado
    fechas_filtradas = {fecha: archivos for fecha, archivos in archivos_por_fecha.items()
                        if fecha_inicio <= fecha <= fecha_fin}
    
    # Configurar el sistema de archivos S3
    s3 = s3fs.S3FileSystem(anon=False)
    zarr_store = s3fs.S3Map(root=f's3://{bucket_salida}/{ruta_salida_s3}', s3=s3)
    
    # Configuración de la compresión para Zarr
    compressor = zarr.Blosc(cname='lz4', clevel=1, shuffle=False)
    blosc.set_nthreads(8)
    
    for fecha, archivos in sorted(fechas_filtradas.items()):
        print(f"Procesando fecha: {fecha}")
        datasets = []
        
        for tipo, ruta_archivo in archivos.items():
            if ruta_archivo is None:
                print(f"Advertencia: Falta archivo {tipo} para la fecha {fecha}")
                continue
            
            try:
                ds = xr.open_dataset(ruta_archivo)
                var_name = f'Temperature_Air_2m_{tipo}_24h'
                ds[var_name] = kelvin_a_celsius(ds[var_name])
                ds[var_name].attrs['units'] = 'C'
                datasets.append(ds)
            except Exception as e:
                print(f"Error al procesar {ruta_archivo}: {str(e)}")
        
        if not datasets:
            print(f"No se pudieron procesar datos para la fecha {fecha}")
            continue
        
        ds_dia = xr.merge(datasets)
        
        # Configurar encoding para cada variable
        chunk_sizes = {dim: min(2000, len(ds_dia[dim])) for dim in ds_dia.dims}
        encoding = {var: {'compressor': compressor, 'chunks': tuple(chunk_sizes[dim] for dim in ds_dia[var].dims)} 
                    for var in ds_dia.data_vars}
        
        # Escribir a Zarr en S3
        with ProgressBar():
            ds_dia.to_zarr(zarr_store, mode='a', append_dim='time', consolidated=True, encoding=encoding)
        
        print(f"Datos para la fecha {fecha} procesados y guardados en S3")
    
    print(f"Todos los datos procesados y guardados en S3: s3://{bucket_salida}/{ruta_salida_s3}")

def main():
    ruta_local_entrada = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/landing'
    bucket_salida = 'climate-action-datalake'
    ruta_salida_s3 = 'zone=raw/source=agera5-v1-1/variable=Temperature_Air_2m_24h.zarr'
    
    # Solicitar al usuario las fechas de inicio y fin
    fecha_inicio = input("Ingrese la fecha de inicio (YYYYMMDD): ")
    fecha_fin = input("Ingrese la fecha de fin (YYYYMMDD): ")
    
    # Procesar los archivos
    procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3, fecha_inicio, fecha_fin)

if __name__ == "__main__":
    main()

In [15]:
# Configurar la conexión a S3
s3 = s3fs.S3FileSystem(anon=False)

# Especificar la ubicación del archivo Zarr en S3
bucket_name = 'climate-action-datalake'
zarr_path = 'zone=raw/source=agera5-v1-1/variable=Temperature_Air_2m_24h.zarr'
s3_url = f's3://{bucket_name}/{zarr_path}'

# Abrir el archivo Zarr
ds = xr.open_zarr(s3fs.S3Map(s3_url, s3=s3))

# Mostrar información básica sobre el dataset
print(ds)

<xarray.Dataset>
Dimensions:                      (time: 17, lat: 1801, lon: 3600)
Coordinates:
  * lat                          (lat) float64 90.0 89.9 89.8 ... -89.9 -90.0
  * lon                          (lon) float64 -180.0 -179.9 ... 179.8 179.9
  * time                         (time) datetime64[ns] 2024-01-01 ... 2024-01-17
Data variables:
    Temperature_Air_2m_Max_24h   (time, lat, lon) float32 dask.array<chunksize=(1, 226, 900), meta=np.ndarray>
    Temperature_Air_2m_Mean_24h  (time, lat, lon) float32 dask.array<chunksize=(1, 226, 900), meta=np.ndarray>
    Temperature_Air_2m_Min_24h   (time, lat, lon) float32 dask.array<chunksize=(1, 226, 900), meta=np.ndarray>
Attributes:
    Conventions:  CF-1.7


In [None]:
# Procesar los archivos
ruta_local_entrada = '/home/ec2-user/SageMaker/datalake/data/agera/temperature/test'
bucket_salida = 'climate-action-datalake'
ruta_salida_s3 = 'zone=raw/source=agera5-v1-1/variable=Temperature_Air_2m_24h.zarr'
fecha_inicio = 19800101
fecha_fin = 19800131
procesar_archivos_nc(ruta_local_entrada, bucket_salida, ruta_salida_s3, fecha_inicio, fecha_fin)

if __name__ == "__main__":
    main()