# GraphCast con datos ocenaográficos
---
Modifica el script ``nan``

Modelo usado:
* Random
* Remplazando todas las variables por la SST

Se tuvo en cuenta:
* Zona de estudio
* Escalas espaciales
* Zona de estudio

No se tuvo en cuenta:
* Escalas temporales
* Orden de los niveles verticales

Parámetros:
* Aleatorios
* latent size: ``2``
* gnn msg steps: ``4``
* random_mesh_size: ``9``
  
> **Conclusiones:** 
> 

In [4]:
import copernicusmarine
# Este módulo proporciona un decorador y funciones para crear clases con campos de datos, 
# similar a collections.namedtuple.
import dataclasses
# Este módulo proporciona clases para manipular fechas y horas.
import datetime
# Este módulo proporciona funciones de orden superior y operaciones de datos en general.
import functools
# Este módulo proporciona funciones matemáticas.
import math
# Este módulo proporciona operaciones de coincidencia de expresiones regulares.
import re
# Este módulo proporciona soporte para tipos opcionales.
from typing import Optional
# Este paquete proporciona funcionalidad para trabajar con sistemas de referencia de coordenadas en Cartopy, 
# una biblioteca de Python para dibujar mapas geoespaciales.
import cartopy.crs as ccrs
# Este paquete proporciona funcionalidad para interactuar con Google Cloud Storage, 
# un servicio de almacenamiento en la nube de Google.
from google.cloud import storage
# Este módulo proporciona implementaciones de modelos autoregresivos para series temporales.
from graphcast import autoregressive
# Este módulo proporciona funcionalidad para realizar operaciones de casting en datos.
from graphcast import casting
# Este módulo proporciona funcionalidad para el control de puntos de control 
# durante la ejecución de un programa.
from graphcast import checkpoint
# Este módulo proporciona utilidades para el manejo de datos en Graphcast.
from graphcast import data_utils
# Este módulo proporciona la implementación principal de Graphcast.
# from graphcast import graphcast
# Este módulo proporciona funcionalidad para normalizar datos en Graphcast.
from graphcast import normalization
# Este módulo proporciona funcionalidad para realizar operaciones de rollout en Graphcast.
from graphcast import rollout
# Este módulo proporciona funcionalidad para trabajar con arreglos multidimensionales etiquetados 
# y compatibles con JAX.
from graphcast import xarray_jax
# Este módulo proporciona funcionalidad para trabajar con estructuras de árbol 
# de arreglos multidimensionales etiquetados.
from graphcast import xarray_tree
# Esta clase proporciona funcionalidad para mostrar HTML en el contexto de IPython.
from IPython.display import HTML
# Este paquete proporciona interactividad en el notebook de IPython.
import ipywidgets as widgets
# Este paquete proporciona una biblioteca para construir redes neuronales en JAX.
import haiku as hk
# Este paquete proporciona funcionalidad para realizar cálculos numéricos de alto rendimiento 
# en dispositivos acelerados por GPU y CPU.
import jax
# Este paquete proporciona funcionalidad para la visualización de datos en Python.
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
# Este paquete proporciona funcionalidad para trabajar con arreglos y matrices en Python.
import numpy as np
# Este paquete proporciona funcionalidad para trabajar con arreglos multidimensionales etiquetados.
import xarray as xr


In [34]:
# Importamos paquete local
import sys, os
folders_lis = os.getcwd().split(os.sep)
repo_path = os.sep.join(folders_lis[:folders_lis.index('PhD_repo') + 1])
sys.path.append(repo_path + '\\src')
from art1_tools import graphcast_newvars
from art1_tools import norm

In [49]:
import importlib
# Reload the module
importlib.reload(graphcast_newvars)

<module 'art1_tools.graphcast_newvars' from 'c:\\Users\\gcuervo\\OneDrive - Universidad de Las Palmas de Gran Canaria\\Documents\\Doctorado\\PhD_repo\\src\\art1_tools\\graphcast_newvars.py'>

In [6]:
def parse_file_parts(file_name):
    """
    Parse a file name into parts separated by underscores and hyphens.

    Args:
        file_name (str): The name of the file to parse.

    Returns:
        dict: A dictionary containing the parsed parts of the file name. The keys are the parts before the first hyphen,
              and the values are the parts after the first hyphen in each section of the file name.

    Example:
        >>> parse_file_parts("example_part1-part2_part3-part4.txt")
        {'example_part1': 'part2', 'part3': 'part4'}
    """
    return dict(part.split("-", 1) for part in file_name.split("_"))

In [7]:
# Crear un cliente de Google Cloud Storage anónimo.
# gcs_client = storage.Client.create_anonymous_client()

