# Conversion of Data to a Machine Learning Friendly Format

This notebook demonstrates taking a single NetCDF file and converting the file into analysis ready numpy arrays stored in a zarr file for later use in neural network training.

Specifically, after having loaded multiple NetCDF files from UM model data into a single Iris CubeList, and saving this CubeList to disk, this notebook will:<ul>

<li>Load the single NetCDF file back from disk.</li>
<li>Extract the desired cubes: cloud volume fraction, specific humidity, air pressure, and air temperature.</li>
<li>Combine cubes of the same feature where metadata differences have prevented concatenation.</li>
<li>Convert the cubes to numpy arrays.</li>
<li>Format the arrays into a desirable dimension: (Sample Number, Height Level, Feature).</li>
<li>Generate data for the desired target we want to make a prediction on (cloud base height at a level in a sample).</li>
<li>Normalize data where necessary.</li>
<li>Save the data to disk for later loading to perform ML tasks.</li></ul> 

Define imports:

In [1]:
import glob
import os
import pathlib
import re

import dask
import iris
import numpy as np
import zarr
from dask.diagnostics import ProgressBar, ResourceProfiler

import cbh_data_definitions  # used for testing the load back in of the data

### The original files are split on time, to make processing easier, concatenate the individual files together in iris and save back to disk:

In [2]:
CONCATENATE_INITIAL_FILES = False
if CONCATENATE_INITIAL_FILES:
    dev_indiv_files = os.environ["SCRATCH"] + "/cbh_data/dev_indiv/" + "*.pp"
    dev_indiv_paths = glob.glob(dev_indiv_files)
    dev_cubes = iris.load(dev_indiv_paths)
    dev_save_large_path = os.environ["SCRATCH"] + "/cbh_data/dev/dev_large.nc"
    iris.save(dev_cubes, dev_save_large_path)

    train_inidiv_files = (
        os.environ["SCRATCH"] + "/cbh_data/train_individual_files/" + "*.pp"
    )
    train_indiv_paths = glob.glob(train_inidiv_files)
    train_cubes = iris.load(train_indiv_paths)
    train_save_large_path = os.environ["SCRATCH"] + "/cbh_data/train/train_large.nc"
    iris.save(train_cubes, train_save_large_path)

### Define file paths:

In [3]:
root_data_directory = pathlib.Path(os.environ["SCRATCH"]) / "cbh_data"

paths_to_load = (
    root_data_directory / "dev" / "dev_large.nc"
)  # one large nc file of iris' concatenation of all small nc files
path_to_save_result = (
    root_data_directory / "analysis_ready" / "dev_eg.npz"
)  # ouput for numpy arrays
path_to_save_zarr = (
    root_data_directory / "analysis_ready" / "dev_eg.zarr"
)  # output for zarr files

Settings for the notebook, with each constant given a comment above for a purpose description:

In [4]:
# generates a positional encoding array for a feature of each height layer in the sample
GENERATE_POSITIONAL_ENCODING_ARRAYS = False
# adds the height layer number to every feature vector in the input array
if GENERATE_POSITIONAL_ENCODING_ARRAYS:
    CONCATENATE_POSITIONAL_ENCONDING_TO_FEATURE_VECTOR = False

# realises the input array computation in two halves to avoid memory constraints of large computation
COMPUTE_INPUT_ARRAY_IN_HALVES = False

FREE_UP_MEMORY_AFTER_TARGET_COMPUTATION = False

# show all samples where clouds exist in the final layer (none)
# the final layer is used as the desired classification in the case of no cloud base existance prediction
VERIFY_NO_FINAL_LAYER_CLOUDS = False

# do extra compute to find the number of samples with cloud bases in the dataset
COMPUTE_CLOUD_BASE_SAMPLE_NUMBER = True

# save npz or not
SAVE_NPZ = False

# perform some computations that may take a long while, but give more information for general understanding
PERFORM_LONG_COMPUTATIONS_FOR_EXTRA_INFO = True

# save the cloud base position as a class label, or a onehot vector
SAVE_ONEHOT_INSTEAD_OF_CLASS_LABEL = False
# one can easily be converted to the other, so only saving one is necessary

# Normalize the input data to a range [0,1]
NORMALIZE_INPUT_DATA = False
# defaults to false as models should be desined to normalize the data, and avoids issues like an unknown global maximum
# e.g. the max value of temperature in the loaded data can be different to the max temp of all possible input temps for your model

## Loading in the Cloud Base Height Data

In [5]:
cubes = iris.load(str(paths_to_load))

print("Find files complete, list of paths:", paths_to_load)

Find files complete, list of paths: /scratch/hsouth/cbh_data/train/train_large.nc


  cubes = iris.load(str(paths_to_load))
  cubes = iris.load(str(paths_to_load))
  cubes = iris.load(str(paths_to_load))
  cubes = iris.load(str(paths_to_load))
  cubes = iris.load(str(paths_to_load))
  cubes = iris.load(str(paths_to_load))


Show cube names:

In [6]:
print("Cube names:\n", [str(cube.name()) for cube in cubes])

print("\n" + "Example of cube metadata:", cubes[2].summary())

Cube names:
 ['m01s05i250', 'cloud_volume_fraction_in_atmosphere_layer', 'specific_humidity', 'air_pressure', 'air_temperature', 'convective_rainfall_flux', 'convective_snowfall_flux']

