In [None]:
#Loading in Packages and Data

#Importing Packages
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import matplotlib.ticker as ticker
import matplotlib.cm as cm
from matplotlib.colors import Normalize
from matplotlib.ticker import MaxNLocator
from matplotlib.ticker import ScalarFormatter
import matplotlib.gridspec as gridspec
import xarray as xr
import os; import time
import pickle
import h5py
###############################################################
def coefs(coefficients,degree):
    """Render polynomial coefficients as a printable string.

    Terms are written from power ``degree`` down to power 0 and joined
    with `` + ``. The coefficient printed for power ``n`` is
    ``coefficients[len(coefficients) - (n + 1)]``, so the last entry of
    the sequence is the constant term. Every coefficient is formatted
    with ``.1e`` and wrapped in parentheses.

    Returns an empty string when ``degree`` is negative.
    """
    terms = [
        f"({coefficients[len(coefficients)-(power+1)]:.1e})" + f"x^{power}"
        for power in range(degree, -1, -1)
    ]
    return " + ".join(terms)
###############################################################

#Importing Model Data
# `check` is not referenced anywhere else in the visible part of this notebook.
check=False
# Project root; every input/output path below is built by string concatenation from it.
# NOTE(review): the name `dir` shadows the Python builtin of the same name.
dir='/mnt/lustre/koa/koastore/torri_group/air_directory/DCI-Project/'

# dx = 1 km; Np = 1M; Nt = 5 min
# `data` is the gridded model output and `parcel` the parcel output; both are
# opened with xarray and rebound (time-sliced) by a later cell when job_array is True.
data=xr.open_dataset(dir+'../cm1r20.3/run/cm1out_1km_5min.nc') #***
parcel=xr.open_dataset(dir+'../cm1r20.3/run/cm1out_pdata_1km_5min_1e6.nc') #***
# These three labels are only used to build the output file names
# (lagrangian_binary_array_{res}_{t_res}_{Np_str}...); keep them in sync with the files opened above.
t_res='5min'; res='1km'
Np_str='1e6'

# # dx = 1km; Np = 50M
# #Importing Model Data
# check=False
# dir2='/home/air673/koa_scratch/'
# data=xr.open_dataset(dir2+'cm1out_1km_1min.nc') #***
# parcel=xr.open_dataset(dir2+'cm1out_pdata_1km_1min_50M.nc') #***
# res='1km'; t_res='1min'; Np_str='50e6'

# # dx = 1km; Np = 100M
# #Importing Model Data
# check=False
# dir2='/home/air673/koa_scratch/'
# data=xr.open_dataset(dir2+'cm1out_1km_1min.nc') #***
# parcel=xr.open_dataset(dir2+'cm1out_pdata_1km_1min_100M.nc') #***
# res='1km'; t_res='1min'; Np_str='100e6'


# # dx = 250 m
# #Importing Model Data
# check=False
# dir2='/home/air673/koa_scratch/'
# data=xr.open_dataset(dir2+'cm1out_250m.nc') #***
# parcel=xr.open_dataset(dir2+'cm1out_pdata_250m.nc') #***
# res='250m'
# Np_str='150e6'

In [None]:
#INITIALIZE DATA FUNCTION
###############################################################
def initiate_array(out_file, vars, t_chunk_size, p_chunk_size, t_size=None, p_size=None):
    """Create (or overwrite) an HDF5 file holding one empty 2-D dataset per variable.

    Parameters
    ----------
    out_file : str
        Path of the HDF5 file. It is opened in ``'w'`` mode, so an existing
        file at that path is truncated.
    vars : iterable of str
        Dataset names to create. A name that appears more than once is only
        created the first time.
    t_chunk_size, p_chunk_size : int
        Requested chunk extent along the first and second dimension.
        Each is clamped to the dataset extent, because h5py rejects a chunk
        shape larger than the dataset shape in any dimension.
    t_size : int, optional
        Extent of the first dimension. Defaults to ``len(data['time'])``.
    p_size : int, optional
        Extent of the second dimension. Defaults to ``len(parcel['xh'])``
        (the quantity called ``Np`` elsewhere in this notebook).

    Notes
    -----
    Datasets named ``Z``, ``Y``, ``X`` are ``uint16``; ``A_g`` and ``A_c``
    are boolean; everything else is ``float32``.
    """
    if t_size is None:
        t_size = len(data['time'])  # Number of timesteps
    if p_size is None:
        p_size = len(parcel['xh'])  # Number of parcels

    if t_size == 0 or p_size == 0:
        # A zero-length dimension cannot hold a valid (>= 1) chunk, so fall
        # back to h5py's default layout instead of failing.
        chunks = None
    else:
        # Clamp so that small runs (e.g. fewer parcels than p_chunk_size) still work.
        chunks = (min(t_chunk_size, t_size), min(p_chunk_size, p_size))

    # Storage type per dataset name; anything not listed is stored as float32.
    special_dtypes = {
        'Z': np.uint16, 'Y': np.uint16, 'X': np.uint16,
        'A_g': np.bool_, 'A_c': np.bool_,
    }

    with h5py.File(out_file, 'w') as f:
        for var_name in vars:
            if var_name in f:
                continue  # duplicate name in `vars`; already created
            f.create_dataset(
                var_name,
                shape=(t_size, p_size),
                chunks=chunks,
                dtype=special_dtypes.get(var_name, np.float32)
            )

