In [1]:
# first thing we do is to set the environment variable CUDA_VISIBLE_DEVICES to the GPU we want to use
import os
#Only CPUs
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Specify the GPUs to use
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"
nGPUs = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))
print('Using GPUs:', os.environ["CUDA_VISIBLE_DEVICES"], 'Total:', nGPUs)

Using GPUs: 0,1,2 Total: 3


In [2]:
import xarray as xr
import numpy as np
import zarr
import os, sys, time, glob, re

import tensorflow as tf
print(tf.version)
from tensorflow import keras
print(keras.__version__)

from keras import layers
from keras.layers import Layer
from keras import models, losses
from keras.regularizers import l1,l2
from keras.optimizers import Optimizer, Adam
from keras.callbacks import EarlyStopping
strategy = tf.distribute.MirroredStrategy()
# Set the logging level to suppress warnings
tf.get_logger().setLevel(tf.compat.v1.logging.ERROR)

# set the directory to current directory
root_dir = '/data/harish/'

sys.path.append(root_dir)
from libraries import *

2024-09-22 14:23:57.729010: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-22 14:23:57.740610: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-22 14:23:57.744378: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-22 14:23:57.753749: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


<module 'tensorflow._api.v2.version' from '/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/tensorflow/_api/v2/version/__init__.py'>
3.5.0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2')


I0000 00:00:1727029438.941665  176396 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1727029438.941981  176396 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1727029438.942294  176396 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1727029438.989717  176396 cuda_executor.cc:1015] successful NUMA node read from SysFS ha

<module 'tensorflow._api.v2.version' from '/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/tensorflow/_api/v2/version/__init__.py'>
3.5.0


In [3]:
# Checking if the GPU is available and if it is being used
def run_on_gpu(device_index):
    device_name = f'/device:GPU:{device_index}'
    with tf.device(device_name):
        a = tf.random.uniform([2, 2])
        b = tf.random.uniform([2, 2])
        c = tf.matmul(a, b)
        print(f"Running on {device_name}:")
        print(c.numpy())

# List available GPUs
gpus = tf.config.list_physical_devices('GPU')

# Run the function for each GPU
for i, gpu in enumerate(gpus):
    run_on_gpu(i)

Running on /device:GPU:0:
[[0.8024075  0.74058527]
 [0.79024094 0.72814053]]
Running on /device:GPU:1:
[[0.35040656 0.26347527]
 [0.5287429  0.41112804]]
Running on /device:GPU:2:
[[0.91092753 0.7168515 ]
 [1.219977   0.8869949 ]]


# Defining architecture and necessary functions

In [4]:
#----------------------------------------------------------------
# Build base U-net architecture
#----------------------------------------------------------------

def actv_switch(switch, negative_slope):
    '''
    Non-linear Activation switch - 0: linear; 1: non-linear
    If non-linear, then Leaky ReLU Activation function with negative_slope value as input
    '''
    if switch == 0:
        actv = "linear"
    else:
        actv = layers.LeakyReLU(negative_slope=negative_slope)
    return actv

class ReflectPadding2D(Layer):
    '''
    Reflection padding mode. 
    In reflection padding, the padded values are a reflection of the edge values of the input tensor. 
    For example, if the input is [1, 2, 3], reflection padding would give [2, 1, 1, 2, 3, 2].
    This padding is necessary when the filter size is beyond 3. 
    Since the input tensor is convolved with the filter, the filter goes out of bounds of the input tensor at the edges.
    Typically, it will increase the input tensor size by 2*pad_size in each dimension. 
    But after convolution, the output tensor size will be the same as the initial input tensor size.
    '''
    def __init__(self, pad_size, **kwargs):
        self.pad_size = pad_size
        super(ReflectPadding2D, self).__init__(**kwargs)

    def call(self, inputs):
        return tf.pad(inputs, [[0, 0], [self.pad_size, self.pad_size], [self.pad_size, self.pad_size], [0, 0]], mode='REFLECT')

    def get_config(self):
        config = super(ReflectPadding2D, self).get_config()
        config.update({"pad_size": self.pad_size})
        return config
    