# Obtener el bucket "dm_graphcast" del cliente de Google Cloud Storage.
# gcs_bucket = gcs_client.get_bucket("dm_graphcast")

## Cargamos Modelo random
---

In [8]:
def data_valid_for_model(
        file_name: str, model_config: graphcast_newvars.ModelConfig, task_config: graphcast_newvars.TaskConfig):
    """
    Verificar si los datos del archivo son válidos para el modelo y la tarea configurados.

    Args:
        file_name (str): El nombre del archivo de datos.
        model_config (graphcast.ModelConfig): La configuración del modelo.
        task_config (graphcast.TaskConfig): La configuración de la tarea.

    Returns:
        bool: True si los datos son válidos para el modelo y la tarea configurados, False de lo contrario.

    """
    file_parts = parse_file_parts(file_name.removesuffix(".nc"))
    return (
        model_config.resolution in (0, float(file_parts["res"])) and
        len(task_config.pressure_levels) == int(file_parts["levels"]) and
        (
            ("total_precipitation_6hr" in task_config.input_variables and
             file_parts["source"] in ("era5", "fake")) or
            ("total_precipitation_6hr" not in task_config.input_variables and
             file_parts["source"] in ("hres", "fake"))
        )
    )

## Cargamos un batch de datos
---

In [9]:
# https://data.marine.copernicus.eu/product/SST_ATL_SST_L4_REP_OBSERVATIONS_010_026/download?dataset=cmems-IFREMER-ATL-SST-L4-REP-OBS_FULL_TIME_SERIE_202012
satelite = "cmems-IFREMER-ATL-SST-L4-REP-OBS_FULL_TIME_SERIE"
min_lon, max_lon = (-20.97, -4.5) # IBI solo va hasta esta lon: -20.97
min_lat, max_lat = (19.55, 34.6)
start_date, end_date = ("2020-12-18", "2020-12-31") 
# Load xarray dataset
SST_SAT = copernicusmarine.open_dataset(dataset_id=satelite,
                                        minimum_longitude=min_lon, 
                                        maximum_longitude=max_lon,
                                        minimum_latitude=min_lat,
                                        maximum_latitude=max_lat,
                                        start_datetime=start_date,
                                        end_datetime=end_date,
                                        )
SST_SAT

Fetching catalog: 100%|██████████| 4/4 [00:17<00:00,  4.30s/it]


INFO - 2024-03-20T13:50:50Z - Dataset version was not specified, the latest one was selected: "202012"
INFO - 2024-03-20T13:50:50Z - Dataset part was not specified, the first one was selected: "default"




INFO - 2024-03-20T13:50:51Z - Service was not specified, the default one was selected: "arco-geo-series"


In [10]:
def scale(
    data: xr.Dataset,
    center: Optional[float] = None,
    robust: bool = False,
) -> tuple[xr.Dataset, matplotlib.colors.Normalize, str]:
    """
    Escala los datos para la visualización.

    Args:
        data (xr.Dataset): El conjunto de datos a escalar.
        center (Optional[float], optional): El centro para la escala. Por defecto es None.
        robust (bool, optional): Indica si se debe utilizar una escala robusta. Por defecto es False.

    Returns:
        tuple[xr.Dataset, matplotlib.colors.Normalize, str]: Una tupla que contiene los datos escalados, 
        el objeto de normalización y el mapa de colores.

    """
    vmin = np.nanpercentile(data, (2 if robust else 0))
    vmax = np.nanpercentile(data, (98 if robust else 100))
    if center is not None:
        diff = max(vmax - center, center - vmin)
        vmin = center - diff
        vmax = center + diff
    return (data, matplotlib.colors.Normalize(vmin, vmax),
            ("RdBu_r" if center is not None else "viridis"))

def select(
    data: xr.Dataset,
    variable: str,
    level: Optional[int] = None,
    max_steps: Optional[int] = None
) -> xr.Dataset:
    """
    Seleccione y filtre datos de un conjunto de datos Xr.

    Args:
        data (xr.Dataset): El conjunto de datos Xr del cual seleccionar.
        variable (str): El nombre de la variable a seleccionar.
        level (Optional[int], optional): El nivel específico a seleccionar (si corresponde). Por defecto es None.
        max_steps (Optional[int], optional): El número máximo de pasos de tiempo a seleccionar. Por defecto es None.

    Returns:
        xr.Dataset: El conjunto de datos seleccionado.

    """
    data = data[variable]
    if "batch" in data.dims:
        data = data.isel(batch=0)
    if max_steps is not None and "time" in data.sizes and max_steps < data.sizes["time"]:
        data = data.isel(time=range(0, max_steps))
    if level is not None and "level" in data.coords:
        data = data.sel(level=level)
    return data