Example of cube metadata: specific_humidity / (unknown)       (time: 364; model_level_number: 70; latitude: 480; longitude: 640)
    Dimension coordinates:
        time                             x                        -             -               -
        model_level_number               -                        x             -               -
        latitude                         -                        -             x               -
        longitude                        -                        -             -               x
    Attributes:
        source                      'Data from Met Office Unified Model'
        um_version                  '10.9'


## Preprocess the data

### Extract the desired cubes: cloud volume fraction, specific humidity, air pressure, and air temperature

Cloud volume fraction will be used as our target for the problem, and the rest of the cubes are used as input.

In [7]:
def create_dataset(cubes):
    list_of_input_cubes = ["air_temperature", "air_pressure", "specific_humidity"]
    target_cube_name = ["cloud_volume_fraction_in_atmosphere_layer"]

    target_cube = iris.cube.CubeList(
        [cube for cube in cubes if (cube.long_name in target_cube_name)]
    )
    inp_cube = iris.cube.CubeList(
        [cube for cube in cubes if (cube.standard_name in list_of_input_cubes)]
    )

    return inp_cube, target_cube

Call the function defined above and verify success:

In [8]:
inp_cube, tar_cube = create_dataset(cubes)

print("input cube:\n", inp_cube, "\n")
print("target cubes:\n", tar_cube)

input cube:
 0: air_pressure / (Pa)                 (time: 364; model_level_number: 70; latitude: 480; longitude: 640)
1: air_temperature / (K)               (time: 364; model_level_number: 70; latitude: 480; longitude: 640) 

target cubes:
 0: cloud_volume_fraction_in_atmosphere_layer / (1) (time: 364; model_level_number: 70; latitude: 480; longitude: 640)


### Combine cubes of the same feature where metadata differences have prevented concatenation, while also extracting the numpy array of each cube

if duplicate cubes exist, concatenate them using numpy to avoid metadata matching issues:

In [9]:
def order_two_objects_by_len_ascending(obj1, obj2):
    len1 = len(obj1[1])
    len2 = len(obj2[1])
    if len1 >= len2:
        return obj2, obj1
    else:
        return obj1, obj2


def concatenate_same_cubes(cube_list):

    cube_name_dictionary = {}

    for cube in cube_list:
        # print('start cube load')
        cube_np_array = cube.core_data()
        # print('end load')

        cube_name = cube.name()

        try:
            # concat along the differing axis, forcast reference time
            # MUST CONCATENATE IN THE SAME ORDER FOR EACH ARRAY (Since dim len is diffent each array, we can have the
            short_arr, long_arr = order_two_objects_by_len_ascending(
                cube_np_array, cube_name_dictionary[cube_name]
            )
            cube_name_dictionary[cube_name] = np.concatenate(
                (short_arr, long_arr), axis=1
            )

            # print(cube_name_dictionary[cube_name].shape)

        except KeyError:
            cube_name_dictionary[cube_name] = cube_np_array

    return cube_name_dictionary

Call the function defined above and verify success:

In [10]:
inp_dict = concatenate_same_cubes(inp_cube)
tar_dict = concatenate_same_cubes(tar_cube)

print("Air Pressure array shape:", inp_dict["air_pressure"].shape)
print(
    "Cloud Volume array shape:",
    tar_dict["cloud_volume_fraction_in_atmosphere_layer"].shape,
)
print("Array types:", type(inp_dict["air_pressure"]))
print("Input cube arrays found:", inp_dict.keys())
print("Target cube arrays found:", tar_dict.keys())

Air Pressure array shape: (364, 70, 480, 640)
Cloud Volume array shape: (364, 70, 480, 640)
Array types: <class 'dask.array.core.Array'>
Input cube arrays found: dict_keys(['air_pressure', 'air_temperature'])
Target cube arrays found: dict_keys(['cloud_volume_fraction_in_atmosphere_layer'])


Combine dictionary elements to one array:

In [11]:
def combine_feats(dict_of_feats):

    add_dim_for_feature = [np.expand_dims(x, axis=0) for x in dict_of_feats.values()]
    feat_concat_array = np.concatenate(add_dim_for_feature, axis=0)
    return feat_concat_array

In [12]:
inp_array = combine_feats(inp_dict)
tar_array = combine_feats(tar_dict)

# verify and check dims
print("Dimensions to standardize for processing:")
print("Current Input Shape:", inp_array.shape)
print("Current Target Shape:", tar_array.shape)

Dimensions to standardize for processing:
Current Input Shape: (2, 364, 70, 480, 640)
Current Target Shape: (1, 364, 70, 480, 640)


Expand the dimensions of 'short' arrays to work in flattening (this applies in practice the smaller dev set of the data):

In [13]:
if len(inp_array.shape) == 4:
    time_time2_dims_to_add = [1, 2]
    inp_array = np.expand_dims(inp_array, time_time2_dims_to_add)
    tar_array = np.expand_dims(tar_array, time_time2_dims_to_add)
    print("New and correct shapes (should be 6 dims):")
    print(inp_array.shape)
    print(tar_array.shape)
elif len(inp_array.shape) == 5:
    time_time2_dims_to_add = [1]
    inp_array = np.expand_dims(inp_array, time_time2_dims_to_add)
    tar_array = np.expand_dims(tar_array, time_time2_dims_to_add)
    print("New and correct shapes (should be 6 dims):")
    print(inp_array.shape)
    print(tar_array.shape)

New and correct shapes (should be 6 dims):
(2, 1, 364, 70, 480, 640)
(1, 1, 364, 70, 480, 640)


In [14]:
print("Show array storage metadata:")
inp_array

