In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import dask
import pandas as pd
import xarray as xr
import dask.array as da
import numpy as np
from re import split
from scipy.interpolate import griddata
from scipy.spatial import cKDTree as KDTree
import random
from skimage.filters import gaussian, threshold_otsu
from skimage import measure
from dask import delayed
from dask_image.ndfilters import uniform_filter as uf
from dask_image.ndmeasure import variance as varian
import dask.dataframe as dd
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress
from datetime import datetime, timezone

# Put the parent of the current working directory on the import path so that the
# project's `src` package can be imported from this notebook.
sys.path.insert(1, f"{os.path.abspath(os.path.join(os.path.abspath(''), '../'))}")
from src.utils import get_pars_from_ini
# from src.ufucnt_xr import lee_filter_new_

# First token of the machine's host name (split on ", ", "_", "-" or "!"); it is the
# key used to look up machine-specific paths. The pipe is closed explicitly instead of
# being left to the garbage collector.
with os.popen('hostname') as _hostname_pipe:
    location = split(', |_|-|!', _hostname_pipe.read())[0].replace("\n", "")

# Read the ini section for this machine once and take both paths from it.
_loc_pars = get_pars_from_ini(campaign='loc')[location]
path_data = _loc_pars['path_data']
path_proj = _loc_pars['path_proj']



In [2]:
# @jit
def get_col_row(x, size=30):
    """
    Number of whole cells of width `size` that fit in the value range of `x`.

    x: array whose peak-to-peak range (max - min) is taken with dask.array.ptp
    size: cell width, in the same units as x
    returns: the range divided by `size`, truncated to a Python int
    """
    return int(da.ptp(x) / size)


def excluding_mesh(x, y, nx=30, ny=30):
    """
    Construct a grid of points that are some distance away from the points (x, y).

    A regular mesh of (nx + 2) x (ny + 2) nodes is laid over the bounding box of the
    input points, padded by two cell widths on every side. Only the nodes whose
    nearest input point is farther away than one cell diagonal are returned.

    x, y: 1-D coordinate arrays of equal length
    nx, ny: number of cells the x / y extent is divided into to obtain the cell size
    returns: (xp, yp), 1-D arrays with the coordinates of the retained mesh nodes
    """

    # np.ptp() instead of the ndarray.ptp() method, which NumPy 2.0 removed.
    dx = np.ptp(x) / nx
    dy = np.ptp(y) / ny

    # A complex step in mgrid means "number of points", end points included.
    xp, yp = np.mgrid[x.min() - 2 * dx:x.max() + 2 * dx:(nx + 2) * 1j,
             y.min() - 2 * dy:y.max() + 2 * dy:(ny + 2) * 1j]
    xp = xp.ravel()
    yp = yp.ravel()

    # Distance from every mesh node to its nearest input point.
    tree = KDTree(np.c_[x, y])
    dist, j = tree.query(np.c_[xp, yp], k=1)

    # Select points sufficiently far away
    m = (dist > np.hypot(dx, dy))
    return xp[m], yp[m]


