## Jupyter Notebooks

In [10]:
# Convenient jupyter setup
%load_ext autoreload
%autoreload 2
%config IPCompleter.greedy=True

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Libaries

In [11]:
import os
import dask
import dask.array as da
from dask.distributed import Client
import xarray as xr
from src.constants import SAT_DIR
from src.preprocessing.load_landsat_esa import clip, return_xy_dask
from src.preprocessing.esa_compress import compress_esa, decompress_esa

## Undefined client

In [None]:
client = dask.distributed.Client()

In [None]:
client

## Defined client 1GB

In [4]:
client = Client(n_workers=2, threads_per_worker=4, memory_limit="1GB")

In [5]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:44310  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 8  Memory: 2.00 GB


## Defined client 5GB

In [3]:
%time client5GB = Client(n_workers=2, threads_per_worker=4, memory_limit="5GB")

CPU times: user 62.6 ms, sys: 38.4 ms, total: 101 ms
Wall time: 4.13 s


In [5]:
%time client20GB = Client(n_workers=2, threads_per_worker=4, memory_limit="20GB")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38707 instead


CPU times: user 34.7 ms, sys: 23.9 ms, total: 58.5 ms
Wall time: 598 ms


In [3]:
%time client40GB = Client(n_workers=4, threads_per_worker=8, memory_limit="40GB")

CPU times: user 47.8 ms, sys: 55.1 ms, total: 103 ms
Wall time: 720 ms


In [29]:
client5GB

0,1
Client  Scheduler: tcp://127.0.0.1:41356  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 8  Memory: 10.00 GB


## Get data

In [4]:
input_direc = os.path.join(SAT_DIR, "inputs")
x_file_name = os.path.join(input_direc, "take_esa_coords_True_use_mfd_False_use_ffil_True_use_ir_False_x.nc")
y_file_name = os.path.join(input_direc, "take_esa_coords_True_use_mfd_False_use_ffil_False_use_ir_False_y.nc")
x_da = xr.open_dataset(x_file_name, chunks='auto').norm_refl  # {'year': 1, 'band': 1, 'mn': 1}
y_da = xr.open_dataset(y_file_name, chunks='auto').esa_cci
x_da, y_da = clip(x_da, y_da)

'clip'  0.00422 s



## Look at DataArrays

In [7]:
x_da

Unnamed: 0,Array,Chunk
Bytes,851.98 MB,47.33 MB
Shape,"(681, 1086, 3, 24, 4)","(227, 181, 3, 24, 4)"
Count,19 Tasks,18 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 851.98 MB 47.33 MB Shape (681, 1086, 3, 24, 4) (227, 181, 3, 24, 4) Count 19 Tasks 18 Chunks Type float32 numpy.ndarray",1086  681  4  24  3,

Unnamed: 0,Array,Chunk
Bytes,851.98 MB,47.33 MB
Shape,"(681, 1086, 3, 24, 4)","(227, 181, 3, 24, 4)"
Count,19 Tasks,18 Chunks
Type,float32,numpy.ndarray


In [8]:
y_da

Unnamed: 0,Array,Chunk
Bytes,17.75 MB,17.75 MB
Shape,"(24, 681, 1086)","(24, 681, 1086)"
Count,2 Tasks,1 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 17.75 MB 17.75 MB Shape (24, 681, 1086) (24, 681, 1086) Count 2 Tasks 1 Chunks Type uint8 numpy.ndarray",1086  681  24,

Unnamed: 0,Array,Chunk
Bytes,17.75 MB,17.75 MB
Shape,"(24, 681, 1086)","(24, 681, 1086)"
Count,2 Tasks,1 Chunks
Type,uint8,numpy.ndarray


## Return xy dask array