Show array storage metadata:


Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,82.03 MiB
Shape,"(2, 1, 364, 70, 480, 640)","(1, 1, 1, 70, 480, 640)"
Count,8 Graph Layers,728 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 58.32 GiB 82.03 MiB Shape (2, 1, 364, 70, 480, 640) (1, 1, 1, 70, 480, 640) Count 8 Graph Layers 728 Chunks Type float32 numpy.ndarray",364  1  2  640  480  70,

Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,82.03 MiB
Shape,"(2, 1, 364, 70, 480, 640)","(1, 1, 1, 70, 480, 640)"
Count,8 Graph Layers,728 Chunks
Type,float32,numpy.ndarray


In [15]:
print("Show array storage metadata:")
tar_array

Show array storage metadata:


Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,82.03 MiB
Shape,"(1, 1, 364, 70, 480, 640)","(1, 1, 1, 70, 480, 640)"
Count,4 Graph Layers,364 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 29.16 GiB 82.03 MiB Shape (1, 1, 364, 70, 480, 640) (1, 1, 1, 70, 480, 640) Count 4 Graph Layers 364 Chunks Type float32 numpy.ndarray",364  1  1  640  480  70,

Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,82.03 MiB
Shape,"(1, 1, 364, 70, 480, 640)","(1, 1, 1, 70, 480, 640)"
Count,4 Graph Layers,364 Chunks
Type,float32,numpy.ndarray


### Flatten the arrays

Flatten time and lat/long down to a single dimension, sample number </br>
Function expects 6-d array where each expected dimension is named in the function - cube_num, time, time2, height, lat, long

In [16]:
def flatten_cubes_with_numpy(np_array):

    # print('input dimensions:', np_array.shape)

    cube_num, time, time2, height, lat, long = np_array.shape

    # # verify shape
    # print(np_array.shape)

    # swap axis of time and height to ensure flattening preserves height
    cube_array = np_array.transpose(0, 3, 1, 2, 4, 5)
    cubes_flattened = np.reshape(
        cube_array, (cube_num, height, (time * time2 * lat * long))
    )

    # print('new dimensions', cubes_flattened.shape)

    cube_to_return = cubes_flattened.T
    # remove unnecessary dimensions
    cube_to_return = cube_to_return.squeeze()
    return cube_to_return

In [17]:
dask.config.set(
    {"array.slicing.split_large_chunks": False}
)  # allow the potentially large chunk of data

inp_array = flatten_cubes_with_numpy(inp_array)
tar_array = flatten_cubes_with_numpy(tar_array)

# print('verify squeeze')
print("Shapes of flattened and transposed arrays:")
print("Input:", inp_array.shape)
print("Target:", tar_array.shape)

Shapes of flattened and transposed arrays:
Input: (111820800, 70, 2)
Target: (111820800, 70)


Rechunk large data to ensure large chunks are reduced for easier handling in dask:

In [18]:
tar_array = dask.array.rechunk(tar_array, chunks="auto")
print("Rechunked array storage metadata for target:")
tar_array

Rechunked array storage metadata for target:


Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,82.03 MiB
Shape,"(111820800, 70)","(307200, 70)"
Count,8 Graph Layers,364 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 29.16 GiB 82.03 MiB Shape (111820800, 70) (307200, 70) Count 8 Graph Layers 364 Chunks Type float32 numpy.ndarray",70  111820800,

Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,82.03 MiB
Shape,"(111820800, 70)","(307200, 70)"
Count,8 Graph Layers,364 Chunks
Type,float32,numpy.ndarray


In [19]:
inp_array = dask.array.rechunk(inp_array, chunks="auto")
print("Rechunked array storage metadata for input:")
inp_array

Rechunked array storage metadata for input:


Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,82.03 MiB
Shape,"(111820800, 70, 2)","(307200, 70, 1)"
Count,11 Graph Layers,728 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 58.32 GiB 82.03 MiB Shape (111820800, 70, 2) (307200, 70, 1) Count 11 Graph Layers 728 Chunks Type float32 numpy.ndarray",2  70  111820800,

Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,82.03 MiB
Shape,"(111820800, 70, 2)","(307200, 70, 1)"
Count,11 Graph Layers,728 Chunks
Type,float32,numpy.ndarray


In [20]:
# rechunk enforcing samples are kept together
inp_arr_chunks = inp_array.chunksize
inp_array = inp_array.rechunk((inp_arr_chunks[0] / 10, 70, 3))
inp_array

Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,16.41 MiB
Shape,"(111820800, 70, 2)","(30720, 70, 2)"
Count,12 Graph Layers,3640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 58.32 GiB 16.41 MiB Shape (111820800, 70, 2) (30720, 70, 2) Count 12 Graph Layers 3640 Chunks Type float32 numpy.ndarray",2  70  111820800,

Unnamed: 0,Array,Chunk
Bytes,58.32 GiB,16.41 MiB
Shape,"(111820800, 70, 2)","(30720, 70, 2)"
Count,12 Graph Layers,3640 Chunks
Type,float32,numpy.ndarray


For some more information about dask, display the dask object output for the input array

In [21]:
inp_array.dask

0,1
layer_type  MaterializedLayer  is_materialized  True  number of outputs  1,

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,1

0,1
"layer_type  Blockwise  is_materialized  False  number of outputs  364  shape  (364, 70, 480, 640)  dtype  float32  chunksize  (1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on original-array-043f5e6943b7a33149bf97c372c61ae3",364  1  640  480  70