def regridd(data, x, y, size=30):
    """
    Interpolate scattered samples onto a regular grid with linear `griddata`.

    data: values to interpolate. An array with more than two dimensions is treated as
          a stack along its last axis and gridded one slice at a time; otherwise the
          array is flattened and gridded as a single field (that branch calls
          data.compute(), so a dask array is expected there).
    x, y: coordinates of every sample, reshaped and sorted together with data
    size: desired pixel size, in the units of x and y (meters per the original note)
    returns: (z, xi, yi) - gridded values and the x / y coordinates of the grid nodes.
             Away from the data the stacked branch yields 0; the single-field branch
             yields NaN near the padding points and -9999 outside the convex hull.
    """
    if data.ndim > 2:
        # Collapse the leading axes and move the last axis first: one row per slice.
        x = da.moveaxis(x.reshape(-1, x.shape[-1]), 0, -1)
        y = da.moveaxis(y.reshape(-1, y.shape[-1]), 0, -1)
        # One grid shape for all slices: the largest extent / size found in any slice.
        # `size` is forwarded so the requested pixel size is honoured in this branch too
        # (it used to fall back silently to get_col_row's default).
        ncols_n = max(np.apply_along_axis(get_col_row, arr=x, axis=1, size=size))
        nrows_n = max(np.apply_along_axis(get_col_row, arr=y, axis=1, size=size))
        # Per-slice target axes: x ascending, y descending.
        x_new_n = da.from_array(np.moveaxis(np.linspace(np.amin(x, -1), np.amax(x, -1), ncols_n), 0, -1))
        y_new_n = da.from_array(np.moveaxis(np.linspace(np.amax(y, -1), np.amin(y, -1), nrows_n), 0, -1))
        mesh = [delayed(da.meshgrid)(x_new_n[i], y_new_n[i]) for i in range(x_new_n.shape[0])]

        # Sort the samples of every slice by x, keeping x, y and values aligned.
        z_n = da.rollaxis(data.reshape(-1, data.shape[-1]), 1)
        idx_n = x.argsort(axis=-1)
        x = np.take_along_axis(x, idx_n, axis=-1)
        y = np.take_along_axis(y, idx_n, axis=-1)
        z_n = np.take_along_axis(z_n, idx_n, axis=-1)

        # Padding points far from the data, given the value 0, so that the linear
        # interpolation does not bridge across regions without samples.
        vp_n = dask.compute(*[delayed(excluding_mesh)(x[i], y[i]) for i in range(x.shape[0])])
        xn = [vp_n[i][0] for i in range(len(vp_n))]
        yn = [vp_n[i][1] for i in range(len(vp_n))]
        zn = dask.compute(*[delayed(da.zeros_like)(xn[i]) for i in range(x.shape[0])])
        xi_ = [mesh[i][0] for i in range(len(vp_n))]
        xi_ = dask.compute(*[da.from_delayed(v, shape=(x.shape[0], np.nan), dtype=float) for v in xi_])
        yi_ = [mesh[i][1] for i in range(len(vp_n))]
        yi_ = dask.compute(*[da.from_delayed(v, shape=(x.shape[0], np.nan), dtype=float) for v in yi_])
        zr = [delayed(griddata)((np.r_[x[i, :], xn[i]], np.r_[y[i, :], yn[i]]), np.r_[z_n[i, :], zn[i]],
                                (xi_[i], yi_[i]), method='linear', fill_value=0)
              for i in range(x.shape[0])]
        # Stack the per-slice grids along a new last axis and reorder the coordinate
        # grids so that the slice axis is last there as well.
        zr = da.dstack(dask.compute(*zr))
        xi_ = da.rollaxis(da.rollaxis(da.asarray(xi_), axis=-1), axis=-1)
        yi_ = da.rollaxis(da.rollaxis(da.asarray(yi_), axis=-1), axis=-1)
        return zr, xi_, yi_

    else:
        x_s = x.flatten()
        y_s = y.flatten()
        data = data.compute().flatten()
        # Sort all samples by x, keeping x, y and values aligned.
        idx = x_s.argsort()
        x_s, y_s = np.take_along_axis(x_s, idx, axis=0), np.take_along_axis(y_s, idx, axis=0)
        data = np.take_along_axis(data, idx, axis=0)
        ncols = get_col_row(x=x_s, size=size)
        nrows = get_col_row(x=y_s, size=size)
        # Target axes: x ascending, y descending.
        x_new = np.linspace(x_s.min(), x_s.max(), int(ncols))
        y_new = np.linspace(y_s.max(), y_s.min(), int(nrows))
        xi, yi = np.meshgrid(x_new, y_new)
        # Padding points far from the data carry NaN in this branch.
        xp, yp = excluding_mesh(x_s, y_s, nx=35, ny=35)
        zp = np.nan + np.zeros_like(xp)
        z0 = griddata((np.r_[x_s, xp], np.r_[y_s, yp]), np.r_[data, zp], (xi, yi), method='linear', fill_value=-9999)
        return z0, xi, yi


