In [1]:
import pandas as pd
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from tqdm import trange
import re

import os
import sys
from pathlib import Path

# Switch the working directory to the parent of sys.path[0] before importing
# the project package; the cells below also use the relative path "data".
# NOTE(review): presumably sys.path[0] is this notebook's folder and its parent
# is the project root that contains `modules/` — confirm for the kernel in use.
os.chdir(Path(sys.path[0]).parent)
import modules.utils as utils

import multiprocess as mp
import time

### Without parallel computing

In [2]:
# Serial reference pipeline: for every time found in `Directory`, load
# u, v, w and theta, coarse-grain them, derive the tke and w*theta columns and
# write everything out through utils.write_nc_file.
Directory = "data"
L = 32  # second argument of utils.coarse_array (presumably the coarsening factor — confirm)

# Names of the columns handed to utils.write_nc_file: the four loaded fields
# followed by the two derived quantities.
variables = ['u', 'v', 'w', 'theta', 'tke', 'wtheta']  # add 's'

# Keep the files whose name contains "uxy". re.split with a capturing group
# returns [text, digits, text, digits, ...], so index 3 is the second run of
# digits in the file name.
# NOTE(review): presumably that number is the time stamp — verify against the
# file naming scheme.
files = [name for name in os.listdir(Directory) if "uxy" in name]
times = sorted({int(re.split(r'(\d+)', name)[3]) for name in files})

for t in times:
    u_ds = utils.concatenate_alt(0, Directory, 'u', t)
    v_ds = utils.concatenate_alt(0, Directory, 'v', t)
    w_ds = utils.concatenate_alt(0, Directory, 'w', t)
    theta_ds = utils.concatenate_alt(0, Directory, 'theta', t)
    assert u_ds.shape == v_ds.shape == w_ds.shape == theta_ds.shape, 'u,v,w,theta have different shape'

    u_coarse = utils.coarse_array(u_ds, L)
    v_coarse = utils.coarse_array(v_ds, L)
    w_coarse = utils.coarse_array(w_ds, L)
    theta_coarse = utils.coarse_array(theta_ds, L)

    # Output column: the coarse-grained product w*theta.
    wtheta_ds = w_ds*theta_ds
    output_ds = utils.variable_samples(utils.coarse_array(wtheta_ds, L))

    # Sum over u, v, w of coarse(x*x) - coarse(x)**2 (no 1/2 factor is applied).
    # The terms are kept in the original left-to-right order.
    tke_ds = (
        utils.coarse_array(u_ds*u_ds, L) - u_coarse*u_coarse
        + utils.coarse_array(v_ds*v_ds, L) - v_coarse*v_coarse
        + utils.coarse_array(w_ds*w_ds, L) - w_coarse*w_coarse
    )
    tke_in = utils.variable_samples(tke_ds)

    datasets = [u_coarse, v_coarse, w_coarse, theta_coarse]  # add 's'
    input_ds = utils.input_dataset(datasets)

    # Column order matches `variables`: inputs, then tke, then wtheta.
    tot_ds = np.concatenate((input_ds, tke_in, output_ds), axis=1)

    print(variables)

    # NOTE(review): the meaning of the trailing 4 is defined in modules.utils
    # and is not visible from this notebook.
    utils.write_nc_file(tot_ds,L,variables,t,4)

['u', 'v', 'w', 'theta', 'tke', 'wtheta']
writing out


### With synchronous parallel computing

In [4]:
# Same pipeline as the serial cell, but every load / coarse-graining call is
# dispatched through Pool.apply. apply() blocks until its result is ready, so
# the calls still run one after another.
Directory = "data"  #"/glade/scratch/sshamekh/LES_512_ug16wtspt01_data"
L = 32
# Fields that are actually loaded from disk. This list must not be modified
# inside the loop, because it drives the loading step of every time step.
input_variables = ['u', 'v', 'w', 'theta']  # add 's'
# Names of the columns handed to utils.write_nc_file: inputs + derived columns.
variables = input_variables + ['tke', 'wtheta']