In [None]:
# block = 10  # number of time steps to write at a time
# for start in range(0, t_size, block):
#     end = min(start + block, t_size)
#     size = (end - start, p_size)

In [None]:
def check_memory():
    """Print the ten largest notebook-global objects and their combined size.

    Sizes come from ``sys.getsizeof`` and are reported in MB (bytes / 1e6,
    rounded to two decimals). Names that start with an underscore, names
    that are also keys of ``sys.modules`` and IPython bookkeeping names are
    ignored. The final figure is the sum of the ten listed sizes only.
    """
    import sys
    ipython_vars = ["In", "Out", "exit", "quit", "get_ipython", "ipython_vars"]
    print("Top 10 objects with highest memory usage")

    namespace = globals()
    sized = []
    for name in namespace:
        if name.startswith("_") or name in sys.modules or name in ipython_vars:
            continue
        sized.append((name, sys.getsizeof(namespace.get(name))))

    # Largest first; the sort is stable, so equal sizes keep namespace order.
    sized.sort(key=lambda item: item[1], reverse=True)

    mem = {}
    for key, value in sized[:10]:
        mem[key] = round(value/1e6,2)

    print({key:f"{value} MB" for key,value in mem.items()})
    print(f"\n{round(sum(mem.values()),2)/1000} GB in use overall")

In [None]:
#JOB ARRAY SETUP
job_array=False
job_array=True  # the last assignment wins; comment this line out to process every time step in a single run

if job_array==True:

    num_jobs=60 #180 #how many total jobs are being run? i.e. array=1-100 ==> num_jobs=100 #***
    total_elements=len(data['time']) #total number of time steps to split across the jobs

    if num_jobs >= total_elements:
        raise ValueError("Number of jobs cannot be greater than or equal to total elements.")

    
    job_range = total_elements // num_jobs  # Base size for each chunk
    remaining = total_elements % num_jobs   # Number of chunks with 1 extra 
    
    # Function to compute the start and end for each job_id
    def get_job_range(job_id, num_jobs):
        """Return the half-open time-index range ``(start, end)`` handled by one job.

        ``total_elements`` time steps are split into ``num_jobs`` contiguous
        chunks; the first ``total_elements % num_jobs`` chunks get one extra
        element, so the chunks tile ``[0, total_elements)`` exactly.

        Parameters
        ----------
        job_id : int
            1-based job number (the SLURM array task id).
        num_jobs : int
            Total number of jobs the work is split into.

        Raises
        ------
        ValueError
            If ``job_id`` is outside ``1..num_jobs``. Such an id would
            otherwise map to a range past the end of the data and produce
            an empty slice.
        """
        if not 1 <= job_id <= num_jobs:
            raise ValueError(f"job_id must be between 1 and {num_jobs}, got {job_id}.")

        index = job_id - 1
        # Derived from the argument (not the module-level values) so that the
        # function stays correct when called with a different num_jobs.
        base = total_elements // num_jobs
        extra = total_elements % num_jobs

        # Add one extra element to the first 'extra' chunks
        start_job = index * base + min(index, extra)
        end_job = start_job + base + (1 if index < extra else 0)
        return start_job, end_job
    # def job_testing():
    #     #TESTING
    #     start=[];end=[]
    #     for job_id in range(1,num_jobs+1):
    #         start_job, end_job = get_job_range(job_id, num_jobs)
    #         print(start_job,end_job)
    #         start.append(start_job)
    #         end.append(end_job)
    #     print(np.all(start!=end))
    #     print(len(np.unique(start))==len(start))
    #     print(len(np.unique(end))==len(end))
    # job_testing()
    
    job_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', 0)) #this is the current SBATCH job id
    # Outside SLURM (variable unset -> 0) fall back to a fixed test job, capped at
    # num_jobs so the fallback can never point past the last chunk.
    if job_id==0: job_id=min(73,num_jobs)
    start_job, end_job = get_job_range(job_id, num_jobs)
    index_adjust=start_job
    print(f'start_job = {start_job}, end_job = {end_job}')