def lee_filter_new(img, size, tresh=-150):
    """
    Lee filter on a size x size moving window (window depth 1 along a third axis, if any).

    img: 2-D or 3-D array; NaN and -9999 samples are replaced by `tresh` before filtering
    size: window edge length handed to the uniform filter
    tresh: substitute value for invalid samples
    returns: dask array of filtered values, with everything not greater than 0 set to 0
    """
    window = (size, size) if img.ndim == 2 else (size, size, 1)

    # Replace missing samples so that they do not propagate through the moving averages.
    invalid = da.logical_or(da.isnan(img), da.equal(img, -9999))
    img = da.where(invalid, tresh, img)

    # Local statistics: var = E[x^2] - E[x]^2 inside the window.
    local_mean = uf(img, window)
    local_var = uf(da.power(img, 2), window) - da.power(local_mean, 2)
    global_var = varian(img)

    # Weight approaches 1 where the local variance dominates the overall variance,
    # and 0 where the window is comparatively flat.
    weights = local_var / (local_var + global_var)
    filtered = local_mean + weights * (img - local_mean)
    return da.where(filtered > 0, filtered, 0)


def process_new(zhh14, x, y, time):
    """
    Filter, regrid and segment a field, then measure the labelled regions.

    Steps: Lee-filter zhh14, regrid it with regridd, smooth it with a Gaussian,
    threshold it with Otsu's method, label the connected regions and collect their
    skimage regionprops.

    zhh14: field to analyse
    x: coordinate array; only index 0 of its second axis is used (x[:, 0, :, :])
    y: coordinate array, passed to regridd unchanged
    time: values converted with pd.to_datetime and used as the DataFrame index
    returns: tuple of eight pandas Series (area, perimeter, axmax, axmin, bbox, num_px,
             tot_px, max_zhh)

    Side effect: every call writes the table to ../results/all_filtered_<YYYYmmddHHMM>.csv
    (UTC time of the call), so calls within the same minute write to the same file name.
    """
    x = x[:, 0, :, :]
    img_filtered = lee_filter_new(zhh14, size=3, tresh=-200)
    img, xi, yi = regridd(img_filtered, x, y)
    # Regrid an all-ones field the same way; cells interpolated to (almost) 1 are
    # presumably the ones covered by samples - TODO confirm.
    px_tot, _, _ = regridd(np.ones_like(zhh14), x, y)
    px_tot = np.where(px_tot < 0.99, 0, px_tot)
    # Non-zero counts along axis 1 of the covered cells and of the filtered image.
    px_tot = np.apply_along_axis(np.count_nonzero, arr=px_tot, axis=1)
    num_px = np.apply_along_axis(np.count_nonzero, arr=img, axis=1)
    img = np.where(img > 0., img, 0.)
    # Smooth, threshold with a single Otsu level for the whole array, label regions.
    blurred = gaussian(img, sigma=0.8)
    binary = blurred > threshold_otsu(blurred)
    labels = measure.label(binary)
    if labels.ndim > 2:
        # Maximum over the first two axes: one value per index of the last axis.
        max_zhh14 = np.apply_along_axis(np.max, arr=img, axis=0).compute()
        max_zhh14 = np.apply_along_axis(np.max, arr=max_zhh14, axis=0).compute()
        # Region properties are measured separately on every slice of the last axis.
        props = [measure.regionprops(labels[:, :, i]) for i in range(labels.shape[-1])]
        # One row per slice; each cell holds a list with one entry per region.
        _props_all = [[[j.area for j in prop], [j.perimeter for j in prop], [j.major_axis_length for j in prop],
                       [j.minor_axis_length for j in prop], [j.bbox for j in prop], num_px[:, i].compute().tolist(),
                       px_tot[:, i].compute().tolist(), np.round(max_zhh14[i], 2).tolist()]
                      for i, prop in enumerate(props)]

        df = pd.DataFrame(data=_props_all, columns=['area', 'perimeter', 'axmax', 'axmin', 'bbox', 'num_px', 'tot_px',
                                                    'max_zhh'], index=pd.to_datetime(time))
    else:
        max_zhh = np.max(img)
        props = measure.regionprops(labels)
        # NOTE(review): this builds one row per labelled region while the index comes
        # from `time`; the two lengths only agree by coincidence - confirm the intent.
        _props_all = [[[prop.area], [prop.perimeter], [prop.major_axis_length], [prop.minor_axis_length],
                       [prop.bbox], [num_px], [px_tot], [max_zhh]] for prop in props]
        df = pd.DataFrame(data=_props_all, columns=['area', 'perimeter', 'axmax', 'axmin', 'bbox', 'num_px', 'tot_px',
                                                    'max_zhh'], index=pd.to_datetime(time))
    # File name carries the UTC time of the call at minute resolution.
    dates = datetime.now(timezone.utc)
    df.to_csv(f'../results/all_filtered_{dates:%Y}{dates:%m}{dates:%d}{dates:%H}{dates:%M}.csv')
    return df.area, df.perimeter, df.axmax, df.axmin, df.bbox, df.num_px, df.tot_px, df.max_zhh


