## Convert the NCAR netCDF timestep files to 512³ Zarr arrays with grouped velocity components and (64, 64, 64) chunks, distributed across FileDB nodes (originally via Z-order; now via Ryan's node-coloring assignment)

<font color="red">Older Dask versions raise this error: https://github.com/dask/distributed/issues/3955</font>

<font color='orange'>Note: be careful — setting Dask's `local_directory` to a remote server (e.g. Temporary) will HUGELY slow down these functions.</font>

<font color='cyan'>The parallel version needs a Large job.</font>

In [1]:
# --- Output geometry ---------------------------------------------------------
desired_cube_side = 512  # edge length of each sub-cube the full grid is split into
chunk_size = 64          # Zarr chunk edge length along each spatial axis

# --- Input / output locations -----------------------------------------------
raw_ncar_folder_path = '/home/idies/workspace/turb/data02_02/ncar-high-rate-fixed-dt'
dest_folder_name = 'sabl2048b'  # the trailing "b" marks the high-rate data
write_type = 'prod'             # alternatively "back"

# --- Parallelism --------------------------------------------------------------
use_dask = True
n_dask_workers = 4  # Dask workers used while merging/rechunking the velocities
dask_local_dir = '/home/idies/workspace/turb/data02_02'
n_pool_workers = 4  # worker processes used when writing to disk

# --- Timestep selection -------------------------------------------------------
timestep_nr = 0
# timestep_range = range(1)  # Ned's new high-rate fixed-dt data has only 5 timesteps
# timestep_range = range(3, 5)

In [2]:
# !pip install "dask[complete]"
# !pip install "xarray[complete]"
# !pip install morton-py

In [3]:
%cd /home/idies/workspace/Storage/ariel4/persistent/ncar-zarr-code/zarr_writing

/home/idies/workspace/Storage/ariel4/persistent/ncar-zarr-code/zarr_writing


In [4]:
import xarray as xr
from utils import write_tools
import dask
import os

### Get target Folder list

In [5]:
folders = write_tools.list_fileDB_folders()

# Skip data09_02 and data07_02: they were too full as of May 2023.
# list.remove raises ValueError if either folder is not in the list.
for too_full in ("/home/idies/workspace/turb/data09_02/zarr/",
                 "/home/idies/workspace/turb/data07_02/zarr/"):
    folders.remove(too_full)

# Append a numbered destination directory (numbering starts at 1) to every
# remaining folder, e.g. "<folder>sabl2048b_01_prod/". The list is updated in place.
folders[:] = [
    base + dest_folder_name + "_" + str(n).zfill(2) + "_" + write_type + "/"
    for n, base in enumerate(folders, start=1)
]

# folders[:5]

# Create the top-level dirs (run once; with exist_ok=False makedirs raises if a dir already exists)
# for folder_path in folders:
#     os.makedirs(folder_path, exist_ok=False)

<font color="orange">Don't delete the `%cd` cell below!</font>

In [6]:
%cd /home/idies/workspace/turb/data02_02/ncar-high-rate-fixed-dt

/home/idies/workspace/turb/data02_02/ncar-high-rate-fixed-dt


In [7]:
# for timestep_nr in timestep_range:
# Only the single timestep `timestep_nr` is processed; the loop above is kept for reference.
data_xr = xr.open_dataset(raw_ncar_folder_path + "/jhd." + str(timestep_nr).zfill(3) + ".nc")

# Group the 3 velocity components together into one "velocity" variable.
# NOTE: with Dask this has failed on a SciServer Job because of a write-permission error.
# Never point Dask at a remote location here -- keep `dask_local_dir` on a local path.
merged_velocity = write_tools.merge_velocities(
    data_xr,
    dask_local_dir=dask_local_dir,
    chunk_size_base=chunk_size,
    use_dask=use_dask,  # honour the config flag instead of hard-coding True
    n_dask_workers=n_dask_workers,
)