In [None]:
# Restrict both datasets to this job's time window, or define the full-range
# bookkeeping values when no job array is used.
# NOTE: `data` and `parcel` are rebound here, so re-running this cell in the
# same kernel slices the already-sliced datasets again.
if job_array==True:
    #Indexing Array with JobArray
    #(for 150_000_000 parcels use 500-1000 jobs)
    data=data.isel(time=slice(start_job,end_job))
    parcel=parcel.isel(time=slice(start_job,end_job))
elif job_array==False:
    start_job=0
    end_job=len(data['time'])
    index_adjust=0

In [None]:
###########################################################################################################################################################################

In [None]:
#MAKING LAGRANGIAN BINARY ARRAY
###############################################################

In [None]:
#Lagrangian Position Arrays
##############
def grid_location(z,y,x):
    """Convert parcel positions to indices of the grid cells that contain them.

    For each axis the cell-face coordinates are read from ``data['zf']``,
    ``data['yf']`` and ``data['xf']`` and multiplied by 1000 before being
    compared with the positions (presumably km -> m; TODO confirm units).

    Parameters
    ----------
    z, y, x : array_like
        Parcel positions along each axis, any shape.

    Returns
    -------
    which_zh, which_yh, which_xh : numpy.ndarray of uint16
        Cell index per position, same shape as the corresponding input.
        Positions below the first face map to cell 0 and positions beyond
        the last face map to the last cell (``len(faces) - 2``), so the
        result is always a valid index into a cell-centred array.
        Because ``np.searchsorted`` is used with its default
        ``side='left'``, a position lying exactly on an interior face is
        assigned to the cell below that face.
    """
    def _cell_index(faces, positions):
        # N+1 faces bound N cells, so valid cell indices are 0 .. len(faces)-2.
        faces = faces*1000
        last_cell = max(len(faces) - 2, 0)
        return np.clip(np.searchsorted(faces, positions) - 1, 0, last_cell).astype(np.uint16)

    which_zh = _cell_index(data['zf'].values, z)
    which_yh = _cell_index(data['yf'].values, y)
    which_xh = _cell_index(data['xf'].values, x)

    return which_zh,which_yh,which_xh

# Pull the parcel coordinates out of the parcel dataset as raw arrays.
print('Loading Lagrangian x,y,z into Memory\n')
x=parcel['x'].data
y=parcel['y'].data
z=parcel['z'].data

# Turn the positions into grid-cell indices (Z, Y, X), one entry per (time, parcel).
print('Creating Lagrangian X,Y,Z Binary Arrays\n')
Z,Y,X=grid_location(z,y,x)

check_memory()

In [None]:
#GETTING u and w for Lagrangian_Tracking later
# Raw arrays of the parcel dataset's 'u' and 'w' variables; they are not used
# in this notebook other than being written to the output file under the same names.
u=parcel['u'].data
w=parcel['w'].data

In [None]:
def call_variable(varname):
    """Return the underlying array (``.data``) of variable ``varname`` in the global ``data`` dataset."""
    return data[varname].data