def ufunc_wrapper(data):
    """
    Run process_new over `data` through xr.apply_ufunc and collect its eight outputs
    in a single Dataset.

    data: xarray Dataset providing range, DR, azimuth, alt3d, zhh14 and time
    returns: Dataset with variables area, perimeter, ax_max, ax_min, bbox, num_px, tot
             and max_zhh
    """
    # Horizontal coordinate from range, DR and the azimuth given in degrees.
    x = data.range * data.DR * np.sin(np.deg2rad(data.azimuth))  # add roll
    y = data.alt3d
    # Keep zhh14 only where alt3d exceeds 500; everything else is masked.
    zhh = data.zhh14.where(data.alt3d > 500)

    inputs = [zhh, x, y, data.time]
    n_out = 8
    # Every input is passed whole (all of its dims are core dims); every output is
    # declared along "time" with object dtype.
    results = xr.apply_ufunc(
        process_new,
        *inputs,
        input_core_dims=[list(arr.dims) for arr in inputs],
        output_core_dims=[["time"] for _ in range(n_out)],
        dask_gufunc_kwargs={'allow_rechunk': True, 'output_sizes': {}},
        dask='parallelized',
        vectorize=True,
        output_dtypes=[object] * n_out,
    )

    names = ('area', 'perimeter', 'ax_max', 'ax_min', 'bbox', 'num_px', 'tot', 'max_zhh')
    ds_out = results[0].to_dataset(name=names[0])
    for name, arr in zip(names[1:], results[1:]):
        ds_out[name] = arr
    return ds_out


In [3]:
# client.close()
# cluster.close()

In [4]:
# Describe the SLURM job behind each dask worker: queue "seseml", 40 cores and 200GB
# in a single worker process, 48 h walltime.
# NOTE(review): the scheduler address/port and the dashboard port are hard-coded for
# one specific machine; move them to the ini file if this has to run elsewhere.
cluster = SLURMCluster(queue="seseml",
                       memory='200GB',
                       cores=40,
                       processes=1,
                       walltime='48:00:00',
                       scheduler_options={'host': '172.22.179.3:7222', 'dashboard_address': ':7778'})

In [5]:
# Fixed-size alternative: cluster.scale(2)
# Let the number of worker jobs follow the load, between 0 and 4.
# NOTE(review): the log of the compute cell shows keys lost with `workers: []` and a
# traceback from the adaptive scale_down - possibly a consequence of minimum=0; confirm.
cluster.adapt(minimum=0, maximum=4)
cluster

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

In [6]:
%%bash
# List the SLURM jobs of whoever runs the notebook (no hard-coded account name).
squeue -u "$(whoami)"

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)


In [7]:
# Connect a distributed client to the cluster's scheduler; the repr below shows the
# dashboard address and the current worker count.
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.22.179.3:7778/status,

0,1
Dashboard: http://172.22.179.3:7778/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://172.22.179.3:7222,Workers: 0
Dashboard: http://172.22.179.3:7778/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [16]:
%%time
# Open the rechunked Zarr store (the store under zarr/ is kept below as an alternative).
ds_xr = xr.open_zarr(f'{path_data}/zarr_rckd/KUsKAs_Wn/lores.zarr')
# ds_xr = xr.open_zarr(f'{path_data}/zarr/KUsKAs_Wn/lores.zarr')
# Keep only the first occurrence of every timestamp: duplicated() flags the repeats.
ds_xr = ds_xr.sel(time=~ds_xr.get_index("time").duplicated())



CPU times: user 853 ms, sys: 105 ms, total: 958 ms
Wall time: 958 ms