0,1
layer_type,Blockwise
is_materialized,False
number of outputs,364
shape,"(364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,original-array-043f5e6943b7a33149bf97c372c61ae3

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  364  shape  (1, 364, 70, 480, 640)  dtype  float32  chunksize  (1, 1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on array-043f5e6943b7a33149bf97c372c61ae3",364  1  640  480  70

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,364
shape,"(1, 364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,array-043f5e6943b7a33149bf97c372c61ae3

0,1
layer_type  MaterializedLayer  is_materialized  True  number of outputs  1,

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,1

0,1
"layer_type  Blockwise  is_materialized  False  number of outputs  364  shape  (364, 70, 480, 640)  dtype  float32  chunksize  (1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on original-array-4d4446267dd746d5fc092c99a872e3fa",364  1  640  480  70

0,1
layer_type,Blockwise
is_materialized,False
number of outputs,364
shape,"(364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,original-array-4d4446267dd746d5fc092c99a872e3fa

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  364  shape  (1, 364, 70, 480, 640)  dtype  float32  chunksize  (1, 1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on array-4d4446267dd746d5fc092c99a872e3fa",364  1  640  480  70

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,364
shape,"(1, 364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,array-4d4446267dd746d5fc092c99a872e3fa

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  728  shape  (2, 364, 70, 480, 640)  dtype  float32  chunksize  (1, 1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on reshape-a6b1da9ea560485c09b5dfd1762ac752  reshape-c2d7feb8b031dbeb06e0cdcbabc4ef0e",364  2  640  480  70

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,728
shape,"(2, 364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,reshape-a6b1da9ea560485c09b5dfd1762ac752
,reshape-c2d7feb8b031dbeb06e0cdcbabc4ef0e

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  728  shape  (2, 1, 364, 70, 480, 640)  dtype  float32  chunksize  (1, 1, 1, 70, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on concatenate-1cf86a0c3a1188376b205e1c12c299b1",364  1  2  640  480  70

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,728
shape,"(2, 1, 364, 70, 480, 640)"
dtype,float32
chunksize,"(1, 1, 1, 70, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,concatenate-1cf86a0c3a1188376b205e1c12c299b1

0,1
"layer_type  Blockwise  is_materialized  False  number of outputs  728  shape  (2, 70, 1, 364, 480, 640)  dtype  float32  chunksize  (1, 70, 1, 1, 480, 640)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on reshape-3463245907aa8925470a557c0a012d9c",1  70  2  640  480  364

0,1
layer_type,Blockwise
is_materialized,False
number of outputs,728
shape,"(2, 70, 1, 364, 480, 640)"
dtype,float32
chunksize,"(1, 70, 1, 1, 480, 640)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,reshape-3463245907aa8925470a557c0a012d9c

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  728  shape  (2, 70, 111820800)  dtype  float32  chunksize  (1, 70, 307200)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on transpose-cd2b957511e76006851918997c478f51",111820800  70  2

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,728
shape,"(2, 70, 111820800)"
dtype,float32
chunksize,"(1, 70, 307200)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,transpose-cd2b957511e76006851918997c478f51

0,1
"layer_type  Blockwise  is_materialized  False  number of outputs  728  shape  (111820800, 70, 2)  dtype  float32  chunksize  (307200, 70, 1)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on reshape-065e04cb952e78e47071a4a85c32675a",2  70  111820800

0,1
layer_type,Blockwise
is_materialized,False
number of outputs,728
shape,"(111820800, 70, 2)"
dtype,float32
chunksize,"(307200, 70, 1)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,reshape-065e04cb952e78e47071a4a85c32675a

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  10920  shape  (111820800, 70, 2)  dtype  float32  chunksize  (30720, 70, 2)  type  dask.array.core.Array  chunk_type  numpy.ndarray  depends on transpose-6f565cd15db3387f371048cefc6e23e3",2  70  111820800

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,10920
shape,"(111820800, 70, 2)"
dtype,float32
chunksize,"(30720, 70, 2)"
type,dask.array.core.Array
chunk_type,numpy.ndarray
depends on,transpose-6f565cd15db3387f371048cefc6e23e3


In [22]:
tar_arr_chunks = tar_array.chunksize
tar_array = tar_array.rechunk((tar_arr_chunks[0] / 10, 70))
tar_array

Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,8.20 MiB
Shape,"(111820800, 70)","(30720, 70)"
Count,9 Graph Layers,3640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 29.16 GiB 8.20 MiB Shape (111820800, 70) (30720, 70) Count 9 Graph Layers 3640 Chunks Type float32 numpy.ndarray",70  111820800,

Unnamed: 0,Array,Chunk
Bytes,29.16 GiB,8.20 MiB
Shape,"(111820800, 70)","(30720, 70)"
Count,9 Graph Layers,3640 Chunks
Type,float32,numpy.ndarray


## Preprocess the data toward ML algorithm input

### Generate data for the target of cloud base at certain height

preprocess the target
for the target, we define a cloud existing in a height layer:
if the cloud volume fraction is greater than 2 out of possible 8 oktas </br>
the cell below finds the first occurrences where the cloud volume is greater than the threshold marking a 1 in the array location, and stores 0 otherwise. </br>
Later, the final height layer will be marker for samples without a cloud base

In [27]:
# See an example of target array values
print(
    "Example of cloud volume samples (first 10 samples, first 30 layers):\n",
    tar_array[0:10, 0:30].compute(),
)
if PERFORM_LONG_COMPUTATIONS_FOR_EXTRA_INFO:
    print("Maximum value in data:", np.max(tar_array).compute())

Example of cloud volume samples (first 10 samples, first 30 layers):
 [[0.140625 0.125    0.09375  0.0625   0.015625 0.       0.       0.
  0.       0.       0.       0.       0.       0.       0.03125  0.140625
  0.359375 0.609375 0.6875   0.53125  0.390625 0.296875 0.234375 0.1875
  0.140625 0.125    0.125    0.125    0.140625 0.203125]
 [0.140625 0.109375 0.09375  0.046875 0.       0.       0.       0.
  0.       0.       0.       0.       0.       0.       0.09375  0.265625
  0.53125  0.671875 0.640625 0.46875  0.359375 0.28125  0.21875  0.171875
  0.125    0.125    0.125    0.125    0.15625  0.21875 ]
 [0.140625 0.125    0.09375  0.046875 0.       0.       0.       0.
  0.       0.       0.       0.       0.       0.       0.078125 0.265625
  0.53125  0.671875 0.640625 0.46875  0.359375 0.28125  0.21875  0.171875
  0.125    0.125    0.125    0.125    0.15625  0.21875 ]
 [0.140625 0.125    0.09375  0.046875 0.       0.       0.       0.
  0.       0.       0.       0.       0.     

In [23]:
cloud_threshold = 2.0 / 8.0
cloud_over_threshold = dask.array.where(tar_array > cloud_threshold)

In [24]:
%%time
# realize the values for the where condition (dask array to numpy array)
print("Start compute")
dask.compute(cloud_over_threshold[0], cloud_over_threshold[1])
sample_with_cloud = cloud_over_threshold[0].compute()
index_on_sample = cloud_over_threshold[1].compute()

Start compute


RuntimeError: NetCDF: Not a valid ID

Remove repeat indicies, e.g. where there are multiple layers above the cloud threshold, we only want the first occurence in a sample (the base):

In [25]:
%%time
_, first_duplicate_indicies = np.unique(sample_with_cloud, return_index=True)

if COMPUTE_CLOUD_BASE_SAMPLE_NUMBER:
    print("Start duplicate indicies compute")
    first_duplicate_indicies = first_duplicate_indicies.compute()
    print("Number of cloud bases found:", first_duplicate_indicies.shape)
    print("Out of samples:", tar_array.shape[0])

NameError: name 'sample_with_cloud' is not defined

For clouds where no base was found, add a marker at the final height layer
(where no cloud volume over threshold appears in the data).

In [31]:
%%time

# encode the cloud in onehot vector
one_hot_encoded_bases = np.zeros(tar_array.shape)
one_hot_encoded_bases[
    sample_with_cloud[first_duplicate_indicies],
    index_on_sample[first_duplicate_indicies],
] = 1
# mark the end (final layer) if no cloud base detected
flip = lambda booleanVal: not booleanVal
vflip = np.vectorize(flip)
one_hot_encoded_bases[np.where(vflip(np.any(one_hot_encoded_bases, axis=1)))[0], -1] = 1

# Now reduce vectors as if each height layer is treated as a class where the model will predict, onehot -> class label e.g. 0,0,1,0, -> 2
class_label_encoded_bases = np.argmax(one_hot_encoded_bases, axis=1)

CPU times: user 111 ms, sys: 61 ms, total: 172 ms
Wall time: 170 ms


In [32]:
print("Target as class label:", class_label_encoded_bases.shape)
print("Output dim:", one_hot_encoded_bases.shape)

Target as class label: (307200,)
Output dim: (307200, 70)


Compute and unmask target array (cloud volume):

In [33]:
%%time
print("Current type of target array:", type(tar_array))
print("Target shape:", tar_array.shape)
tar_array = tar_array.compute()
print("Finished compute of target array")

num_of_masked = np.ma.count_masked(tar_array)
print("Number of masked values after computation:", num_of_masked)
assert num_of_masked == 0

# unmask
tar_array = np.ma.filled(tar_array, np.nan)

Current type of target array: <class 'dask.array.core.Array'>
Target shape: (307200, 70)
Finished compute of target array
Number of masked values after computation: 0
CPU times: user 125 ms, sys: 158 ms, total: 283 ms
Wall time: 321 ms


In [35]:
if VERIFY_NO_FINAL_LAYER_CLOUDS:
    # verify the claim that no cloud bases appear in the final layer
    # can be strengthened to, no clouds exist in the final layer (next line returns 0)
    print(
        "list of clouds at final height level:",
        np.where(tar_array[:, -1] > cloud_threshold),
    )

### Show some samples of what has been produced

In [36]:
one_hot_encoded_bases

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.]])

In [37]:
class_label_encoded_bases

array([16, 15, 15, ...,  0,  0,  0])

In [38]:
tar_array

array([[0.140625, 0.125   , 0.09375 , ..., 0.      , 0.      , 0.      ],
       [0.140625, 0.109375, 0.09375 , ..., 0.      , 0.      , 0.      ],
       [0.140625, 0.125   , 0.09375 , ..., 0.      , 0.      , 0.      ],
       ...,
       [0.90625 , 1.      , 1.      , ..., 0.      , 0.      , 0.      ],
       [1.      , 1.      , 1.      , ..., 0.      , 0.      , 0.      ],
       [0.953125, 1.      , 1.      , ..., 0.      , 0.      , 0.      ]],
      dtype=float32)

In [39]:
# optionally, free up some memory
if FREE_UP_MEMORY_AFTER_TARGET_COMPUTATION:
    del sample_with_cloud
    del cloud_over_threshold
    del first_duplicate_indicies
    del index_on_sample
    del tar_dict
    del tar_cube
    del cubes

In [40]:
if SAVE_ONEHOT_INSTEAD_OF_CLASS_LABEL:
    del class_label_encoded_bases
else:
    del one_hot_encoded_bases

In [41]:
# get information for correct chunking
print(class_label_encoded_bases.shape)
print(tar_array.shape)

(307200,)
(307200, 70)


### Normalize Input Data

For the normalization of input data: we first transpose the input array so that the feature dimension is at the top level of the array, and numpy has an easier time accessing all values of the same feature. Then all values are normalized by being scaled in the range \[0,1\]

(must investigate mistake relating to ptp of local datasets instead of global values and make changes)

In [42]:
print("Current input array type:", type(inp_array))

Current input array type: <class 'dask.array.core.Array'>


In [43]:
%%time
if NORMALIZE_INPUT_DATA:
    inp_array = inp_array.T
    inp_array = (inp_array - np.min(inp_array, axis=(1, 2)).reshape((3, 1, 1))) / (
        np.ptp(inp_array, axis=(1, 2)).reshape((3, 1, 1))
    )
    inp_array = inp_array.T

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 8.58 µs


In [44]:
# show 5 samples, first 5 height layers only, displaying all features in the layer (features not indexed)
# (automatic numpy array display reduction is quite large for this array)
inp_array[0:5, 0:5, :].compute()

masked_array(
  data=[[[6.7985125e+04, 2.2162500e+02, 3.4332275e-05],
         [6.7748250e+04, 2.2237500e+02, 3.9577484e-05],
         [6.7418625e+04, 2.2262500e+02, 4.4167042e-05],
         [6.6998500e+04, 2.2450000e+02, 5.4359436e-05],
         [6.6494125e+04, 2.2787500e+02, 7.3134899e-05]],

        [[6.7993625e+04, 2.2162500e+02, 3.4332275e-05],
         [6.7756000e+04, 2.2300000e+02, 3.9994717e-05],
         [6.7426000e+04, 2.2387500e+02, 4.6372414e-05],
         [6.7006125e+04, 2.2587500e+02, 5.6684017e-05],
         [6.6501500e+04, 2.2937500e+02, 7.4386597e-05]],

        [[6.8000250e+04, 2.2162500e+02, 3.4749508e-05],
         [6.7762250e+04, 2.2262500e+02, 3.9219856e-05],
         [6.7432000e+04, 2.2387500e+02, 4.6372414e-05],
         [6.7012125e+04, 2.2587500e+02, 5.6862831e-05],
         [6.6507375e+04, 2.2925000e+02, 7.4326992e-05]],

        [[6.8005000e+04, 2.2150000e+02, 3.4868717e-05],
         [6.7766875e+04, 2.2237500e+02, 3.8981438e-05],
         [6.7436375e+04, 2.2

#### Save a selection of wanted arrays (inp_array, tar_array, one_hot_encoded_bases)

Now to save the computed array </br>
(will not save one of class label output or one_hot as easy conversion between the two)</br>
(went with saving one-hot to emulate the data produced/used by base solution)

In [45]:
# verify input and output shapes
print("Input dim:", inp_array.shape)

Input dim: (307200, 70, 3)


The following cell is code to create a positional encoding for the height layers in the data, e.g. the data at height layer 0 would have the positional encoding of: 0 as part of the input feature. It is commented out as PyTorch Dataloaders are found to have the capability to produce this information at load-time, which seems like a better option than creating a potentially huge array for each position that is scaled up to the size of the sample number redundantly.

In [46]:
# create an extra positional encoding optionally for input use
if GENERATE_POSITIONAL_ENCODING_ARRAYS:
    sample_num, height_dim, _ = inp_array.shape
    # generate height values
    height_position_vector = np.arange(height_dim)
    # extend dimensions out to match input feats
    height_position_vector = np.repeat([height_position_vector], sample_num, axis=0)

    # verify
    print("shape of encoding vector:", height_position_vector.shape)

    x, y = height_position_vector.shape
    # add a dimension for height to act as a feature
    height_position_vector = height_position_vector.reshape(x, y, 1)

    # fit the dtype of the feature to match the dtype of other feats
    height_position_vector = height_position_vector.astype(inp_array.dtype)

    # combine height feature into input array
    if CONCATENATE_POSITIONAL_ENCONDING_TO_FEATURE_VECTOR:
        inp_array = np.concatenate(
            (height_position_vector, inp_array), axis=2, dtype=np.float32
        )  # leave the concat for within the model after producing embedding

    # verify datatypes
    print("input dtype", inp_array.dtype)
    print("height encoding dtype", height_position_vector.dtype)

In [52]:
%%time
if SAVE_NPZ:
    print("Saving numpy arrays")

    with open(path_to_save_result, "w+b") as f:
        # variable assignment that name the arrays for the saved file
        if SAVE_ONEHOT_INSTEAD_OF_CLASS_LABEL:
            output_labels = one_hot_encoded_bases
        else:
            output_labels = class_label_encoded_bases
        output_cloud_volume = tar_array

        if GENERATE_POSITIONAL_ENCODING_ARRAYS:
            if CONCATENATE_POSITIONAL_ENCONDING_TO_FEATURE_VECTOR:
                np.savez(
                    f,
                    input_x=inp_array,
                    output_cloud_volume=output_cloud_volume,
                    output_labels=output_labels,
                )
            else:
                np.savez(
                    f,
                    input_x=inp_array,
                    output_cloud_volume=output_cloud_volume,
                    output_labels=output_labels,
                    height_position_vector=height_position_vector,
                )
        else:
            input_x = inp_array.compute()

            np.savez(
                f,
                input_x=input_x,
                output_cloud_volume=output_cloud_volume,
                output_labels=output_labels,
            )

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.2 µs


## Convert numpy arrays to zarr files

Store the numpy arrays into zarr, which will chunk and compress each array:

In [44]:
# the ncdf files are stored in chunks of 307200
# we want our zarr to chunk across dimensions reasonably small
sample_chunking = 102400  # hardcoded 1/3rd of a days data
height_sample = int(tar_array.shape[1])  # want to keep height layers together
feat_num_for_chunks = 3  # keeping features together on input

In [45]:
tar_array

array([[0.140625, 0.125   , 0.09375 , ..., 0.      , 0.      , 0.      ],
       [0.140625, 0.109375, 0.09375 , ..., 0.      , 0.      , 0.      ],
       [0.140625, 0.125   , 0.09375 , ..., 0.      , 0.      , 0.      ],
       ...,
       [0.90625 , 1.      , 1.      , ..., 0.      , 0.      , 0.      ],
       [1.      , 1.      , 1.      , ..., 0.      , 0.      , 0.      ],
       [0.953125, 1.      , 1.      , ..., 0.      , 0.      , 0.      ]],
      dtype=float32)

In [46]:
print(class_label_encoded_bases.shape)
class_label_encoded_bases

(307200,)


array([16, 15, 15, ...,  0,  0,  0])

In [47]:
inp_array.rechunk(sample_chunking, height_sample, feat_num_for_chunks)

Unnamed: 0,Array,Chunk
Bytes,246.09 MiB,82.03 MiB
Shape,"(307200, 70, 3)","(102400, 70, 3)"
Count,16 Graph Layers,3 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 246.09 MiB 82.03 MiB Shape (307200, 70, 3) (102400, 70, 3) Count 16 Graph Layers 3 Chunks Type float32 numpy.ndarray",3  70  307200,

Unnamed: 0,Array,Chunk
Bytes,246.09 MiB,82.03 MiB
Shape,"(307200, 70, 3)","(102400, 70, 3)"
Count,16 Graph Layers,3 Chunks
Type,float32,numpy.ndarray


In [48]:
%%time
store = zarr.DirectoryStore(path_to_save_zarr)
# define objected for arrays to be grouped under

zarr_grouping = zarr.group(store=store, overwrite=True)

# initialize and then write on zarr arrays for all desired arrays to be saved

cloud_volume_fraction_y = zarr_grouping.zeros(
    shape=tar_array.shape,
    dtype=tar_array.dtype,
    name="cloud_volume_fraction_y.zarr",
    chunks=(sample_chunking, height_sample),
)
print("Start cloud volume save")
cloud_volume_fraction_y[:] = tar_array

if SAVE_ONEHOT_INSTEAD_OF_CLASS_LABEL:
    cloud_base_onehot_y = zarr_grouping.zeros(
        shape=one_hot_encoded_bases.shape,
        dtype=one_hot_encoded_bases.dtype,
        name="cloud_base_onehot_y.zarr",
        chunks=(sample_chunking),
    )
    print("Start base label save")
    cloud_base_onehot_y[:] = one_hot_encoded_bases
else:
    cloud_base_label_y = zarr_grouping.zeros(
        shape=class_label_encoded_bases.shape,
        dtype=class_label_encoded_bases.dtype,
        name="cloud_base_label_y.zarr",
        chunks=(sample_chunking),
    )
    print("Start base label save")
    cloud_base_label_y[:] = class_label_encoded_bases

Start cloud volume save
Start base label save
CPU times: user 130 ms, sys: 6.41 ms, total: 136 ms
Wall time: 75.5 ms


In [51]:
if type(inp_array) == np.ndarray:
    humidity_temp_pressure_x = zarr_grouping.zeros(
        shape=inp_array.shape,
        dtype=inp_array.dtype,
        name="humidity_temp_pressure_x.zarr",
        chunks=(sample_chunking, height_sample, feat_num_for_chunks),
    )
    print("Start input save")
    humidity_temp_pressure_x[:] = inp_array
else:
    print("Start input save")
    with ProgressBar(), ResourceProfiler(5):
        inp_array.to_zarr(
            path_to_save_zarr,
            "humidity_temp_pressure_x.zarr",
            overwrite=True,
            compute=True,
            return_stored=False,
        )

Start input save
[########################################] | 100% Completed | 933.24 ms


In [None]:
if GENERATE_POSITIONAL_ENCODING_ARRAYS:
    if not CONCATENATE_POSITIONAL_ENCONDING_TO_FEATURE_VECTOR:
        height_position_x = zarr_grouping.zeros(
            shape=height_position_vector.shape,
            dtype=height_position_vector.dtype,
            name="height_position_x.zarr",
            chunks=(sample_chunking, height_sample, feat_num_for_chunks),
        )

        print("Start encoded height save")
        height_position_x[:] = height_position_vector

In [52]:
# output some summary for zarr
# view group values
printF = lambda obj: print(obj)
print("Elements of zarr group:")
zarr_grouping.visitvalues(printF)
# view group tree
print("\nTree of zarr group:\n", zarr_grouping.tree())
# see chunk size
print("\nShape array example:", cloud_volume_fraction_y.shape)
print("\nZarr chunking shape of an array:", cloud_base_label_y.chunks)

Elements of zarr group:
<zarr.core.Array '/cloud_base_label_y.zarr' (307200,) int64>
<zarr.core.Array '/cloud_volume_fraction_y.zarr' (307200, 70) float32>
<zarr.core.Array '/humidity_temp_pressure_x.zarr' (307200, 70, 3) float32>

Tree of zarr group:
 /
 ├── cloud_base_label_y.zarr (307200,) int64
 ├── cloud_volume_fraction_y.zarr (307200, 70) float32
 └── humidity_temp_pressure_x.zarr (307200, 70, 3) float32

Shape array example: (307200, 70)

Zarr chunking shape of an array: (102400,)


In [53]:
def load_data_from_zarr(path):

    store = zarr.DirectoryStore(path)
    zarr_group = zarr.group(store=store)
    print("Loaded zarr, file information:\n", zarr_group.info, "\n")

    x = dask.array.from_zarr(zarr_group["humidity_temp_pressure_x.zarr"])
    x.rechunk(zarr_group["humidity_temp_pressure_x.zarr"].chunks)

    y_lab = dask.array.from_zarr(zarr_group["cloud_base_label_y.zarr"])
    y_lab.rechunk(zarr_group["cloud_base_label_y.zarr"].chunks)

    y_cont = dask.array.from_zarr(zarr_group["cloud_volume_fraction_y.zarr"])
    y_cont.rechunk(zarr_group["cloud_volume_fraction_y.zarr"].chunks)

    return x, y_lab, y_cont

In [54]:
# Now, verify that a load back in of the data preserves desired qualities
x, lab, y = load_data_from_zarr("/scratch/hsouth/cbh_data/analysis_ready/dev.zarr")

Loaded zarr, file information:
 Name        : /
Type        : zarr.hierarchy.Group
Read-only   : False
Store type  : zarr.storage.DirectoryStore
No. members : 3
No. arrays  : 3
No. groups  : 0
Arrays      : cloud_base_label_y.zarr, cloud_volume_fraction_y.zarr,
            : humidity_temp_pressure_x.zarr
 



## Testing the output

Do some manual checking of the saved data to ensure the data behaves as we expect, e.g. the same indices of different arrays correspond to the same sample in from the cube

In [55]:
# Do the samples match up across groups?
threshold = 2.0 / 8.0

# same sample number
assert len(x) == len(y) == len(lab)
# preserved order (checked with between label and volume comparison
one_percent_selection = int(0.01 * len(x))
indices_to_test = np.random.choice(np.arange(len(x)), size=one_percent_selection)
print("First 20 random indices:", indices_to_test[0:20])
for i in range(len(indices_to_test)):
    vol = y[indices_to_test[i]].compute()
    base_label_position = lab[indices_to_test[i]].compute()
    # print(vol)
    thresh_overcome = np.where(vol > threshold)

    # print(thresh_overcome)
    try:
        vol_base = thresh_overcome[0][0]
    except:
        vol_base = len(vol) - 1
    # print('vol_base', vol_base, 'base_label_position', base_label_position)
    assert vol_base == base_label_position, (
        "base mismatch",
        vol_base,
        "vs",
        base_label_position,
        "vol=",
        vol,
    )
print("Pass")

First 20 random indices: [135068 218225 191350 164196 264271  19978 164181 253616 230562 305849
 224044 123984 256390   4713 122654 304085 300088 162224 125867 258681]


KeyboardInterrupt: 

In [57]:
cubes = iris.load(str(paths_to_load))
print(cubes)  # shorter comes first
inp_cube_humid = cubes[6]
tar_cube = cubes[1]
inp_cube_wrong = cubes[6]

0: m01s05i250 / (unknown)              (model_level_number: 70; latitude: 480; longitude: 640)
1: cloud_volume_fraction_in_atmosphere_layer / (1) (model_level_number: 70; latitude: 480; longitude: 640)
2: air_pressure / (Pa)                 (model_level_number: 70; latitude: 480; longitude: 640)
3: air_temperature / (K)               (model_level_number: 70; latitude: 480; longitude: 640)
4: convective_rainfall_flux / (kg m-2 s-1) (latitude: 480; longitude: 640)
5: convective_snowfall_flux / (kg m-2 s-1) (latitude: 480; longitude: 640)
6: specific_humidity / (kg kg-1)       (model_level_number: 70; latitude: 480; longitude: 640)
7: stratiform_rainfall_flux / (kg m-2 s-1) (latitude: 480; longitude: 640)
8: stratiform_snowfall_flux / (kg m-2 s-1) (latitude: 480; longitude: 640)
9: upward_air_velocity / (m s-1)       (model_level_number: 70; latitude: 480; longitude: 640)


In [61]:
print(inp_cube_wrong[0][0][0].data)

3.4332275e-05


In [62]:
for i in range(30):
    print(
        "CUBE INP:",
        inp_cube_humid[0][0][i].data,
        "SAVED INP:",
        x[i, 0, 2].compute(),
        "CUBE OUT:",
        tar_cube[0][0][i].data,
        "SAVED OUT:",
        y[i, 0].compute(),
    )

CUBE INP: 3.4332275e-05 SAVED INP: 3.4332275e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.4332275e-05 SAVED INP: 3.4332275e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.4749508e-05 SAVED INP: 3.4749508e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.4868717e-05 SAVED INP: 3.4868717e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.504753e-05 SAVED INP: 3.504753e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.4928322e-05 SAVED INP: 3.4928322e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.540516e-05 SAVED INP: 3.540516e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.5226345e-05 SAVED INP: 3.5226345e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.5107136e-05 SAVED INP: 3.5107136e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.5107136e-05 SAVED INP: 3.5107136e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.5107136e-05 SAVED INP: 3.5107136e-05 CUBE OUT: 0.140625 SAVED OUT: 0.140625
CUBE INP: 3.5107136e-05 SAVED INP: 3