In [None]:
def make_lagrangian_array(varnames):
    """Sample gridded variables at every parcel's grid cell for every time step.

    For each name in ``varnames`` the gridded field is fetched with
    ``call_variable`` and indexed as ``field[t, Z[t, p], Y[t, p], X[t, p]]``
    using the global index arrays ``Z``, ``Y``, ``X``.

    The gather is vectorised over parcels: the work is done one time step at
    a time inside blocks of 1,000,000 parcels, which keeps the number of
    Python-level iterations small (instead of one iteration per parcel) and
    bounds the size of the temporary index arrays.

    Parameters
    ----------
    varnames : sequence of str
        Names of variables in the global ``data`` dataset.

    Returns
    -------
    list of numpy.ndarray
        One ``float32`` array shaped like ``Z`` per entry of ``varnames``,
        in the same order.
    """
    # Initialize dictionaries
    var_data_dict = {varname: call_variable(varname) for varname in varnames}
    VAR = {varname: np.zeros_like(Z, dtype='float32') for varname in varnames}

    Nt = len(data['time'])
    Np = len(parcel['xh'])
    block = 1_000_000  # parcels handled per progress report

    for p in range(0, Np, block):
        # Same progress output as a per-parcel loop reporting every 1e6 parcels.
        print(f"{p}/{len(parcel['xh'])}")
        p_end = min(p + block, Np)

        for t in range(Nt):
            # Grid-cell indices of this block of parcels at time step t
            zs = Z[t, p:p_end]
            ys = Y[t, p:p_end]
            xs = X[t, p:p_end]

            # Fill the respective VAR array for every requested variable
            for varname, var_data in var_data_dict.items():
                VAR[varname][t, p:p_end] = var_data[t, zs, ys, xs]

    # Return all the arrays in a list
    return [VAR[varname] for varname in varnames]


In [None]:
# Sample 'winterp', 'qc' and 'qi' from the gridded dataset along the parcel paths.
print('Making W, QC, and QI Lagrangian Array\n')

varnames=['winterp','qc','qi']
[W,QC,QI]=make_lagrangian_array(varnames); check_memory()

# Build the total condensate QCQI = QC + QI.
# After this cell QC and QI are rebound to dask arrays, while QCQI is
# materialised again by .compute().
# NOTE(review): the dask round trip only wraps an element-wise sum of two
# in-memory arrays; confirm whether it is still needed.
print('Making QC+QI Lagrangian Array\n')
import dask.array as da
Nt=len(data['time'])
QC = da.from_array(QC, chunks=(Nt, 'auto'))
QI = da.from_array(QI, chunks=(Nt, 'auto'))
QCQI=QC+QI
QCQI=QCQI.compute(); check_memory()
# Flag consumed by the thresholding cell: tells it to wrap W and QCQI in dask arrays.
array_to_dask=True

In [None]:
#Create Set Thresholds and Create Binary Arrays
# Thresholds on W: w_thresh1 is used for A_g, w_thresh2 for A_c.
w_thresh1=0.1
w_thresh2=0.5

# Thresholds on QCQI: A_g requires QCQI below qcqi_thresh1, A_c requires
# QCQI at or above qcqi_thresh2.
qcqi_thresh1=1e-6
qcqi_thresh2=1e-6

print('Making Lagrangian Binary Arrays\n')
#Apply Thresholds 
# array_to_dask is set to True by the cell that builds QCQI. It is reset to
# False here so that re-running this cell does not wrap the arrays a second time.
if array_to_dask==True:
    import dask.array as da
    Nt=len(data['time'])
    W = da.from_array(W, chunks=(Nt, 'auto'))
    QCQI = da.from_array(QCQI, chunks=(Nt, 'auto'))
    array_to_dask=False

# A_g: W >= w_thresh1 and QCQI < qcqi_thresh1
print('Making A_g')
A_g = np.where((W >= w_thresh1) & (QCQI < qcqi_thresh1), True, False)
# A_c: W >= w_thresh2 and QCQI >= qcqi_thresh2
print('Making A_c')
A_c = np.where((W >= w_thresh2) & (QCQI >= qcqi_thresh2), True, False)
# NOTE(review): the .compute() calls require W and QCQI to be dask arrays at
# this point; with plain numpy inputs np.where returns an ndarray, which has no .compute().
A_g=A_g.compute(); A_c=A_c.compute()
check_memory()

In [None]:
# Dimension sizes of the (possibly time-sliced) parcel dataset:
# Np = length of parcel['xh'], Nt = number of time steps.
Np=len(parcel['xh'])
Nt=len(parcel['time'])

In [None]:
# Saving Data
##############
print('Saving Data\n')
import h5py
# Output goes to the job_out/ folder; with job_array enabled each job writes
# its own file, suffixed with its job_id.
dir2=dir+'Project_Algorithms/Lagrangian_Arrays/job_out/'
out_file=dir2+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}'
if job_array==True:
    out_file+=f'_{job_id}.h5'