def con2d(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value):
    '''
    input: input tensor
    out_channels: number of output feature maps
    filter: filter size
    dilation_rate: dilation rate
    stride: stride
    switch: activation switch
    negative_slope: negative slope for Leaky ReLU
    regularize_value: regularizer factor
    '''
    # Manually apply reflect padding
    pad_size = filter // 2
    # Use ReflectPadding2D instead of custom padding directly
    inp_padded = ReflectPadding2D(pad_size)(input)
    # Apply the convolutional layer with 'valid' padding since we've already padded the input
    return layers.Conv2D(out_channels, (filter, filter), dilation_rate=(dilation_rate, dilation_rate),
                                  strides=stride,
                                  activation=actv_switch(switch, negative_slope),
                                  padding="valid", #padding="same",
                                  use_bias=True,
                                  kernel_regularizer=l2(regularize_value),
                                  bias_regularizer=l2(regularize_value))(inp_padded)

# Residual convolution block
def Res_conv_block(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value):
    '''
    input: input tensor
    out_channels: number of output feature maps
    filter: filter size
    dilation_rate: dilation rate
    stride: stride
    switch: activation switch
    negative_slope: negative slope for Leaky ReLU
    regularize_value: regularizer factor
    '''
    y = con2d(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)
    y = con2d(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)
    # Residual connection
    y = layers.Add()([y, con2d(input, out_channels, 1, dilation_rate, stride, switch, negative_slope, regularize_value)])

    return y

# Convolution downsampling block
def Conv_down_block(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value):

    # Downsampling using the stride 2
    y = con2d(input, out_channels, filter, dilation_rate, 2, switch, negative_slope, regularize_value)
    y = con2d(y, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)
    y = con2d(y, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)

    return y

# Attention block
def Attention(input, num_heads, key_dim):

    layer = layers.MultiHeadAttention(num_heads, key_dim, attention_axes=None)
    y = layer(input, input)

    return y

# Convolution upsampling block using bilinear interpolation
def Conv_up_block(input, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value):

    # Upsampling using the stride 2 with transpose convolution
    y = layers.UpSampling2D(size=(2, 2), interpolation='bilinear')(input)
    y = con2d(y, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)
    y = con2d(y, out_channels, filter, dilation_rate, stride, switch, negative_slope, regularize_value)

    return y

#2D Pooling layer - inputs[0:avg;1:max, input tensor, pool size]
def pool2d(i, input, pool_size):
    if i == 0:
        return layers.AveragePooling2D(pool_size=(pool_size, pool_size),
                                            strides=None,
                                            padding='same',
                                            data_format=None)(input)
    else:
        return layers.MaxPooling2D(pool_size=(pool_size, pool_size),
                                            strides=None,
                                            padding='same',
                                            data_format=None)(input)