# re.split with a capturing group returns [text, digits, text, digits, ...],
# so index 3 is the second run of digits in the file name.
# NOTE(review): presumably that number is the time stamp — verify against the
# file naming scheme.
files = [name for name in os.listdir(Directory) if "uxy" in name]
times = sorted({int(re.split(r'(\d+)', name)[3]) for name in files})

# One pool for the whole run; the context manager releases the workers even
# when the shape assertion below fails.
with mp.Pool(mp.cpu_count()) as pool:
    for t in times:
        raw_ds = [pool.apply(utils.concatenate_alt, args=(0,Directory,input_variables[i],t)) for i in trange(len(input_variables))]

        assert all(ds.shape == raw_ds[0].shape for ds in raw_ds), 'u,v,w,theta have different shape'

        coarse_ds = [pool.apply(utils.coarse_array, args=(raw_ds[i], L)) for i in trange(len(raw_ds))]

        # Name the fields instead of indexing by position; the order follows
        # `input_variables`.
        u_raw, v_raw, w_raw, theta_raw = raw_ds
        u_coarse, v_coarse, w_coarse, theta_coarse = coarse_ds

        # Output column: the coarse-grained product w*theta.
        wtheta_ds = w_raw*theta_raw
        output_ds = utils.variable_samples(utils.coarse_array(wtheta_ds, L))

        # Sum over u, v, w of coarse(x*x) - coarse(x)**2 (no 1/2 factor is
        # applied). The terms are kept in the original left-to-right order.
        tke_ds = (
            utils.coarse_array(u_raw*u_raw, L) - u_coarse*u_coarse
            + utils.coarse_array(v_raw*v_raw, L) - v_coarse*v_coarse
            + utils.coarse_array(w_raw*w_raw, L) - w_coarse*w_coarse
        )
        tke_in = utils.variable_samples(tke_ds)

        # Built from the arrays computed in THIS iteration, not from globals
        # left behind by another cell.
        datasets = [u_coarse, v_coarse, w_coarse, theta_coarse]  # add 's'
        input_ds = utils.input_dataset(datasets)

        # Column order matches `variables`: inputs, then tke, then wtheta.
        tot_ds = np.concatenate((input_ds, tke_in, output_ds), axis=1)

        print(variables)

        # NOTE(review): the meaning of the trailing 4 is defined in
        # modules.utils and is not visible from this notebook.
        utils.write_nc_file(tot_ds,L,variables,t,4)

100%|██████████| 4/4 [00:00<00:00,  8.93it/s]
100%|██████████| 4/4 [00:00<00:00, 27.05it/s]


['u', 'v', 'w', 'theta', 'tke', 'wtheta']
writing out


### With asynchronous parallel computing

In [10]:
# Same pipeline as the serial cell, but the load / coarse-graining calls are
# submitted with Pool.apply_async and collected afterwards with .get().
Directory = "data"  #"/glade/scratch/sshamekh/LES_512_ug16wtspt01_data"
L = 32
# Fields that are actually loaded from disk. This list must not be modified
# inside the loop, because it drives the loading step of every time step.
input_variables = ['u', 'v', 'w', 'theta']  # add 's'
# Names of the columns handed to utils.write_nc_file: inputs + derived columns.
variables = input_variables + ['tke', 'wtheta']

# re.split with a capturing group returns [text, digits, text, digits, ...],
# so index 3 is the second run of digits in the file name.
# NOTE(review): presumably that number is the time stamp — verify against the
# file naming scheme.
files = [name for name in os.listdir(Directory) if "uxy" in name]
times = sorted({int(re.split(r'(\d+)', name)[3]) for name in files})