elif job_array==False:
    out_file+=f'.h5'

# Datasets written to the file. Note that the parcel y positions are not
# saved (only the Y index array is).
# NOTE(review): the name `vars` shadows the Python builtin of the same name.
vars=['u','w','z','x','Z','Y','X','W','QCQI','A_g','A_c']
# initiate_array opens out_file in 'w' mode, so an existing file is overwritten.
initiate_array(out_file,vars,t_chunk_size=1,p_chunk_size=1_000_000)
with h5py.File(out_file, 'a') as f: 
    # Parcel velocities
    f['u'][:]=u
    f['w'][:]=w
    
    # Parcel positions
    f['z'][:]=z
    f['x'][:]=x
    
    # Grid-cell indices of each parcel
    f['Z'][:]=Z
    f['Y'][:]=Y
    f['X'][:]=X

    # Gridded fields sampled along the parcel paths
    f['W'][:]=W
    f['QCQI'][:]=QCQI
    
    # Boolean threshold masks
    f['A_g'][:]=A_g
    f['A_c'][:]=A_c

In [None]:
check_memory()

In [None]:
#########################################
#RECOMBINE SEPARATE JOB_ARRAYS AFTER
recombine=False #KEEP FALSE WHEN JOB ARRAY IS RUNNING
recombine=True
# Enforce the rule above: inside a SLURM array task the other tasks may still
# be writing their files, so recombination must not run there.
if 'SLURM_ARRAY_TASK_ID' in os.environ:
    recombine=False

In [None]:
# if recombine==True:
#     dir2=dir+'Project_Algorithms/Lagrangian_Arrays/job_out/'
#     dir3=dir+'Project_Algorithms/Lagrangian_Arrays/'
#     out_file=dir3+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}.h5'

#     print('initializing array')
#     vars=['u','w','z','Z','Y','X','W','QCQI','A_g','A_c']
#     initiate_array(out_file,vars,t_chunk_size=100,p_chunk_size=500_000)

#     print('recombining')
#     with h5py.File(out_file, 'r+') as f_out:
#         num_jobs=180
#         for job_id in np.arange(1,num_jobs+1): #OG
#         # for job_id in np.arange(1,2+1): #TESTING
#             if np.mod(job_id,5)==0: print(f"job_id = {job_id}")
#             [a,b] = get_job_range(job_id,num_jobs)
    
#             in_file=dir2+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}_{job_id}.h5' 
#             with h5py.File(in_file, 'r') as f_in: 
#                 for var in vars:
#                     f_out[var][a:b]=f_in[var][:]

In [None]:
#DASK-ENABLED
import glob
import re
def recombine_func(in_files,out_file):
    """Concatenate the per-job HDF5 files into a single file.

    Parameters
    ----------
    in_files : str
        Glob pattern matching the per-job files. Only files whose name ends
        in ``_<digits>.h5`` are used; they are concatenated in increasing
        order of that number along ``phony_dim_0``.
    out_file : str
        Path of the combined file, written with the h5netcdf engine.

    Raises
    ------
    FileNotFoundError
        If no file matching the pattern carries a numeric job suffix.

    Warns
    -----
    UserWarning
        If matched files are skipped for lacking a numeric suffix, or if the
        job numbers found are not exactly ``1..N`` (a missing job would
        silently shift everything after it in the combined file).
    """
    import warnings
    from dask.diagnostics import ProgressBar

    job_suffix = re.compile(r'_(\d+)\.h5$')
    numbered = []
    skipped = []
    for path in glob.glob(in_files):
        found = job_suffix.search(path)
        if found is None:
            skipped.append(path)
        else:
            numbered.append((int(found.group(1)), path))

    if skipped:
        warnings.warn(f"Ignoring {len(skipped)} file(s) without a numeric job suffix: {skipped}")
    if not numbered:
        raise FileNotFoundError(f"No job files with a numeric suffix match {in_files!r}")

    # Numeric (not lexicographic) order, so that _10 comes after _9.
    numbered.sort(key=lambda item: item[0])
    job_ids = [job for job, _ in numbered]
    matching_files = [path for _, path in numbered]

    if job_ids != list(range(1, len(job_ids) + 1)):
        warnings.warn(f"Job ids are not the contiguous range 1..{len(job_ids)}: {job_ids}")

    print('recombining')
    # The context manager closes the input files once the output is written.
    with xr.open_mfdataset(matching_files,engine='h5netcdf',concat_dim='phony_dim_0',combine='nested',phony_dims='sort') as out:
        with ProgressBar():
            out.to_netcdf(out_file, engine='h5netcdf')
    