# Generator architecture
def Gen(inp_lat, inp_lon, out_lat, out_lon, chnl, out_vars, filter, dilation_rate, stride, switch, negative_slope, regulazier_value, num_heads, key_dim):
    '''
    inp_lat: input latitude
    inp_lon: input longitude
    out_lat: output latitude
    out_lon: output longitude
    chnl: number of input channels
    out_vars: number of output variables
    filter: filter size
    dilation_rate: dilation rate
    stride: stride
    switch: activation switch
    negative_slope: negative slope for Leaky ReLU
    regulazier_value: regularizer factor
    num_heads: number of attention heads
    key_dim: key dimension
    '''

    input = layers.Input(shape=(inp_lat, inp_lon, chnl))
    #y_static = layers.Input(shape=(out_lat, out_lon, 1))

    #y = layers.Concatenate(axis=-1)([input, y_static])

    y = input

    # Encoding path
    skips = []
    for n_out in [64, 128, 256]:
        y = Res_conv_block(y, n_out, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)
        skips.append(Res_conv_block(y, n_out // 4, filter, dilation_rate, stride, switch, negative_slope, regulazier_value))
        y = Conv_down_block(y, n_out, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)

    # Attention block
    y = Res_conv_block(y, 256, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)
    y = Attention(y, num_heads, key_dim)
    y = Res_conv_block(y, 256, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)
    y = Attention(y, num_heads, key_dim)
    y = Res_conv_block(y, 256, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)

    # Decoding path
    for i, n_out in enumerate([256, 128, 64]):
        y = Conv_up_block(y, n_out, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)
        y = layers.Concatenate(axis=-1)([y, skips[-(i + 1)]])
        y = Res_conv_block(y, n_out, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)

    y = Res_conv_block(y, 32, filter, dilation_rate, stride, switch, negative_slope, regulazier_value)

    y = con2d(y, 32, 1, dilation_rate, stride, 0, 0, regulazier_value)
    y = con2d(y, out_vars, 1, dilation_rate, stride, 0, 0, regulazier_value)

    #return models.Model(inputs=[input,y_static], outputs=y)
    return models.Model(inputs=input, outputs=y)

# Loading data

In [3]:
zarr_path = f'{root_dir}/rtma_i10fg_NYS_subset.zarr'
train_val_dates_range = ('2021-01-01T00', '2022-12-31T23')
# Define input/output window sizes
in_times = 3   # Example: 24 input hours (1 day)
out_times = 1   # Example: 6 output hours (6-hour prediction)

X_train_times, Y_train_times, X_val_times, Y_val_times = RTMA_data_splitting(zarr_path,train_val_dates_range,in_times,out_times,opt_test=False)

In [4]:
test_dates_range = ('2023-01-01T00', '2023-12-31T23')

X_test_times, Y_test_times = RTMA_data_splitting(zarr_path,test_dates_range,in_times,out_times,opt_test=True)

In [5]:
len(X_train_times)%(32*3)

18

In [6]:
ds = xr.open_zarr(zarr_path)
data = ds.i10fg#.transpose(..., 'time')
X_train = data.sel(time=X_train_times).transpose('sample', 'y', 'x','time_window')
Y_train = data.sel(time=Y_train_times).transpose('sample', 'y', 'x','time_window')
# To fit the samples equally into the GPUs and batches, we need to exclude the remaining bathces
X_train = X_train[:-int(len(X_train)%(32*3)),...]
Y_train = Y_train[:-int(len(Y_train)%(32*3)),...]
X_val = data.sel(time=X_val_times).transpose('sample', 'y', 'x','time_window')
Y_val = data.sel(time=Y_val_times).transpose('sample', 'y', 'x','time_window')
X_test = data.sel(time=X_test_times).transpose('sample', 'y', 'x','time_window')
Y_test = data.sel(time=Y_test_times).transpose('sample', 'y', 'x','time_window')

print('X_train',X_train.shape, 'Y_train',Y_train.shape, 
      'X_val',X_val.shape, 'Y_val',Y_val.shape, 
      'X_test',X_test.shape, 'Y_test',Y_test.shape)

X_train (14016, 256, 384, 3) Y_train (14016, 256, 384, 1) X_val (3456, 256, 384, 3) Y_val (3456, 256, 384, 1) X_test (8757, 256, 384, 3) Y_test (8757, 256, 384, 1)


In [8]:
X_train[:32*3,...]

Unnamed: 0,Array,Chunk
Bytes,108.00 MiB,108.00 MiB
Shape,"(96, 256, 384, 3)","(96, 256, 384, 3)"
Dask graph,1 chunks in 8 graph layers,1 chunks in 8 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 108.00 MiB 108.00 MiB Shape (96, 256, 384, 3) (96, 256, 384, 3) Dask graph 1 chunks in 8 graph layers Data type float32 numpy.ndarray",96  1  3  384  256,

Unnamed: 0,Array,Chunk
Bytes,108.00 MiB,108.00 MiB
Shape,"(96, 256, 384, 3)","(96, 256, 384, 3)"
Dask graph,1 chunks in 8 graph layers,1 chunks in 8 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,768.00 kiB
Shape,"(256, 384)","(256, 384)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 768.00 kiB 768.00 kiB Shape (256, 384) (256, 384) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",384  256,

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,768.00 kiB
Shape,"(256, 384)","(256, 384)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,768.00 kiB
Shape,"(256, 384)","(256, 384)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 768.00 kiB 768.00 kiB Shape (256, 384) (256, 384) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",384  256,

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,768.00 kiB
Shape,"(256, 384)","(256, 384)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.25 kiB,2.25 kiB
Shape,"(96, 3)","(96, 3)"
Dask graph,1 chunks in 6 graph layers,1 chunks in 6 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray
"Array Chunk Bytes 2.25 kiB 2.25 kiB Shape (96, 3) (96, 3) Dask graph 1 chunks in 6 graph layers Data type datetime64[ns] numpy.ndarray",3  96,

Unnamed: 0,Array,Chunk
Bytes,2.25 kiB,2.25 kiB
Shape,"(96, 3)","(96, 3)"
Dask graph,1 chunks in 6 graph layers,1 chunks in 6 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray


# Initialize a model

In [8]:
# -------------------------------------------------------------------------
# Model initialising and summary print 
# -------------------------------------------------------------------------
# Build the generator model
generator = Gen(X_train.shape[1], X_train.shape[2], Y_train.shape[1], Y_train.shape[2], 
                  X_train.shape[3], Y_train.shape[3], 5, 1, 1, 1, 0.2, 0, 1, 64)
print(generator.summary())

None


In [9]:
#----------------------------------------------------------------
# Defining loss functions
#----------------------------------------------------------------
MAE = tf.keras.losses.MeanAbsoluteError()
MSE = tf.keras.losses.MeanSquaredError()

early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

generator.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss=MAE,
    metrics=[MSE, MAE]
)

In [10]:
# Now train the model using the shuffled data
generator.fit(
    X_train,
    Y_train,
    batch_size=32,
    epochs=100,
    callbacks=[early_stopping],
    validation_data=(X_val, Y_val)
)

Epoch 1/100


I0000 00:00:1727011268.379322  111276 service.cc:146] XLA service 0x7ed08021cb20 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1727011268.379351  111276 service.cc:154]   StreamExecutor device (0): NVIDIA RTX A6000, Compute Capability 8.6
I0000 00:00:1727011268.379354  111276 service.cc:154]   StreamExecutor device (1): NVIDIA RTX A6000, Compute Capability 8.6
I0000 00:00:1727011268.379356  111276 service.cc:154]   StreamExecutor device (2): NVIDIA RTX A6000, Compute Capability 8.6
2024-09-22 09:21:08.806149: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-09-22 09:21:09.694947: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:531] Loaded cuDNN version 90201