In [17]:
# Timestamps to process: the `time` column of an existing results table, parsed to
# datetimes and kept as a plain list.
ds_prop = pd.read_csv(f'{path_proj}/results/filtrado_sobre_fechas.csv', usecols=['time'])
ds_prop = ds_prop.assign(dates=pd.to_datetime(ds_prop['time']))
times = ds_prop['dates'].tolist()
print(len(times))

11366


In [18]:
%%time
# Keep three variables and only the timestamps listed in `times`.
ds_data = ds_xr[['zhh14', 'azimuth', 'DR']].sel(time=times)
# Alternative: a short time window.
# ds_data = ds_xr[['zhh14', 'azimuth', 'DR']].sel(time=slice('2019-09-16 03:12:50', '2019-09-16 03:13:05'))

# Alternative: the whole record.
# ds_data = ds_xr[['zhh14', 'azimuth', 'DR']]

len(ds_data.time)

CPU times: user 69.7 ms, sys: 4.65 ms, total: 74.3 ms
Wall time: 63.8 ms


11366

In [19]:
# Build the Dataset of per-time region properties (evaluated by dask.compute below).
a = ufunc_wrapper(ds_data)

In [20]:
%%time
# Evaluate on the cluster. dask.compute returns a tuple, so the Dataset is w[0].
w = dask.compute(a)

distributed.scheduler - ERROR - Couldn't gather keys {"('zeros_like-6bf3a416fc538c3de8802297389ee886', 0)": []} state: ['processing'] workers: []
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: [], ('zeros_like-6bf3a416fc538c3de8802297389ee886', 0)
NoneType: None
distributed.scheduler - ERROR - Couldn't gather keys {"('zeros_like-6bf3a416fc538c3de8802297389ee886', 0)": []} state: [None] workers: []
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: [], ('zeros_like-6bf3a416fc538c3de8802297389ee886', 0)
NoneType: None
distributed.scheduler - ERROR - Couldn't gather keys {"('zeros_like-8e27c5af8dbd847a0a94e941e0bfa1b7', 0)": []} state: ['processing'] workers: []
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: [], ('zeros_like-8e27c5af8dbd847a0a94e941e0bfa1b7', 0)
NoneType: None
distributed.scheduler - ERROR - Couldn't gather keys {"('zeros_like-7aef8b3c14b05600c999c58bd3d3d845', 0)": []} state: [

distributed.utils - ERROR - Set of coroutines/Futures is empty.
Traceback (most recent call last):
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/site-packages/distributed/utils.py", line 645, in log_errors
    yield
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/site-packages/distributed/deploy/adaptive.py", line 199, in scale_down
    await f
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/site-packages/distributed/deploy/spec.py", line 555, in scale_down
    await self
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/site-packages/distributed/deploy/spec.py", line 402, in _
    await self._correct_state()
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/site-packages/distributed/deploy/spec.py", line 346, in _correct_state_internal
    await asyncio.wait(tasks)
  File "/data/keeling/a/alfonso8/miniconda3/envs/camp2ex_proj/lib/python3.9/asyncio/t

CPU times: user 1h 22min 42s, sys: 13min 6s, total: 1h 35min 48s
Wall time: 4h 51min 25s


In [21]:
# Flatten the computed Dataset into a pandas DataFrame.
df = w[0].to_dataframe()

In [22]:
# NOTE(review): an earlier cell reads its list of timestamps from a file with this same
# name under {path_proj}/results; if both paths point to the same folder this write
# replaces that input - confirm it is intended.
df.to_csv('../results/filtrado_sobre_fechas.csv')

In [None]:
# Column names of the results table.
df.columns

In [None]:
# Drop every row with at least one missing value. Reassigning instead of using
# inplace=True keeps the statement a plain expression that is safe to chain or re-run.
df = df.dropna(how='any')

In [None]:
# Compare package versions between client, scheduler and workers; check=True turns a
# mismatch into an error.
client.get_versions(check=True)

In [None]:
# Summary statistics of the results table.
df.describe()

In [None]:
# Display the computed Dataset.
w[0]

bounding box, max ref, mean (Ku, Ka), max ref (surf)