In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import gtsa

from pathlib import Path
import shutil
import psutil
import pandas as pd
import hvplot.xarray

# Raster stacking

Stacks single band rasters and chunks along the time dimension (on disk) for memory-efficient data retrieval.

#### Prerequesites
- Download DEM data with `00_download_dem_data.py` or `00_download_dem_data.ipynb`

## Start dask cluster
- For parallel read/write

In [3]:
workers = psutil.cpu_count(logical=True)-1
client = gtsa.io.dask_start_cluster(workers,
                                    ip_address=None, # replace with address if working on remote machine
                                    port='8787', # if occupies a different port will automatically be assigned
                                   )


Dask dashboard at: http://127.0.0.1:8787/status
Workers: 31
Threads per worker: 1 



## Get DEM file paths and time stamps

In [4]:
# data_dir = '../../data/dems/south-cascade/' # small test dataset
data_dir = '../../data/dems/mount-baker' # large dataset

In [5]:
dems = [x.as_posix() for x in sorted(Path(data_dir).glob('*.tif'))]
date_strings = [x[1:-1] for x in gtsa.io.parse_timestamps(dems,date_string_pattern='_........_')]
date_strings, dems = list(zip(*sorted(zip(date_strings, dems)))) # ensure chronological sorting 
date_times = [pd.to_datetime(x, format="%Y%m%d") for x in date_strings]

In [6]:
ref_dem = dems[-1] # always last after chronological sorting
ref_dem

'../../data/dems/mount-baker/WADNR_mount-baker_20150827_1m_dem.tif'

## Reproject to reference DEM grid
- Create a reprojected NetCDF file for each DEM
- Loads all NetCDF files lazily

In [14]:
ds = gtsa.io.xr_stack_geotifs(dems,
                              date_times,
                              ref_dem,
                              resampling="bilinear",
                              save_to_nc = True,
                              nc_out_dir = Path(data_dir,'nc_files').as_posix(),
                              overwrite = False)

Reading files from ../../data/dems/mount-baker/nc_files


## Examine current chunk shape
- Each time stamped DEM is a single chunk

In [15]:
ds['band1']

Unnamed: 0,Array,Chunk
Bytes,22.43 GiB,127.98 MiB
Shape,"(11, 28392, 19282)","(1, 7029, 4773)"
Dask graph,275 chunks in 23 graph layers,275 chunks in 23 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 22.43 GiB 127.98 MiB Shape (11, 28392, 19282) (1, 7029, 4773) Dask graph 275 chunks in 23 graph layers Data type float32 numpy.ndarray",19282  28392  11,

Unnamed: 0,Array,Chunk
Bytes,22.43 GiB,127.98 MiB
Shape,"(11, 28392, 19282)","(1, 7029, 4773)"
Dask graph,275 chunks in 23 graph layers,275 chunks in 23 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [16]:
ds['band1'].sel(time = ds.time.values[0])

Unnamed: 0,Array,Chunk
Bytes,2.04 GiB,127.98 MiB
Shape,"(28392, 19282)","(7029, 4773)"
Dask graph,25 chunks in 24 graph layers,25 chunks in 24 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.04 GiB 127.98 MiB Shape (28392, 19282) (7029, 4773) Dask graph 25 chunks in 24 graph layers Data type float32 numpy.ndarray",19282  28392,

Unnamed: 0,Array,Chunk
Bytes,2.04 GiB,127.98 MiB
Shape,"(28392, 19282)","(7029, 4773)"
Dask graph,25 chunks in 24 graph layers,25 chunks in 24 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## Rechunk along time dimension
- Creates temporary zarr file for efficient rechunking
- Saves a zarr file chunked along full time dimension to disk
- Significantly improves dask worker occupation and processing time for computations along the time dimension

In [17]:
ds_zarr = gtsa.io.create_zarr_stack(ds,
                                    output_directory = Path(data_dir,'stack').as_posix(),
                                    variable_name='band1',
                                    zarr_stack_file_name='stack.zarr',
                                    overwrite = False,
                                    cleanup=True)

Creating temporary zarr stack
/
 ├── band1 (11, 28392, 19282) float32
 ├── time (11,) int64
 ├── x (19282,) float64
 └── y (28392,) float64
Name               : /band1
Type               : zarr.core.Array
Data type          : float32
Shape              : (11, 28392, 19282)
Chunk shape        : (1, 5679, 3857)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 24087999936 (22.4G)
No. bytes stored   : 1963497325 (1.8G)
Storage ratio      : 12.3
Chunks initialized : 275/275

Rechunking temporary zarr stack and saving as
../../data/dems/mount-baker/stack/stack.zarr
Rechunked zarr file info
/
 ├── band1 (11, 28392, 19282) float32
 ├── time (11,) int64
 ├── x (19282,) float64
 └── y (28392,) float64
Name               : /band1
Type               : zarr.core.Array
Data type          : float32
Shape              : (11, 28392, 19282)
Chunk shape   

In [None]:
ds_zarr['band1']

In [None]:
ds_zarr['band1'].sel(time = ds_zarr.time.values[0])

In [None]:
ll ../../data/dems/mount-baker/

In [None]:
rm -rf ../../data/dems/mount-baker/stack

In [None]:
rm -rf ../../data/dems/mount-baker/nc_files

In [5]:
client.get_versions(check=True)

{'scheduler': {'host': {'python': '3.11.4.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '5.15.0-76-generic',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'en_US.UTF-8'},
  'packages': {'python': '3.11.4.final.0',
   'dask': '2023.7.0',
   'distributed': '2023.7.0',
   'msgpack': '1.0.5',
   'cloudpickle': '2.2.1',
   'tornado': '6.3.2',
   'toolz': '0.12.0',
   'numpy': '1.24.4',
   'pandas': '2.0.3',
   'lz4': '4.3.2'}},
 'workers': {'tcp://127.0.0.1:33119': {'host': {'python': '3.11.4.final.0',
    'python-bits': 64,
    'OS': 'Linux',
    'OS-release': '5.15.0-76-generic',
    'machine': 'x86_64',
    'processor': 'x86_64',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'en_US.UTF-8'},
   'packages': {'python': '3.11.4.final.0',
    'dask': '2023.7.0',
    'distributed': '2023.7.0',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.3.2',
    'toolz': '0.12.0',
    'n

In [6]:
distributed.dask_worker

NameError: name 'distributed' is not defined

In [11]:
import distributed

In [13]:
distributed.scheduler.

<module 'distributed.scheduler' from '/mnt/working/knuth/sw/miniconda3/envs/gtsa/lib/python3.11/site-packages/distributed/scheduler.py'>