# Combine the per-job files from job_out/ into a single file one level up.
if recombine==True:
    dir2=dir+'Project_Algorithms/Lagrangian_Arrays/job_out/'
    dir3=dir+'Project_Algorithms/Lagrangian_Arrays/'
    # Glob pattern for the per-job inputs (files suffixed with _<job_id>.h5)
    in_files = dir2 + f'lagrangian_binary_array_{res}_{t_res}_{Np_str}_*.h5'
    # Combined output uses the same base name without the job suffix
    out_file=dir3+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}.h5' 
    
    recombine_func(in_files,out_file)

In [None]:
#########################################
# Reading Back Data Later

In [None]:
# # Reading Back Data Later
# ##############
# def make_data_dict(var_names,read_type):
#     if read_type=='h5py':
#         with h5py.File(in_file, 'r') as f:
#             data_dict = {var_name: f[var_name][:,start_job:end_job] for var_name in var_names} #USE THIS ONE
#             data_dict = {var_name: f[var_name][:] for var_name in var_names}
            
#     elif read_type=='xarray':
#         in_data = xr.open_dataset(
#             in_file,
#             engine='h5netcdf',
#             phony_dims='sort',
#             chunks={'phony_dim_0': 100, 'phony_dim_1': 1_000_000} 
#         )
#         # data_dict = {k: in_data[k][:,start_job:end_job].compute().data for k in var_names} #USE THIS ONE
#         data_dict = {k: in_data[k][:].compute().data for k in var_names}
#     return data_dict

# # read_type='xarray'
# read_type='h5py'

In [None]:
# import h5py
# dir2=dir+'Project_Algorithms/Lagrangian_Arrays/'
# in_file=dir2+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}.h5'

# var_names = ['A_g', 'A_c', 'Z', 'Y', 'X','W']
# data_dict = make_data_dict(var_names,read_type)
# A_g, A_c, Z, Y, X, W = (data_dict[k] for k in var_names)

# # #Making Time Matrix
# Nt=len(data['time'])
# T = np.broadcast_to(np.arange(Nt)[:, None], A_c.shape)  # shape: (Nt, p)

# check_memory(globals())

In [None]:
#############################################
#TESTING
# Read a small window of time steps back from the combined file for a spot check.
# NOTE: this overwrites the start_job/end_job values set by the job-array cells.
start_job=500;end_job=510
# start_job=int(500/5);end_job=int(510/5)
dir2=dir+'Project_Algorithms/Lagrangian_Arrays/'
with h5py.File(dir2+f'lagrangian_binary_array_{res}_{t_res}_{Np_str}.h5', 'r') as f:
    # Load the dataset by its name
    A_c = f['A_c'][start_job:end_job]
    W = f['W'][start_job:end_job]
    QCQI = f['QCQI'][start_job:end_job]
    Z = f['Z'][start_job:end_job]
    Y = f['Y'][start_job:end_job]
    X = f['X'][start_job:end_job]

# Matching window of the gridded dataset, used as the reference in the check below.
# NOTE(review): when job_array is True, `data` has already been reduced to one
# job's time slice by an earlier cell, so this slice is taken relative to that
# subset rather than to the full run - confirm this cell is only used with the full dataset.
data2=data.isel(time=slice(start_job,end_job))

In [None]:
t,p=0,1000
def test(t,p,VAR,var_name):
    z=Z[t,p];y=Y[t,p];x=X[t,p]
    if var_name=='qcqi':
        out=data2['qc'].isel(time=t,zh=z,yh=y,xh=x).data+data2['qi'].isel(time=t,zh=z,yh=y,xh=x).data
    else:
        out=data2[var_name].isel(time=t,zh=z,yh=y,xh=x).data
    print(VAR[t,p],out)

# Spot check for time index t and parcel p: each call prints the stored value
# followed by the value re-read from the gridded dataset.
test(t,p,W,'winterp')
test(t,p,QCQI,'qcqi')