## Notebook for testing code

In [5]:
%matplotlib inline

import datacube
import numpy as np
import sys
import xarray as xr
from datacube.utils.dask import start_local_dask


import dask
from odc.algo._dask import reshape_yxbt
import dask.array as da
from odc.algo import randomize, reshape_for_geomedian
from datacube.utils.geometry import assign_crs

from odc.algo import yxbt_sink
import hdstats

sys.path.append('../Scripts')
from deafrica_datahandling import load_ard
from deafrica_dask import create_local_dask_cluster
from deafrica_plotting import rgb
from datacube.testutils.io import rio_slurp_xarray
from odc.algo._dask import reshape_yxbt
import psutil

In [6]:
# client = create_local_dask_cluster()
ncpu = psutil.cpu_count() - 2
client = start_local_dask(nanny=False,
                          n_workers=1, threads_per_worker=ncpu, mem_safety_margin='7G', # Generic
                          processes=False)
client

0,1
Client  Scheduler: inproc://10.95.98.254/1848/1  Dashboard: /user/chad/proxy/8787/status,Cluster  Workers: 1  Cores: 62  Memory: 508.40 GB


In [7]:
dc = datacube.Datacube(app='stuff')

In [8]:
# Define area of interest
lat = -3.0475
lon = 37.322
lon_buffer = 0.4
lat_buffer = 0.4

# Combine central lat,lon with buffer to get area of interest
lat_range = (lat-lat_buffer, lat+lat_buffer)
lon_range = (lon-lon_buffer, lon+lon_buffer)

# Set the range of dates for the analysis
years_range = ('2019-01', '2019-12')

In [9]:
# Create a reusable query
query = {
    'y': lat_range,
    'x': lon_range,
    'time': years_range,
    'measurements': ['red','blue','green','nir','swir_1',
                     'swir_2','red_edge_1','red_edge_2','red_edge_3'],
    'resolution': (-10,10),
    'output_crs': 'epsg:6933',
    'group_by': 'solar_day'
}

# Load available data from Landsat 8
ds = load_ard(dc=dc,
              products=['s2_l2a'],
              **query,
#               dtype='native',
              dask_chunks={},
              scaling='normalised'
              )
ds

Using pixel quality parameters for Sentinel 2
Finding datasets
    s2_l2a
Applying pixel quality/cloud mask
Re-scaling Sentinel-2 data
Returning 72 time steps as a dask array


Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.66 GB 314.73 MB Shape (72, 10192, 7720) (1, 10192, 7720) Count 869 Tasks 72 Chunks Type float32 numpy.ndarray",7720  10192  72,

Unnamed: 0,Array,Chunk
Bytes,22.66 GB,314.73 MB
Shape,"(72, 10192, 7720)","(1, 10192, 7720)"
Count,869 Tasks,72 Chunks
Type,float32,numpy.ndarray


In [None]:
def xr_geomedian_tmad(ds, axis='time', where=None, **kw):
    """
    :param ds: xr.Dataset|xr.DataArray|numpy array
    Other parameters:
    **kwargs -- passed on to pcm.gnmpcm
       maxiters   : int         1000
       eps        : float       0.0001
       num_threads: int| None   None
    """

    import hdstats
    def gm_tmad(arr, **kw):
        """
        arr: a high dimensional numpy array where the last dimension will be reduced. 
    
        returns: a numpy array with one less dimension than input.
        """
        gm = hdstats.nangeomedian_pcm(arr, **kw)
        return gm
        nt = kw.pop('num_threads', None)
        emad = hdstats.emad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        smad = hdstats.smad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        bcmad = hdstats.bcmad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        return np.concatenate([gm, emad, smad, bcmad], axis=-1)


    def norm_input(ds, axis):
        if isinstance(ds, xr.Dataset):
            xx = reshape_yxbt(ds, yx_chunks=500)
            return ds, xx, xx.data
        else:  # assume numpy or similar
            xx_data = ds
            if xx_data.ndim != 4:
                raise ValueError("Expect 4 dimensions on input: y,x,band,time")
            return None, None, xx_data

    kw.setdefault('nocheck', False)
    kw.setdefault('num_threads', 1)
    kw.setdefault('eps', 1e-6)

    ds, xx, xx_data = norm_input(ds, axis)
    is_dask = dask.is_dask_collection(xx_data)

    if is_dask:
        if xx_data.shape[-2:] != xx_data.chunksize[-2:]:
            xx_data = xx_data.rechunk(xx_data.chunksize[:2] + (-1, -1))

        data = da.map_blocks(lambda x: gm_tmad(x, **kw),
                             xx_data,
                             name=randomize('geomedian'),
                             dtype=xx_data.dtype, 
                             chunks=xx_data.chunks[:-2] + (xx_data.chunks[-2][0]+3,),
                             drop_axis=3)
    else:
        data = gm_tmad(xx_data, **kw)

    dims = xx.dims[:-1]
    cc = {k: xx.coords[k] for k in dims}
    cc[dims[-1]] = np.hstack([xx.coords[dims[-1]].values,['edev', 'sdev', 'bcdev']])
    xx_out = xr.DataArray(data, dims=dims, coords=cc)

    if ds is None:
        xx_out.attrs.update(xx.attrs)
        return xx_out

    ds_out = xx_out.to_dataset(dim='band')
    for b in ds.data_vars.keys():
        src, dst = ds[b], ds_out[b]
        dst.attrs.update(src.attrs)

    return assign_crs(ds_out, crs=ds.geobox.crs)