# Unabbreviate the 'e', 'p', 't' variable names.
merged_velocity = merged_velocity.rename({'e': 'energy', 't': 'temperature', 'p': 'pressure'})

# Use (nnz, nny, nnx) instead of the dataset's (nnx, nny, nnz) dimension order.
dims = list(data_xr.dims)
dims.reverse()

# Split the 2048^3 grid into smaller desired_cube_side^3 (512^3) arrays.
smaller_groups, _ = write_tools.split_zarr_group(merged_velocity, desired_cube_side, dims)

# Node index for every sub-cube; the writer cells use it as folders[index - 1]
# (presumably 1-based -- TODO confirm in write_tools). The earlier Z-order scheme
#     z_order = write_tools.morton_order_cube(cube_side=4)
# was given up in favour of Ryan's node-coloring technique.
node_assignments = write_tools.node_assignment(cube_side=4)

cubes = smaller_groups

# Zarr encoding per variable: chunks of (chunk_size, chunk_size, chunk_size, n) with no
# compression; the last chunk dimension is 3 for velocity and 1 for the scalar fields.
encoding = {
    name: dict(chunks=(chunk_size, chunk_size, chunk_size, n_components), compressor=None)
    for name, n_components in (
        ("velocity", 3),
        ("pressure", 1),
        ("temperature", 1),
        ("energy", 1),
    )
}

print('Done preparing data. Starting to write...')

Done preparing data. Starting to write...


# The new method took only 15 minutes to merge the velocity components!

Maybe save `merged_velocity` to a file so it can be reused?

In [None]:
# Distribute 512^3 across FileDB

# tasks = []
# for i in range(len(cubes)):
#     for j in range(len(cubes[i])):
#         for k in range(len(cubes[i][j])):
#             filedb_index = node_assignments[i][j][k]# % len(folders) # ryan's node assignment accounts for nr. nodes on filedb
#             current_array = cubes[i][j][k]

#             chunk_nr = 16 * i + 4 * j + k

#             # turb/data02_02/sabl2048b_prod/ + sabl2048b + 05 + _ + 001.zarr
#             dest_groupname = os.path.join(folders[filedb_index - 1], dest_folder_name + str(chunk_nr + 1).zfill(2) + "_" + str(timestep_nr).zfill(3) + ".zarr")
            
#             write_tools.write_to_disk(dest_groupname, current_array, encoding)

In [None]:
import threading
import queue


def write_to_disk(q, errors=None, timeout=10):
    """Worker loop: drain *q*, writing each queued sub-cube to a Zarr store.

    Each queue item is a ``(chunk, dest_groupname, encoding)`` tuple and is written with
    ``chunk.to_zarr(store=dest_groupname, mode="w", encoding=encoding)``.

    Args:
        q: ``queue.Queue`` holding the work items.
        errors: optional list. When given, a failed write is appended to it as
            ``(dest_groupname, exception)`` and the worker continues with the next item.
            When ``None`` the exception propagates and ends this worker thread.
        timeout: seconds to wait for a new item before the worker exits.
    """
    while True:
        try:
            chunk, dest_groupname, encoding = q.get(timeout=timeout)  # Adjust timeout as necessary
        except queue.Empty:
            # No item was taken, so there is nothing to mark done. Calling task_done()
            # here would under-count outstanding work (letting q.join() return while other
            # threads are still writing) or raise "task_done() called too many times".
            break
        try:
            chunk.to_zarr(store=dest_groupname, mode="w", encoding=encoding)
        except Exception as err:
            if errors is None:
                raise
            errors.append((dest_groupname, err))
        finally:
            # Exactly one task_done() per successful get(), whether or not the write worked.
            q.task_done()