In [12]:
def return_xy_dask(x_da, y_da, year=5):
    """
    return the x and y numpy arrays for a given number of years.
    Currently this function just returns (N, D) for x and (N,) for Y
    for UNET we want a function that returns (yr, y, xr, D) for x and (yr, y, x, D) for y
    :param x_da: xarray.DataArray, inputs
    :param y_da: xarray.DataArray, labels
    :param year: ints, single or list
    :return: x_val, y_val
    """

    def combine_first_two_indices(x_val, y_val):
        return (
            da.stack([x_val[:, :, i].ravel() for i in range(x_val.shape[2])], axis=1),
            y_val.ravel(),
        )

    def _return_xy_dask_array(x_da, y_da, yr=5):
        assert isinstance(yr, int)
        x_val = da.stack(
            [
                x_da.isel(year=0, mn=mn, band=band).data.ravel()
                for mn in range(len(x_da.mn.values))
                for band in range(len(x_da.band.values))
            ],
            axis=1,
        )  # [mn, band]
        return x_val, y_da.isel(year=yr).data.ravel()

    if isinstance(year, range) or isinstance(year, list):
        x_val_l, y_val_l = [], []
        for yr in year:
            x_val_p, y_val_p = _return_xy_dask_array(x_da, y_da, yr=yr)
            x_val_l.append(x_val_p)
            y_val_l.append(y_val_p)
        # x_val, y_val = da.stack(x_val_l), da.stack(y_val_l)
        x_val, y_val = combine_first_two_indices(da.stack(x_val_l), da.stack(y_val_l))
    else:
        x_val, y_val = _return_xy_dask_array(x_da, y_da, yr=year)
    return x_val, y_val

## Load x and y values

In [13]:
x_val, y_val = return_xy_dask(x_da, y_da, year=range(8, 19))
y_val = da.array(compress_esa(y_val))

In [9]:
x_val

Unnamed: 0,Array,Chunk
Bytes,390.49 MB,182.45 kB
Shape,"(8135226, 12)","(45612, 1)"
Count,11683 Tasks,2376 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 390.49 MB 182.45 kB Shape (8135226, 12) (45612, 1) Count 11683 Tasks 2376 Chunks Type float32 numpy.ndarray",12  8135226,

Unnamed: 0,Array,Chunk
Bytes,390.49 MB,182.45 kB
Shape,"(8135226, 12)","(45612, 1)"
Count,11683 Tasks,2376 Chunks
Type,float32,numpy.ndarray


In [10]:
y_val

Unnamed: 0,Array,Chunk
Bytes,8.14 MB,739.57 kB
Shape,"(8135226,)","(739566,)"
Count,46 Tasks,11 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 8.14 MB 739.57 kB Shape (8135226,) (739566,) Count 46 Tasks 11 Chunks Type uint8 numpy.ndarray",8135226  1,

Unnamed: 0,Array,Chunk
Bytes,8.14 MB,739.57 kB
Shape,"(8135226,)","(739566,)"
Count,46 Tasks,11 Chunks
Type,uint8,numpy.ndarray


## Train XGB model

In [39]:
import xgboost as xgb

def train_xgb_dask(
    client: dask.distributed.Client,
    train_X: dask.array.core.Array,
    train_Y: dask.array.core.Array,
    test_X: dask.array.core.Array,
    test_Y: dask.array.core.Array,
    objective: str = "multi:softmax",
    eta: float = 0.3,
    max_depth: int = 12,
    nthread: int = 16,
    num_round: int = 5,
):
    param = {}
    param["objective"] = objective  # use softmax multi-class classification
    param["eta"] = eta  # scale weight of positive examples
    param["max_depth"] = max_depth  # max_depth
    param["silent"] = 1
    param["nthread"] = nthread  # number of threads
    param["num_class"] = 20 # (np.max(train_Y) + 1).compute() # max size of labels.
    dtrain = xgb.dask.DaskDMatrix(client, train_X, train_Y)
    dtest = xgb.dask.DaskDMatrix(client, test_X, test_Y)

    return xgb.dask.train(
        client,
        param,
        dtrain,
        num_boost_round=num_round,
        evals=[(dtrain, "train"), (dtest, "test")],
    )

