In [1]:
import zarr
import dask.array as da
import numpy as np
import psutil

In [2]:
def slice_by_lat_index(lats, n_files, n_x, lat_bound=70.0):
    """Return the flat sample-index window spanning -lat_bound..+lat_bound.

    The latitude index closest to ``-lat_bound`` and the one closest to
    ``+lat_bound`` are each scaled by ``n_files * n_x`` to turn a latitude
    index into an offset along the flattened sample axis.

    Parameters
    ----------
    lats : array-like
        1-D latitude values, in degrees.
    n_files : int or float
        Multiplier combined with ``n_x`` to give the number of samples per
        latitude index (the notebook passes the number of hours/files).
    n_x : int
        Second multiplier (the notebook passes the number of longitudes).
    lat_bound : float, optional
        Absolute latitude of the window edges. Defaults to 70 degrees.

    Returns
    -------
    tuple of (int, int)
        ``(start_idx, end_idx)`` offsets into the flattened sample axis.

    Notes
    -----
    NOTE(review): this presumes the sample axis is ordered latitude-major
    with ascending latitudes, and ``end_idx`` is the first sample of the
    northern boundary row (i.e. that row is excluded when used as
    ``arr[..., start_idx:end_idx]``) -- confirm against how the dataset was
    flattened.
    """
    lats = np.asarray(lats)

    # Index of the grid latitude closest to each edge of the window.
    sh_closest_index = np.argmin(np.abs(lats + lat_bound))
    nh_closest_index = np.argmin(np.abs(lats - lat_bound))

    # Number of flattened samples contributed by one latitude index.
    slice_size = n_files * n_x

    start_idx = sh_closest_index * slice_size
    # Both bounds must be scaled the same way; the previous
    # ``nh_closest_index + slice_size`` mixed a latitude index with a count.
    end_idx = nh_closest_index * slice_size

    return int(start_idx), int(end_idx)

In [3]:
# Open the unscaled train and test datasets read-only so nothing in this
# notebook can modify the source data.
zarr_train = zarr.open(
    store="/ocean/projects/ees220005p/gmooers/GM_Data/training_data/Training_Parts/New_Full_Training_dataset.zarr",
    mode='r',
)
zarr_test = zarr.open(
    store="/ocean/projects/ees220005p/gmooers/GM_Data/training_data/Training_Parts/New_Full_Test_dataset.zarr",
    mode='r',
)

In [5]:
# Coordinate handles from the training store (not yet read into memory).
lats = zarr_train['lat']
lons = zarr_train['lon']

# Grid dimensions, taken from the coordinate handles fetched above.
num_lats, num_lons = len(lats), len(lons)

# Length of the 'sample' coordinate in each dataset.
num_files_train = len(zarr_train['sample'])
num_files_test = len(zarr_test['sample'])

In [6]:
# Training samples per grid point (true division, so this is a float).
num_train_hours = num_files_train / (
    num_lats * num_lons
)

In [7]:
# Flat sample-index window for the latitude band, using the truncated
# per-grid-point sample count computed in the previous cell.
start_idx, end_idx = slice_by_lat_index(
    lats[:],
    int(num_train_hours),
    num_lons,
)

In [8]:
# Variables to scale in this notebook.
variables = [
    'Tin',
    'qin',
    'terra',
    'sfc_pres',
]
# Currently excluded from scaling:
# 'Tout', 'T_adv_out', 'q_adv_out', 'q_auto_out', 'q_sed_flux_tot'

In [9]:
# Number of leading samples kept from each training variable.
splice = 100_000

In [10]:
stats = {}
scaled_train_arrays = {}

# The latitude window does not depend on the variable, so compute it once
# rather than on every loop iteration.
start_idx, end_idx = slice_by_lat_index(lats[:], num_train_hours, num_lons)

for var in variables:
    # Lazily wrap the Zarr variable as a Dask array.
    train_dask_array = da.from_zarr(zarr_train[var])

    # Normalisation statistics are built from the full, unmasked array.
    # These are lazy Dask scalars: nothing is computed until a result that
    # depends on them is computed or written.
    mean = train_dask_array.mean()
    std = train_dask_array.std()
    stats[var] = {
        'mean': mean,
        'std': std,
    }

    # Zero every sample outside the latitude window. Samples are on the LAST
    # axis (arrays are (n_samples,) or (49, n_samples)), so index that axis
    # explicitly; a bare ``arr[:start_idx]`` would mask axis 0, which is the
    # 49-long axis for 2-D variables.
    leading_axes = (slice(None),) * (train_dask_array.ndim - 1)
    train_dask_array[leading_axes + (slice(None, start_idx),)] = 0
    train_dask_array[leading_axes + (slice(end_idx, None),)] = 0

    # Keep only the first `splice` samples, whatever the rank.
    train_dask_array = train_dask_array[..., :splice]

    # Standardise the training array with its own statistics.
    scaled_train_dask_array = (train_dask_array - mean) / std

    # Store the scaled array
    scaled_train_arrays[var] = scaled_train_dask_array