In [None]:
old=xr_geomedian_tmad(ds) #{'x':1500, 'y':1500, 'time':1}

In [None]:
%%time
old=old.compute()

In [None]:
old
# old gm+tmads: 7min 51s
# old gm-only: 7min 20s

In [None]:
def xr_geomedian_tmad_new(ds, axis='time', **kw):
    """
    :param ds: xr.Dataset|xr.DataArray|numpy array
    Other parameters:
    **kwargs -- passed on to pcm.gnmpcm
       maxiters   : int         1000
       eps        : float       0.0001
       num_threads: int| None   None
    """

    import hdstats
    def gm_tmad(arr, **kw):
        """
        arr: a high dimensional numpy array where the last dimension will be reduced. 
    
        returns: a numpy array with one less dimension than input.
        """
        gm = hdstats.nangeomedian_pcm(arr, **kw)
        return gm
        nt = kw.pop('num_threads', None)
        emad = hdstats.emad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        smad = hdstats.smad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        bcmad = hdstats.bcmad_pcm(arr, gm, num_threads=nt)[:,:, np.newaxis]
        return np.concatenate([gm, emad, smad, bcmad], axis=-1)


    def norm_input(ds):
        if isinstance(ds, xr.Dataset):
            xx = reshape_yxbt(ds, yx_chunks=500)
            return ds, xx, xx.data

    kw.setdefault('nocheck', False)
    kw.setdefault('num_threads', 1)
    kw.setdefault('eps', 1e-6)

    ds, xx, xx_data = norm_input(ds)
    is_dask = dask.is_dask_collection(xx_data)

    if is_dask:
        data = da.map_blocks(lambda x: gm_tmad(x, **kw),
                             xx_data,
                             name=randomize('geomedian'),
                             dtype=xx_data.dtype, 
                             chunks=xx_data.chunks[:-2] + (xx_data.chunks[-2][0]+3,),
                             drop_axis=3)
    
    dims = xx.dims[:-1]
    cc = {k: xx.coords[k] for k in dims}
    cc[dims[-1]] = np.hstack([xx.coords[dims[-1]].values,['edev', 'sdev', 'bcdev']])
    xx_out = xr.DataArray(data, dims=dims, coords=cc)

    if ds is None:
        xx_out.attrs.update(xx.attrs)
        return xx_out

    ds_out = xx_out.to_dataset(dim='band')
    for b in ds.data_vars.keys():
        src, dst = ds[b], ds_out[b]
        dst.attrs.update(src.attrs)

    return assign_crs(ds_out, crs=ds.geobox.crs)


In [None]:
new=xr_geomedian_tmad_new(ds.chunk({'x':11000, 'y':11000, 'time':1}))

In [None]:
%%time
new=new.compute()

In [None]:
new
# new gm+tmads: 6min 35s
# new gm-only: 7min 57s

In [None]:
# chunk=750, no persist 4min 30s
# chunk=750, persist 4min 21s
# chunk=500, persist 5min 48s

# no_dask = 3min 50s
# no_dask_sink = 2min 55s

# no-dask

In [None]:
dask.config.config['distributed']['worker']['memory']

In [10]:
def xr_geomedian_tmad_fast(ds, ncpu, client):
    """
    Fastest version of geomedian+TMADS.
    Dask is used to arrange data into yxbt order,
    openmp runs runs gm+tmads in parallel, but all
    data is stored in memory
    """

    
    def gm_tmad(arr, ncpu, **kw):
        """
        arr: a high dimensional numpy array where the last dimension will be reduced. 
    
        returns: a numpy array with one less dimension than input.
        """
        print('gm')
        gm = hdstats.nangeomedian_pcm(arr, maxiters=1000,
                                   eps=1e-4,
                                   nocheck=True, 
                                   nodata=0,
                                   num_threads=ncpu)
        print('tmads')
        emad = hdstats.emad_pcm(arr, gm, num_threads=ncpu)[:,:, np.newaxis]
        smad = hdstats.smad_pcm(arr, gm, num_threads=ncpu)[:,:, np.newaxis]
        bcmad = hdstats.bcmad_pcm(arr, gm, num_threads=ncpu)[:,:, np.newaxis]
        return np.concatenate([gm, emad, smad, bcmad], axis=-1)
    
    # reorder data into yxbt order using dask
    bands = list(dv.data for dv in ds.data_vars.values())
    xx_data = yxbt_sink(bands, client)
    
    # run the gm and tmads using hdstats 
    data = gm_tmad(xx_data, ncpu)

    # recreate xarray
    dims = ('y', 'x', 'band')
    cc = ds.geobox.xr_coords(with_crs=True)
    cc['band'] = list(ds.data_vars)
    cc[dims[-1]] = np.hstack([cc[dims[-1]], ['edev', 'sdev', 'bcdev']])
    xx_out = xr.DataArray(data, dims=dims, coords=cc)

    ds_out = xx_out.to_dataset(dim='band')
    for b in ds.data_vars.keys():
        src, dst = ds[b], ds_out[b]
        dst.attrs.update(src.attrs)

    return assign_crs(ds_out, crs=ds.geobox.crs)