In [11]:
def plot_data(
    data: dict[str, xr.Dataset],
    fig_title: str,
    plot_size: float = 5,
    robust: bool = False,
    cols: int = 4
) -> tuple[xr.Dataset, matplotlib.colors.Normalize, str]:
    """
    Visualiza los datos en un gráfico de animación.

    Args:
        data (dict[str, xr.Dataset]): Un diccionario de datos para visualizar.
        fig_title (str): El título de la figura.
        plot_size (float, optional): El tamaño de la trama. Por defecto es 5.
        robust (bool, optional): Indica si se debe utilizar una escala robusta. Por defecto es False.
        cols (int, optional): El número de columnas en el diseño de la trama. Por defecto es 4.

    Returns:
        tuple[xr.Dataset, matplotlib.colors.Normalize, str]: Una tupla que contiene los datos escalados, 
        el objeto de normalización y el mapa de colores.

    """
    first_data = next(iter(data.values()))[0]
    max_steps = first_data.sizes.get("time", 1)
    assert all(max_steps == d.sizes.get("time", 1)
               for d, _, _ in data.values())

    cols = min(cols, len(data))
    rows = math.ceil(len(data) / cols)
    # figure = plt.figure(figsize=(plot_size * 2 * cols,
    #                              plot_size * rows))
    figure = plt.figure(figsize=(15, 7))
    figure.suptitle(fig_title, fontsize=16)
    figure.subplots_adjust(wspace=0.05, hspace=0)
    figure.tight_layout()

    images = []
    for i, (title, (plot_data, norm, cmap)) in enumerate(data.items()):
        ax = figure.add_subplot(rows, cols, i+1,)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_title(title)
        im = ax.imshow(
            plot_data.isel(time=0, missing_dims="ignore"), norm=norm,
            origin="lower", cmap=cmap, aspect='auto')
        plt.colorbar(
            mappable=im,
            ax=ax,
            orientation="vertical",
            pad=0.02,
            aspect=16,
            shrink=0.75,
            cmap=cmap,
            extend=("both" if robust else "neither"))
        images.append(im)

    def update(frame):
            
        """
        Actualiza los datos mostrados en la animación.

        Args:
            frame (int): El número del frame a mostrar.

        """
        if "time" in first_data.dims:
            td = datetime.timedelta(
                microseconds=first_data["time"][frame].item() / 1000)
            figure.suptitle(f"{fig_title}, {td}", fontsize=16)
        else:
            figure.suptitle(fig_title, fontsize=16)
        for im, (plot_data, norm, cmap) in zip(images, data.values()):
            im.set_data(plot_data.isel(time=frame, missing_dims="ignore"))

    ani = animation.FuncAnimation(
        fig=figure, func=update, frames=max_steps, interval=250)
    folder = "C:\\Users\\gcuervo\\OneDrive - Universidad de Las Palmas de Gran Canaria\\Documents\\Doctorado\\images\\"
    ani.save(filename=folder + "test_5.gif", writer="pillow")
    plt.close(figure.number)
    return HTML(ani.to_jshtml())

In [13]:
SST_SAT

In [15]:
# Graficar datos de ejemplo
example_batch = SST_SAT
# Tamaño de la trama para la visualización.
plot_size = 7
plot_example_variable = "analysed_sst"
plot_example_level = 50
# Seleccionar los datos de ejemplo para graficar.
data = {
    " ": scale(select(example_batch, plot_example_variable, plot_example_level, 12),
               robust="Robust"),
}

# Título de la figura basado en la variable y nivel seleccionados.
fig_title = plot_example_variable
if "level" in example_batch[plot_example_variable].coords:
    fig_title += f" a {plot_example_level} m"

# Graficar los datos de ejemplo.
plot_data(data, fig_title, plot_size, "Robust")

## Matrices para normalización
---

In [27]:
folder = "C:\\Users\\gcuervo\\OneDrive - Universidad de Las Palmas de Gran Canaria\\Documents\\Doctorado\\DB\\GraphCast_data\\"
SST_SAT_mean_1982_2021 = xr.load_dataset(folder + "mean_sat_1982_2021.nc")
SST_SAT_std_1982_2021 = xr.load_dataset(folder + "std_sat_1982_2021.nc")
SST_SAT_diff_std_1982_2021 = xr.load_dataset(folder + "diff_std_sat_1982_2021.nc")

In [33]:
SST_SAT

## Confirguración del modelo
---