2024-09-22 09:21:50.849697: E external/local_xla/xla/service/slow_operation_alarm.cc:65] Trying algorithm eng0{} for conv (f32[32,80,260,388]{3,2

ResourceExhaustedError: Graph execution error:

Detected at node StatefulPartitionedCall defined at (most recent call last):
  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/runpy.py", line 197, in _run_module_as_main

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/runpy.py", line 87, in _run_code

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel_launcher.py", line 17, in <module>

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/traitlets/config/application.py", line 1075, in launch_instance

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/kernelapp.py", line 701, in start

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 205, in start

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/asyncio/base_events.py", line 601, in run_forever

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/asyncio/base_events.py", line 1905, in _run_once

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/asyncio/events.py", line 80, in _run

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/kernelbase.py", line 534, in dispatch_queue

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/kernelbase.py", line 523, in process_one

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/kernelbase.py", line 429, in dispatch_shell

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/kernelbase.py", line 767, in execute_request

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/ipkernel.py", line 429, in do_execute

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/ipykernel/zmqshell.py", line 549, in run_cell

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3024, in run_cell

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3079, in _run_cell

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/async_helpers.py", line 129, in _pseudo_sync_runner

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3284, in run_cell_async

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3466, in run_ast_nodes

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3526, in run_code

  File "/tmp/ipykernel_110857/3588731260.py", line 2, in <module>

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/keras/src/utils/traceback_utils.py", line 117, in error_handler

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/keras/src/backend/tensorflow/trainer.py", line 320, in fit

  File "/home/harish/miniconda3/envs/gUstNET/lib/python3.9/site-packages/keras/src/backend/tensorflow/trainer.py", line 121, in one_step_on_iterator

Out of memory while trying to allocate 30291136632 bytes.
	 [[{{node StatefulPartitionedCall}}]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info. This isn't available when running in Eager mode.
 [Op:__inference_one_step_on_iterator_22932]