In [11]:
%%time
nodask=xr_geomedian_tmad_fast(ds, ncpu=ncpu, client=client)

gm
tmads
CPU times: user 2h 21min 55s, sys: 45min 32s, total: 3h 7min 28s
Wall time: 18min 18s


In [None]:
#nodask
# nodask gm+tmads 18mins 14s
# nodask gm-only 5min 43s
# nodask gm+tmads 18mins 14s hdstats1.8post1

In [None]:
nodask['edev'] = -np.log(nodask['edev'])
nodask['sdev'] = -np.log(nodask['sdev'])
nodask['bcdev'] = -np.log(nodask['bcdev'])

In [None]:
nodask

In [None]:
rgb(nodask, size=15)

In [None]:
rgb(nodask, size=15, bands=['edev', 'sdev', 'bcdev'])

## Outlier detection

In [None]:
tif = '../Supplementary_data/Urban_index_comparison/GHS_BUILT_LDSMT_GLOBE_R2018A_3857_30_V2_0_12_10.tif'

In [None]:
X = xr.open_rasterio(tif).squeeze().isel(x=range(10000,20000), y=range(10000,20000)).astype(np.uint8)

In [None]:
X

In [None]:
X.plot(size=12);

# ***

In [None]:
url_slope = "https://deafrica-data.s3.amazonaws.com/ancillary/dem-derivatives/cog_slope_africa.tif"
slope = rio_slurp_xarray(url_slope, gbox=ds.geobox)
print('mean: '+str(slope.mean().values))
print('max: '+str(slope.max().values))
print('min: '+str(slope.min().values))
slope.plot.hist(bins=50)

In [None]:
_50 = slope>50
_100 = slope>100
_25 = slope>25
_30 = slope>30
_35 = slope>35

In [None]:
slope.plot(figsize=(10,10), vmax=50);

In [None]:
_25.plot(figsize=(10,10));

In [None]:
from datacube.utils.cog import write_cog
write_cog(_35.astype(float), 'slope_35.tif', overwrite=True)

In [None]:
_35.plot(figsize=(20,20));

In [None]:
_30.plot(figsize=(20,20));

In [None]:
_50.plot(figsize=(8,8));

In [None]:
slope.plot.hist(figsize=(10,10), bins=50)

In [None]:
def date_of_median(da, year, sample_lat, sample_lon):
    """
    da = xr.DataArray
        Assuming an annual time-series
    year = str
        year of time-series in 'da'
    sample_lat = float
        latitude pixel coordinate
    sample_lon = float
        longitude pixel coordinate
    
    """
    
    #calculate medians for each month
    monthly_medians = da.groupby('time.month').median()
    
    months = [str(i) for i in range(1,13)]
    indexes = [i for i in range(0,12)]
    
    dates=[]
    values=[]
    for month, index in zip(months,indexes): 
        
        #select the month of interest from da
        m = da.sel(time=year+"-"+month)
        
        #find regions with all-NaN slices
        mask = m.isnull().all('time')
        
        #calculate distance each pixel has from median
        distance = m - monthly_medians.isel(month=index)
        
        #index of the absolute minimum distance
        distance = distance.fillna(float(distance.max() + 1))
        distance=xr.ufuncs.fabs(distance)
        idx = distance.idxmin(dim='time', skipna=True).where(~mask)
        value = distance.sel(time=idx, method='nearest')
        values.append(value)
        dates.append(idx)
    
    #join into dataarray along new dimension
    dates = xr.concat(dates, "date of median")
    dist_from_median = xr.concat(values, 'dist_from_monthly_median')
    
    #select pixel
    dates = dates.sel(x=sample_lon, y=sample_lat, method='nearest')
    dist_from_median = dist_from_median.sel(x=sample_lon, y=sample_lat, method='nearest')
    
    return dates, dist_from_median

In [None]:
date_of_median, dist_from_median = date_of_median(ds.blue,
                                                  sample_lon=1929690.,
                                                  sample_lat=-4123870.,
                                                  year='2018')

In [None]:
print(date_of_median)

In [None]:
dist_from_median.plot()