In [1]:
import xarray
import spires
import numpy as np
import matplotlib.pyplot as plt

In [2]:
interpolator = spires.LutInterpolator(lut_file='../tests/data/lut_sentinel2b_b2to12_3um_dust.mat')
bands = interpolator.bands
solar_angles = interpolator.solar_angles
dust_concentrations = interpolator.dust_concentrations
grain_sizes = interpolator.grain_sizes
reflectances = interpolator.reflectances

In [3]:
lut = interpolator.to_xarray()

In [4]:
r = xarray.load_dataset('../tests/data/sentinel_r.nc')
r0 = xarray.load_dataset('../tests/data/sentinel_r0.nc')
x0 = np.array([0.5, 0.05, 10, 250])

In [5]:
#r['reflectance']

In [6]:
ds = xarray.Dataset()
ds['r'] = r['reflectance']
ds['r0'] = r0['reflectance']
ds['solar_z'] = r['sun_zenith_grid']
ds

# Take a single day

In [7]:
date = '2024-02-25'
ts = ds.sel(time=date).squeeze().drop_vars('time')

# Using the Core function (i.e. an with output argument)
the function is abstracted in `spires.invert_array2d()`

In [8]:
spectra_targets = ts['r'].transpose('y', 'x', 'band')
spectra_backgrounds = ts['r0'].transpose('y', 'x', 'band')
obs_solar_angles = ts['solar_z'].transpose('y', 'x')

In [None]:
%%time
spires.core.invert_array2d(spectra_backgrounds=spectra_backgrounds, 
                           spectra_targets=spectra_targets, 
                           spectrum_shade=spectrum_shade, 
                           obs_solar_angles=obs_solar_angles, 
                           bands=bands, 
                           solar_angles=solar_angles, 
                           dust_concentrations=dust_concentrations, 
                           grain_sizes=grain_sizes, 
                           lut=reflectances, 
                           results=results, 
                           max_eval=100, 
                           x0=x0, 
                           algorithm=2)

NameError: name 'results' is not defined

In [None]:
plt.imshow(results[:,:, 0])

# High level functions

In [11]:
%%time
results = spires.speedy_invert_array1d(spectra_targets=spectra_targets.stack(location=('y', 'x')).transpose('location', 'band'),
                                       spectra_backgrounds=spectra_backgrounds.stack(location=('y', 'x')).transpose('location', 'band'),
                                       obs_solar_anglesb=obs_solar_angles.stack(location=('y', 'x')),
                                       interpolator=interpolator)

CPU times: user 1min 25s, sys: 380 ms, total: 1min 25s
Wall time: 1min 25s


array([[9.63259431e-01, 0.00000000e+00, 6.49353634e+02, 1.17120886e+03],
       [2.83946861e-01, 0.00000000e+00, 1.29616305e+02, 5.07527026e+02],
       [7.41671411e-01, 0.00000000e+00, 7.38506177e+02, 1.09101886e+03],
       ...,
       [2.22510146e-01, 0.00000000e+00, 0.00000000e+00, 3.00000000e+01],
       [2.23504038e-01, 0.00000000e+00, 0.00000000e+00, 3.00000000e+01],
       [1.35933824e-01, 2.69882276e-02, 7.93106034e+00, 3.07768281e+01]])

In [None]:
spectra_targets = ts['r']
spectra_backgrounds = ts['r0']
obs_solar_angles = ts['solar_z']
x0 = np.array([0.5, 0.05, 10, 250])

# Attempting to parallelize with dask
This won't work since `speedy_invert_array2d` expects an xarray

## This works

In [24]:
import dask
import zarr

In [None]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=112, 
                       threads_per_worker=1,
                       processes=True,   
                       dashboard_address='localhost:8788')

In [None]:
spectra_targets = ts['r'].transpose('y', 'x', 'band')
spectra_backgrounds = ts['r0'].transpose('y', 'x', 'band')
obs_solar_angles = ts['solar_z'].transpose('y', 'x')

In [None]:
ds = xarray.open_zarr('/data/sentinel2/zarrs/10SFH_sharpend.zarr')
ds = ds.sel(band=['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B11', 'B12', 'B8'])

spectra_targets = ds.isel(time=100)['reflectance'].transpose('y', 'x', 'band')
spectra_backgrounds = (spectra_targets/2)
obs_solar_angles = ds['sun_zenith_grid'].isel(time=100).interp(y_angles=ds.y, x_angles=ds.x, method='nearest').squeeze().chunk(x=500, y=500)
spectrum_shade = np.zeros_like(spectra_targets[0,0,:])

In [21]:
%%time
# We write the chunked arrays to a tempfile so that each worker can grab the chunks it needs
import tempfile

tmp_st = tempfile.NamedTemporaryFile(suffix=".nc")
spectra_targets.to_netcdf(tmp_st.name, engine='netcdf4', format='NETCDF4')

tmp_sb = tempfile.NamedTemporaryFile(suffix=".nc")
spectra_backgrounds.to_netcdf(tmp_sb.name, engine='netcdf4', format='NETCDF4')