In [61]:
SST_SAT["longitude"].data.ptp() / SST_SAT["longitude"].size

0.04984802431610942

In [64]:
# Obtener la fuente seleccionada en las pestañas de selección.
# Obtener la fuente seleccionada en las pestañas de selección.
source = "Random"
random_mesh_size = 9 # How many times to split each triangle.
random_latent_size = 2 ** 1 #  MLP output layer size (embedding)
random_gnn_msg_steps = 4
# random_levels = 13
resolution = SST_SAT["longitude"].data.ptp() / SST_SAT["longitude"].size
# Comprobar la fuente seleccionada y configurar el modelo en consecuencia.
if source == "Random":
    # Configurar parámetros para el modelo aleatorio.
    params = None  # Se llenará a continuación
    state = {}
    model_config = graphcast_newvars.ModelConfig(
        resolution=resolution,
        mesh_size=random_mesh_size,
        latent_size=random_latent_size,
        gnn_msg_steps=random_gnn_msg_steps,
        hidden_layers=1,
        radius_query_fraction_edge_length=0.6)
    task_config = graphcast_newvars.TaskConfig(
        input_variables=graphcast_newvars.TASK_SST.input_variables,
        target_variables=graphcast_newvars.TASK_SST.target_variables,
        forcing_variables=graphcast_newvars.TASK_SST.forcing_variables,
        pressure_levels=graphcast_newvars.PRESSURE_LEVELS_SST,
        input_duration=graphcast_newvars.TASK_SST.input_duration,
    )

# Devolver la configuración del modelo.
print(model_config)

# Extraer datos de entrenamiento y evaluación
train_steps = 12
eval_steps = 12
# Extraer datos de entrada, objetivos y forzamientos para entrenamiento.
train_inputs, train_targets, train_forcings = data_utils.extract_inputs_targets_forcings(
    example_batch, target_lead_times=slice("6h", f"{train_steps * 6}h"),
    **dataclasses.asdict(task_config))

# Extraer datos de entrada, objetivos y forzamientos para evaluación.
eval_inputs, eval_targets, eval_forcings = data_utils.extract_inputs_targets_forcings(
    example_batch, target_lead_times=slice("6h", f"{eval_steps * 6}h"),
    **dataclasses.asdict(task_config))


ModelConfig(resolution=0.04984802431610942, mesh_size=9, latent_size=2, gnn_msg_steps=4, hidden_layers=1, radius_query_fraction_edge_length=0.6, mesh2grid_edge_normalization_factor=None)


TypeError: 'int' object is not iterable

In [None]:

# Construir funciones jitted, y posiblemente inicializar pesos aleatorios

def construct_wrapped_graphcast(
        model_config: graphcast.ModelConfig,
        task_config: graphcast.TaskConfig):
    """Construye y envuelve el Predictor de GraphCast."""
    # Predictor más profundo de un paso.
    predictor = graphcast.GraphCast(model_config, task_config)

    # Modifica las entradas/salidas a `graphcast.GraphCast` para manejar la conversión de/a
    # float32 de/a BFloat16.
    predictor = casting.Bfloat16Cast(predictor)

    # Modifica las entradas/salidas a `casting.Bfloat16Cast` para que la conversión de/a
    # BFloat16 ocurra después de aplicar la normalización a las entradas/objetivos.
    predictor = normalization.InputsAndResiduals(
        predictor,
        diffs_stddev_by_level=diffs_stddev_by_level,
        mean_by_level=mean_by_level,
        stddev_by_level=stddev_by_level)

    # Envuelve todo para que el modelo de un paso pueda producir trayectorias.
    predictor = autoregressive.Predictor(
        predictor, gradient_checkpointing=True)
    return predictor

# Transformación de Haiku que envuelve el predictor para ejecutar hacia adelante.
@hk.transform_with_state
def run_forward(model_config, task_config, inputs, targets_template, forcings):
    """Realiza una ejecución hacia adelante del modelo predictor."""
    predictor = construct_wrapped_graphcast(model_config, task_config)
    return predictor(inputs, targets_template=targets_template, forcings=forcings)


# Transformación de Haiku que envuelve la función de pérdida.
@hk.transform_with_state
def loss_fn(model_config, task_config, inputs, targets, forcings):
    """Calcula la pérdida del modelo predictor."""
    predictor = construct_wrapped_graphcast(model_config, task_config)
    loss, diagnostics = predictor.loss(inputs, targets, forcings)
    return xarray_tree.map_structure(
        lambda x: xarray_jax.unwrap_data(x.mean(), require_jax=True),
        (loss, diagnostics))