In [11]:
# Display the dict of scaled training arrays (Dask reprs: shape/dtype/chunks).
scaled_train_arrays

{'Tin': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'qin': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'terra': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'sfc_pres': dask.array<truediv, shape=(100000,), dtype=float32, chunksize=(100000,), chunktype=numpy.ndarray>}

In [39]:
# Promote 1-D variables to a single-row 2-D array so every entry has the
# sample axis in position 1 before stacking.
dask_arrays = [
    arr[np.newaxis, :] if arr.ndim == 1 else arr
    for arr in scaled_train_arrays.values()
]

# Stack all variables row-wise (axis 0), in dict insertion order.
combined_array = da.concatenate(dask_arrays, axis=0)

In [42]:
# Check that the concatenated result is still a Dask array.
type(combined_array)

dask.array.core.Array

In [41]:
# Shape of the stacked array: rows from all variables by samples.
combined_array.shape

(148, 100000)

In [34]:
# Type of the last array left over from the scaling loop.
# NOTE(review): this cell's execution count (34) is lower than the cells
# above it, so it was run out of order -- re-run on a fresh kernel to confirm.
type(train_dask_array)

dask.array.core.Array

In [35]:
# Display the dict of scaled training arrays.
# NOTE(review): the saved output below lists variables ('Tout', 'T_adv_out',
# ...) that are not in the current `variables` list, so it appears to come
# from an earlier run -- re-run on a fresh kernel to refresh it.
scaled_train_arrays

{'Tin': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'qin': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'terra': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'sfc_pres': dask.array<truediv, shape=(100000,), dtype=float32, chunksize=(100000,), chunktype=numpy.ndarray>,
 'Tout': dask.array<truediv, shape=(49, 100000), dtype=float64, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'T_adv_out': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'q_adv_out': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'q_auto_out': dask.array<truediv, shape=(49, 100000), dtype=float32, chunksize=(49, 100000), chunktype=numpy.ndarray>,
 'q_sed_flux_tot': dask.array<truediv, shape=(49, 100000), dtype=float32, c

In [None]:
scaled_test_arrays = {}

# Standardise each test variable with the statistics derived from the
# training set, so train and test share one normalisation.
for var in variables:
    train_stats = stats[var]
    mean = train_stats['mean']
    std = train_stats['std']

    # Lazily wrap the test variable and scale it in one expression.
    test_dask_array = da.from_zarr(zarr_test[var])
    scaled_test_dask_array = (test_dask_array - mean) / std

    scaled_test_arrays[var] = scaled_test_dask_array

In [None]:
# Create the output locations. mode='w' overwrites anything already at
# these paths.
scaled_train_zarr_store = zarr.open(
    "/ocean/projects/ees220005p/gmooers/GM_Data/training_data/Training_Parts/New_Scaled_Full_Training_dataset.zarr", mode='w')
scaled_test_zarr_store = zarr.open(
    "/ocean/projects/ees220005p/gmooers/GM_Data/training_data/Training_Parts/New_Scaled_Full_Test_dataset.zarr", mode='w')

# Write one Zarr array per variable. `to_zarr` expects a Zarr Array, a path
# string or a store mapping as its target; `zarr.open(..., mode='w')` with
# no shape opens a group, so pass the group's underlying `.store` and let
# `component` name the array inside it.
for var, scaled_array in scaled_train_arrays.items():
    scaled_array.to_zarr(scaled_train_zarr_store.store, component=var, overwrite=True)

In [None]:
# Write one Zarr array per scaled test variable. `to_zarr` expects a Zarr
# Array, a path string or a store mapping as its target, so pass the opened
# group's underlying `.store` and let `component` name the array inside it.
for var, scaled_array in scaled_test_arrays.items():
    scaled_array.to_zarr(scaled_test_zarr_store.store, component=var, overwrite=True)