tmp_so = tempfile.NamedTemporaryFile(suffix=".nc")
obs_solar_angles.to_netcdf(tmp_so.name, engine='netcdf4', format='NETCDF4')

CPU times: user 93 ms, sys: 204 ms, total: 297 ms
Wall time: 165 ms


In [22]:
# We lazily read the data back in
chunksize = 200
spectra_targets = xarray.open_dataarray(tmp_st.name, chunks={'x': chunksize, 'y': chunksize, 'band': -1})
spectra_backgrounds = xarray.open_dataarray(tmp_sb.name, chunks={'x': chunksize, 'y': chunksize, 'band': -1})
obs_solar_angles = xarray.open_dataarray(tmp_so.name, chunks={'x': chunksize, 'y': chunksize})

- 64 workers
    - nonchunked/nonscattered
        - 500: 3:02
    - chunked+scattered        
        - 250: 2:20 nonscattered: 2:18
        - 325: 2:28
        - 500: 2:58
- 96 workers
    - chunked/scattered
        - chunksize 200: 1:57
        - chunksize 250: 1:49
        - chunksize 300: 2:02
        - chunksize 325: 2:11
        - chunksize 500: 2:35 # nongraceful
- 112 workers
    - chunked/scattered
        - chunksize 200: 1:57
        - chunksize 250: 1:48
        - chunksize 275: 1:51
        - chunksize 300: 1:56
        - chunksize 325: 2:04
        - chunksize 350: 2:00
        - chunksize 400: 2:04
        - chunksize 500: 2:06
- 120 workers
    - chunked/scattered
        - chunksize 200: 2:09
        - chunksize 250: 1:58
        - chunksize 300: 1:54
        - chunksize 325: 1:54
        - chunksize 350: 2:09
- 128 workers
    - noscattered/nonchunked: -40 seconds    - 
        - chunksize 325: 2:56
        - chunksize 500: 2:51
    - scattered/chunked: -40 seconds
        - chunksize 325: 2:28
    - nonscattered:
        - chunksize 100: 4:46
        - chunksize 325: 2:00
        - chunksize 500: 2:54
    - scattered:
        - chunksize 100: 4:46
        - chunksize 200: fail
        - chunksize 250: 1:56
        - chunksize 300: 2:03
        - chunksize 325: 2:00 | 2:03
        - chunksize 350: 2:15
        - chunksize 400: 2:19
        - chunksize 500: 2:11
        - chunksize 750: 3 min+

In [26]:
client = Client(cluster)

In [29]:
%%time
a = dask.array.from_array(interpolator.reflectances)
dsk = client.scatter(dict(a.dask), broadcast=True)
a = dask.array.Array(dsk, name=a.name, chunks=a.chunks, dtype=a.dtype, meta=a._meta, shape=a.shape)
refletance_scattered = xarray.DataArray(a, dims=['bands', 'sz', 'dust', 'grain'])

CPU times: user 295 ms, sys: 243 ms, total: 539 ms
Wall time: 650 ms


In [35]:
%%time
results = xarray.apply_ufunc(
    spires.speedy_invert_array2d,
    spectra_targets,
    spectra_backgrounds,
    obs_solar_angles,    
    interpolator.bands.data,
    interpolator.solar_angles.data,
    interpolator.dust_concentrations.data,
    interpolator.grain_sizes.data,        
    #interpolator.reflectances,
    refletance_scattered,
    input_core_dims=[['band'], ['band'], [],['bands'], ['solar'], ['dust'], ['grain'], ['bands', 'sz', 'dust', 'grain']],
    output_core_dims=[['property']],
    dask='parallelized',
    dask_gufunc_kwargs={'allow_rechunk': False, 'output_sizes': {'property': 4}},
    output_dtypes=[float],
    vectorize=False
).compute()

FutureCancelledError: finalize-hlgfinalizecompute-d536103a33724ef7a6e1377637a9f903 cancelled for reason: lost dependencies.

In [34]:
%%time
with Client(cluster) as client:
    # We scatter the interpolator. This is super odd but somehow works
    
    a = dask.array.from_array(interpolator.reflectances)
    dsk = client.scatter(dict(a.dask), broadcast=True)
    a = dask.array.Array(dsk, name=a.name, chunks=a.chunks, dtype=a.dtype, meta=a._meta, shape=a.shape)
    refletance_scattered = xarray.DataArray(a, dims=['bands', 'sz', 'dust', 'grain'])

    results = xarray.apply_ufunc(
        spires.speedy_invert_array2d,
        spectra_targets,
        spectra_backgrounds,
        obs_solar_angles,    
        interpolator.bands.data,
        interpolator.solar_angles.data,
        interpolator.dust_concentrations.data,
        interpolator.grain_sizes.data,        
        #interpolator.reflectances,
        refletance_scattered,
        input_core_dims=[['band'], ['band'], [],['bands'], ['solar'], ['dust'], ['grain'], ['bands', 'sz', 'dust', 'grain']],
        output_core_dims=[['property']],
        dask='parallelized',
        dask_gufunc_kwargs={'allow_rechunk': False, 'output_sizes': {'property': 4}},
        output_dtypes=[float],
        vectorize=False
    )
    r = results.compute()