# One pool for the whole run; the context manager releases the workers even
# when the shape assertion below fails. Every result is fetched with .get()
# before the block is left, so no pending task is cut short.
with mp.Pool(mp.cpu_count()) as pool:
    for t in times:
        # The progress bar here only measures task submission, not the work.
        result_objects_raw = [pool.apply_async(utils.concatenate_alt, args=(i,Directory,input_variables[i],t,False)) for i in trange(len(input_variables))]
        # Element [1] of each result is used as the data array.
        # NOTE(review): presumably the helper returns (index, data) when called
        # with the trailing False — confirm in modules.utils.
        raw_ds = [r.get()[1] for r in result_objects_raw]

        assert all(ds.shape == raw_ds[0].shape for ds in raw_ds), 'u,v,w,theta have different shape'

        result_objects_coarse = [pool.apply_async(utils.coarse_array2, args=(i,raw_ds[i], L)) for i in trange(len(raw_ds))]
        coarse_ds = [r.get()[1] for r in result_objects_coarse]

        # Name the fields instead of indexing by position; the order follows
        # `input_variables` because results are collected in submission order.
        u_raw, v_raw, w_raw, theta_raw = raw_ds
        u_coarse, v_coarse, w_coarse, theta_coarse = coarse_ds

        # Output column: the coarse-grained product w*theta.
        wtheta_ds = w_raw*theta_raw
        output_ds = utils.variable_samples(utils.coarse_array(wtheta_ds, L))

        # Sum over u, v, w of coarse(x*x) - coarse(x)**2 (no 1/2 factor is
        # applied). The terms are kept in the original left-to-right order.
        tke_ds = (
            utils.coarse_array(u_raw*u_raw, L) - u_coarse*u_coarse
            + utils.coarse_array(v_raw*v_raw, L) - v_coarse*v_coarse
            + utils.coarse_array(w_raw*w_raw, L) - w_coarse*w_coarse
        )
        tke_in = utils.variable_samples(tke_ds)

        # Built from the arrays computed in THIS iteration, not from globals
        # left behind by another cell.
        datasets = [u_coarse, v_coarse, w_coarse, theta_coarse]  # add 's'
        input_ds = utils.input_dataset(datasets)

        # Column order matches `variables`: inputs, then tke, then wtheta.
        tot_ds = np.concatenate((input_ds, tke_in, output_ds), axis=1)

        print(variables)

        # NOTE(review): L is passed as a string here; the meaning of the
        # trailing 4 is defined in modules.utils and is not visible from this
        # notebook.
        utils.write_nc_file(tot_ds,str(L),variables,t,4)

100%|██████████| 4/4 [00:00<00:00, 2235.77it/s]
100%|██████████| 4/4 [00:00<00:00, 313.99it/s]


['u', 'v', 'w', 'theta', 'tke', 'wtheta']
writing out


## Check results

In [4]:
# Reshape the flat values of `df_init` into one row per sample, with
# nz * len(variables) columns per row.
# NOTE(review): `df_init` is not defined in any cell visible here, and
# `variables` comes from a pipeline cell above — this cell depends on prior
# kernel state; confirm where `df_init` is loaded.
nz = 4  # NOTE(review): presumably the same 4 passed to utils.write_nc_file — confirm
len_samples = len(variables) * nz
flat_values = df_init.to_numpy()
n_samples = len(flat_values) // len_samples
tot_ds = flat_values.reshape(n_samples, len_samples)
tot_ds.shape

(256, 24)

In [5]:
# Coarse-grain a single variable and write the result with
# utils.write_coarse_file.
Directory = "data"
L = 32

var = 'u'

# NOTE(review): presumably loads `var` for every available time and
# concatenates along time — confirm in modules.utils.
ds = utils.concatenate_time(Directory, var)

coarse_ds = utils.coarse_array(ds, L)

# L is passed as a string here; the meaning of the trailing 4 is defined in
# modules.utils and is not visible from this notebook.
utils.write_coarse_file(coarse_ds,str(L),var,4)

writing out