# Create a queue and populate it with one task per sub-cube.
# The Zarr group is named after the sub-cube number (1-based), NOT its node index:
# node indices can repeat (several sub-cubes share a FileDB node), so naming by node
# made different sub-cubes overwrite each other (mode="w"), possibly concurrently.
q = queue.Queue()
for i, cube_plane in enumerate(cubes):
    for j, cube_row in enumerate(cube_plane):
        for k, current_array in enumerate(cube_row):
            filedb_index = node_assignments[i][j][k]
            # Row-major sub-cube number; assumes a 4x4x4 grid of sub-cubes (cube_side=4).
            chunk_nr = 16 * i + 4 * j + k
            dest_groupname = os.path.join(
                folders[filedb_index - 1],
                dest_folder_name + str(chunk_nr + 1).zfill(2) + "_" + str(timestep_nr).zfill(3) + ".zarr",
            )
            q.put((current_array, dest_groupname, encoding))

# Create threads and start them; failed writes are collected instead of silently
# killing a worker.
num_threads = 8  # Adjust based on the number of cores and expected I/O operations
write_errors = []
threads = []
for _ in range(num_threads):
    t = threading.Thread(target=write_to_disk, args=(q, write_errors))
    t.start()
    threads.append(t)

# Wait for all tasks to be processed, then for all threads to finish.
q.join()
for t in threads:
    t.join()

if write_errors:
    raise RuntimeError(
        f"{len(write_errors)} Zarr write(s) failed: "
        + "; ".join(f"{dest}: {err!r}" for dest, err in write_errors)
    ) from write_errors[0][1]


In [None]:
import os
from multiprocessing import Pool, cpu_count


def _dest_groupname(i, j, k, node_assignments, folders, dest_folder_name, timestep_nr):
    """Return the Zarr group path for sub-cube (i, j, k) of timestep *timestep_nr*.

    The folder is ``folders[node_assignments[i][j][k] - 1]``; the file name is
    ``<dest_folder_name><sub-cube number, 2 digits>_<timestep, 3 digits>.zarr`` where the
    sub-cube number is ``16*i + 4*j + k + 1`` (assumes a 4x4x4 grid of sub-cubes).
    """
    filedb_index = node_assignments[i][j][k]
    chunk_nr = 16 * i + 4 * j + k
    return os.path.join(
        folders[filedb_index - 1],
        dest_folder_name + str(chunk_nr + 1).zfill(2) + "_" + str(timestep_nr).zfill(3) + ".zarr",
    )


def _write_one(args):
    """Pool worker: write one ``(dest_groupname, current_array, encoding)`` task to disk."""
    dest_groupname, current_array, encoding = args
    write_tools.write_to_disk(dest_groupname, current_array, encoding)


def write_task(args):
    """Write sub-cube (i, j, k) to its FileDB folder.

    Kept for backward compatibility: *args* is the 9-tuple
    ``(i, j, k, node_assignments, cubes, folders, dest_folder_name, timestep_nr, encoding)``.
    """
    i, j, k, node_assignments, cubes, folders, dest_folder_name, timestep_nr, encoding = args
    dest_groupname = _dest_groupname(i, j, k, node_assignments, folders, dest_folder_name, timestep_nr)
    _write_one((dest_groupname, cubes[i][j][k], encoding))


# One small task per sub-cube: (destination, that sub-cube only, encoding).
# Previously every task carried the whole `cubes` list, so Pool.map pickled every
# sub-cube once per task; now each sub-cube is sent to a worker exactly once.
tasks = []
for i, cube_plane in enumerate(cubes):
    for j, cube_row in enumerate(cube_plane):
        for k, current_array in enumerate(cube_row):
            dest_groupname = _dest_groupname(
                i, j, k, node_assignments, folders, dest_folder_name, timestep_nr
            )
            tasks.append((dest_groupname, current_array, encoding))

# Write with `n_pool_workers` processes; chunksize=1 hands out one sub-cube at a time.
with Pool(n_pool_workers) as pool:
    pool.map(_write_one, tasks, chunksize=1)