In [45]:
def load_and_train_xgb_dask():
    client40GB = Client(n_workers=4, threads_per_worker=8, memory_limit="40GB")
    input_direc = os.path.join(SAT_DIR, "inputs")
    x_file_name = os.path.join(input_direc, "take_esa_coords_True_use_mfd_False_use_ffil_True_use_ir_False_x.nc")
    y_file_name = os.path.join(input_direc, "take_esa_coords_True_use_mfd_False_use_ffil_False_use_ir_False_y.nc")
    x_da = xr.open_dataset(x_file_name, chunks='auto').norm_refl  # {'year': 1, 'band': 1, 'mn': 1}
    y_da = xr.open_dataset(y_file_name, chunks='auto').esa_cci
    x_da, y_da = clip(x_da, y_da)
    x_tr, y_tr = return_xy_dask(x_da, y_da, year=range(8, 19))
    y_tr = da.array(compress_esa(y_tr))
    x_te, y_te = return_xy_dask(x_da, y_da, year=range(19, 24))
    y_te = da.array(compress_esa(y_te))
    bst = train_xgb_dask(client20GB, x_tr.rechunk(chunks=(2772558, 12)), y_tr.rechunk(chunks=(2772558, 1)),
                         x_te.rechunk(chunks=(2772558, 12)), y_te.rechunk(chunks=(2772558, 1)))
    return bst

In [None]:
bst = load_and_train_xgb_dask()

'clip'  0.00370 s



In [40]:
bst = train_xgb_dask(client20GB, x_val.rechunk(chunks=(2772558, 12)), y_val.rechunk(chunks=(2772558, 1)),
                     x_te.rechunk(chunks=(2772558, 12)), y_te.rechunk(chunks=(2772558, 1)))

In [33]:
dtrain = xgb.dask.DaskDMatrix(client20GB, x_val.rechunk(chunks=(2772558, 12)), y_val.rechunk(chunks=(2772558, 1)))
xgb.dask.predict(client20GB, bst['booster'], dtrain)

Unnamed: 0,Array,Chunk
Bytes,32.54 MB,11.09 MB
Shape,"(8135226,)","(2772558,)"
Count,9 Tasks,3 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 32.54 MB 11.09 MB Shape (8135226,) (2772558,) Count 9 Tasks 3 Chunks Type float32 numpy.ndarray",8135226  1,

Unnamed: 0,Array,Chunk
Bytes,32.54 MB,11.09 MB
Shape,"(8135226,)","(2772558,)"
Count,9 Tasks,3 Chunks
Type,float32,numpy.ndarray


In [36]:
type(x_val)

dask.array.core.Array

In [31]:
bst['booster']

AttributeError: 'Booster' object has no attribute 'dask'

In [22]:
type(bst)

dict


Parameters: { silent } might not be used.

  This may not be accurate due to some parameters are only used in language bindings but
  passed down to XGBoost core.  Or some parameters are not used but slip through this
  verification. Please open an issue if you find above cases.


distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 16.58 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Worker is at 82% memory usage. Pausing worker.  Process memory: 16.58 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 16.79 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 16.81 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 17.29 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 17.76 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 18.23 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 18.70 GB -- Worker memory limit: 20.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 19.16 GB -- Worker memory limit: 20.00 GB
```

## Investigation into rechunking

In [22]:
y_val.rechunk(chunks=(2772558, 1))

Unnamed: 0,Array,Chunk
Bytes,8.14 MB,2.77 MB
Shape,"(8135226,)","(2772558,)"
Count,53 Tasks,3 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 8.14 MB 2.77 MB Shape (8135226,) (2772558,) Count 53 Tasks 3 Chunks Type uint8 numpy.ndarray",8135226  1,

Unnamed: 0,Array,Chunk
Bytes,8.14 MB,2.77 MB
Shape,"(8135226,)","(2772558,)"
Count,53 Tasks,3 Chunks
Type,uint8,numpy.ndarray


In [24]:
x_val.rechunk(chunks=(2772558, 12))

Unnamed: 0,Array,Chunk
Bytes,390.49 MB,133.08 MB
Shape,"(8135226, 12)","(2772558, 12)"
Count,11734 Tasks,3 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 390.49 MB 133.08 MB Shape (8135226, 12) (2772558, 12) Count 11734 Tasks 3 Chunks Type float32 numpy.ndarray",12  8135226,

Unnamed: 0,Array,Chunk
Bytes,390.49 MB,133.08 MB
Shape,"(8135226, 12)","(2772558, 12)"
Count,11734 Tasks,3 Chunks
Type,float32,numpy.ndarray