def grads_fn(params, state, model_config, task_config, inputs, targets, forcings):
    """Calcula los gradientes de la función de pérdida."""
    def _aux(params, state, i, t, f):
        (loss, diagnostics), next_state = loss_fn.apply(
            params, state, jax.random.PRNGKey(0), model_config, task_config,
            i, t, f)
        return loss, (diagnostics, next_state)
    (loss, (diagnostics, next_state)), grads = jax.value_and_grad(
        _aux, has_aux=True)(params, state, inputs, targets, forcings)
    return loss, diagnostics, next_state, grads


def with_configs(fn):
    """Envuelve una función con las configuraciones del modelo y la tarea."""
    return functools.partial(
        fn, model_config=model_config, task_config=task_config)


def with_params(fn):
    """Envuelve una función con los parámetros y el estado."""
    return functools.partial(fn, params=params, state=state)


def drop_state(fn):
    """Elimina el estado de salida de una función."""
    return lambda **kw: fn(**kw)[0]


# Compilar la inicialización de la transformación run_forward con Haiku y JAX JIT.
init_jitted = jax.jit(with_configs(run_forward.init))

# Si los parámetros no están inicializados, inicializa los parámetros y el estado.
if params is None:
    params, state = init_jitted(
        rng=jax.random.PRNGKey(0),
        inputs=train_inputs,
        targets_template=train_targets,
        forcings=train_forcings)

# Compilar la función de pérdida y los gradientes con Haiku, JAX JIT y las configuraciones del modelo y la tarea.
loss_fn_jitted = drop_state(with_params(jax.jit(with_configs(loss_fn.apply))))
grads_fn_jitted = with_params(jax.jit(with_configs(grads_fn)))

# Compilar la transformación run_forward con Haiku, JAX JIT y las configuraciones del modelo y la tarea.
run_forward_jitted = drop_state(with_params(jax.jit(with_configs(
    run_forward.apply))))
# Autoregressive rollout (bucle en Python)

# Verificar que la resolución del modelo coincida con la resolución de los datos.
# assert model_config.resolution in (0, 360. / eval_inputs.sizes["lon"]), (
assert model_config.resolution in (0, atmos_canarias.lon.data.ptp() / eval_inputs.sizes["lon"]), (
    "La resolución del modelo no coincide con la resolución de los datos. Probablemente desee "
    "volver a filtrar la lista de conjuntos de datos y descargar los datos correctos.")

# Imprimir las dimensiones de las entradas, objetivos y forzamientos de evaluación.

# Realizar la predicción a través de un proceso de despliegue autoregresivo en bloques.
# eval_inputs: 6H antes hasta tiempo actual t_o
# eval_targets: tiempo actual t_o hasta 3d despues
# eval_forcings: radiacion solar tiempo actual t_o hasta 3d despues
predictions = rollout.chunked_prediction(
    run_forward_jitted,
    rng=jax.random.PRNGKey(0),
    inputs=eval_inputs,
    targets_template=eval_targets * np.nan,
    forcings=eval_forcings)
predictions

# Calcular la pérdida utilizando la función de pérdida autoregresiva compilada
loss, diagnostics = loss_fn_jitted(
    rng=jax.random.PRNGKey(0),
    inputs=train_inputs,
    targets=train_targets,
    forcings=train_forcings)

# Imprimir la pérdida calculada
print("Pérdida:", float(loss))
# Plot predictions

# Tamaño del gráfico
plot_size = 5
plot_pred_max_steps = 12
plot_pred_variable = "2m_temperature"
plot_pred_level = 50
plot_pred_robust = "Robust"
# Determinar el número máximo de pasos de tiempo a graficar
plot_max_steps = min(predictions.dims["time"], plot_pred_max_steps)

# Seleccionar datos para graficar: objetivos, predicciones y diferencia entre objetivos y predicciones
data = {
    "Targets": scale(select(eval_targets, plot_pred_variable, plot_pred_level, plot_max_steps), robust=plot_pred_robust),
    "Predictions": scale(select(predictions, plot_pred_variable, plot_pred_level, plot_max_steps), robust=plot_pred_robust),
    "Diff": scale((select(eval_targets, plot_pred_variable, plot_pred_level, plot_max_steps) -
                select(predictions, plot_pred_variable, plot_pred_level, plot_max_steps)),
                robust=plot_pred_robust, center=0),
}

# Título de la figura
fig_title = plot_pred_variable
if "level" in predictions[plot_pred_variable].coords:
    fig_title += f" at {plot_pred_level} hPa"

# Llamar a la función para graficar los datos
msg_steps_dic[n] = [data, fig_title, plot_size, plot_pred_robust]