CPU times: user 1.95 s, sys: 1.61 s, total: 3.56 s
Wall time: 6.63 s


In [None]:
r.isel(property=0).plot()

In [37]:
tmp_st.close()
tmp_sb.close()
tmp_so.close()

In [36]:
client.close()
cluster.close()

# With Zarr

In [None]:
client = Client(cluster)

In [None]:
client.restart()

In [None]:
%%time
store_t = zarr.storage.MemoryStore()
spectra_targets.chunk(band=-1, x=100, y=100).to_zarr(store_t, zarr_format=2, compute=True)

store_r = zarr.storage.MemoryStore()
spectra_backgrounds.chunk(band=-1, x=100, y=100).to_zarr(store_r, zarr_format=2, compute=True)

store_s = zarr.storage.MemoryStore()
obs_solar_angles.chunk(x=100, y=100).to_zarr(store_s, zarr_format=2, compute=True)

In [None]:
spectra_targets = xarray.open_zarr(store_t, chunks={'x': 100, 'y': 100, 'band': -1})['r']
spectra_backgrounds = xarray.open_zarr(store_r, chunks={'x': 100, 'y': 100, 'band': -1})['r0']
obs_solar_angles = xarray.open_zarr(store_s, chunks={'x': 100, 'y': 100})['solar_z']

In [None]:
%%time
store_t = zarr.storage.MemoryStore()
spectra_targets.chunk(band=-1, x=100, y=100).to_zarr(store_t, zarr_format=2, compute=True)
spectra_targets = xarray.open_zarr(store_t, chunks={'x': 100, 'y': 100, 'band': -1})['r']

store_r = zarr.storage.MemoryStore()
spectra_backgrounds.chunk(band=-1, x=100, y=100).to_zarr(store_r, zarr_format=2, compute=True)
spectra_backgrounds = xarray.open_zarr(store_r, chunks={'x': 100, 'y': 100, 'band': -1})['r0']

store_s = zarr.storage.MemoryStore()
obs_solar_angles.chunk(x=100, y=100).to_zarr(store_s, zarr_format=2, compute=True)
obs_solar_angles = xarray.open_zarr(store_s, chunks={'x': 100, 'y': 100})['solar_z']

In [None]:
results = xarray.apply_ufunc(
    spires.speedy_invert_array2d,
    spectra_targets,
    spectra_backgrounds,
    obs_solar_angles,    
    interpolator.bands.data,
    interpolator.solar_angles.data,
    interpolator.dust_concentrations.data,
    interpolator.grain_sizes.data,
    #interpolator.reflectances.data,
    refletance_scattered,
    #input_core_dims=[['band'], ['band'], [],[], [], [], [], []],
    input_core_dims=[['band'], ['band'], [],['band'], ['solar'], ['dust'], ['grain'], ['bands', 'sz', 'dust', 'grain']],
    output_core_dims=[['property']],
    dask='parallelized',
    dask_gufunc_kwargs={'allow_rechunk': False, 'output_sizes': {'property': 4}},
    output_dtypes=[float],
    vectorize=False
)

In [None]:
tmp_so.close()

In [None]:
%%timeit
results.compute()

In [None]:
client.close()

In [None]:
%%time
r = results.compute()

# Process function

In [40]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=64, 
                       threads_per_worker=1,
                       processes=True,   
                       dashboard_address='localhost:8788')

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44277 instead


In [13]:
cluster.close()

In [38]:
obs_solar_angles

Unnamed: 0,Array,Chunk
Bytes,4.73 MiB,156.25 kiB
Shape,"(921, 1347)","(200, 200)"
Dask graph,35 chunks in 2 graph layers,35 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 4.73 MiB 156.25 kiB Shape (921, 1347) (200, 200) Dask graph 35 chunks in 2 graph layers Data type float32 numpy.ndarray",1347  921,

Unnamed: 0,Array,Chunk
Bytes,4.73 MiB,156.25 kiB
Shape,"(921, 1347)","(200, 200)"
Dask graph,35 chunks in 2 graph layers,35 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [41]:
%%time
results = spires.process.speedy_invert_dask(spectra_targets=spectra_targets,
                                 spectra_backgrounds=spectra_backgrounds,
                                 obs_solar_angles=obs_solar_angles,
                                 bands=lut.band.data,
                                 solar_angles=lut.solar_angle.data,
                                 dust_concentrations=interpolator.dust_concentrations.data,
                                 grain_sizes=interpolator.grain_sizes.data,  
                                 reflectances=interpolator.reflectances,
                                 cluster=cluster, chunksize=200)

CPU times: user 2.99 s, sys: 2.7 s, total: 5.68 s
Wall time: 9.07 s


In [51]:
results['property']=['fsca', 'fshade', 'dust', 'grain']
results.to_dataset( dim='property')