In [None]:
# To use multi-processing, please install the following package
# The pros on using pymp over default multiprocessing is that the default one require prepare the data before hand
# for instance, when building the index with dggrid4py, it requries a geo pandas dataframe which we have to build either a big dataframe
# then trunck it for multi process (cost time) or build individual dataframe (cost memory)
# For pymp, the dataframe can be create at each process start then throw it away, so the max dataframe concurrent exists is the number of multiprocess. 
!pip install pymp-pypi
!pip install geoviews
!pip install flox
!pip install "dask[dataframe]"
!pip install datashader

In [1]:
import os
import warnings
warnings.filterwarnings('ignore')

# If query from STAC, need to clone the source from https://gitlab.ut.ee/geog/lgeo_datacube.git
# Change the following path if need 
os.sys.path.append('../')
os.sys.path.append('../../lgeo_datacube/apps/xarraySTAC')
from STACEntrypoint import STACEntrypoint
import xdggs

import numpy as np
import xarray as xr
import flox.xarray as fxr
import zarr
import rasterio
import time 
import geoviews as gv
import shapely
import geopandas as gpd
import holoviews as hv
from dask.distributed import Client
# Env Var setup for the dggrid
os.environ['DGGRID_PATH']='/home/dick/micromamba/envs/geo/bin/dggrid'



gv.extension('matplotlib','bokeh')
gv.output(dpi=120, fig='svg')

### Query the asset from STAC and open it as xarray

In [None]:
# STAC query parameter
searchpara = {
        'max_items' : 100,
        'collections' : ['esa-cci-2004-clipped-demo']
}
# xarray open_dataset parameter, open in dask 
openpara = {
        'chunks' : 'auto'
}
# If the assets require auth (GCP credential json file)
with rasterio.Env(GOOGLE_APPLICATION_CREDENTIALS='../../lgeo_datacube/apps/stac_catalog/config_dev/glomodat-stac-testing-svc.json'):
    stacdataset = STACEntrypoint()
    stacdataset = stacdataset.open_dataset('https://maps.landscape-geoinformatics.org/stac', stacsearchparam=searchpara, opendatasetpara=openpara)

In [None]:
findcollections = list(stacdataset.keys())
print(findcollections)

In [None]:
for i in stacdataset[findcollections[0]]['items']:
    print(i.attrs['id'])

In [None]:
dataset = stacdataset[findcollections[0]]['items'][0]

In [None]:
dataset = dataset.drop_indexes('band').squeeze('band')

### Prepare to convert index from lon,lat to cell id
There are two ways to generate dggs cell id. Currently the library only support fixed naming , which is lat lon

   **Method A: Set as xindex**
   - 1. Rename (x ,y) to (lon, lat)
   - 2. Stack over x and y , the shape of the dataset will become N x 1 , where N = (x*y) 
   - 3. Create a variable with data in tuple format , ex: (lon,lat)
   - 4. Set the required parameters for creating dggs idx in the newly created variable's attrs
   - 5. Finally, set the new variable as index by set_xindexs
    
   **Method B: Set index when performing stack**
   - 1. Rename (x ,y) to (lon, lat)
   - 2. Set the required parameters for creating dggs idx in either 'lat' or 'lon' variable's attrs
   - 3. Finally, set the new variable as index by dataset.stack(cell_ids=['lon','lat'],index_cls=xdggs.DGGSIndex)

### Method A

In [None]:
# Step 1.
dataset_methodA = dataset.rename({'x':'lon','y':'lat'})
print('Step 1 completed')

# Step 2
start=time.time()
dataset_methodA = dataset_methodA.stack(cell_ids=['lat','lon'])
end=time.time()
print(f'Step 2 completed {end-start}')
print(f'Coordinate Check: {dataset_methodA.cell_ids.data[0]}')

# Step 3.
start=time.time()
dataset_methodA = dataset_methodA.assign_coords({'cell_ids': (('lat','lon'),[ i for i in dataset_methodA.cell_ids.data] )})
end=time.time()
print(f'Step 3 completed {end-start}')

# Step 4.
# The resolution to be use in generating cell id , -1 == auto (by apporixmation of sphere's surface integral )
# trunk size : since the data is now flatten, the array representation is repeat every 9816 elements with identical lon value with corresponding lat value
# Resolution : Manaully set the resoltion to either 9 and 11 will case the dggrid fail with coordinate out of range exception. But 8 and 10 is working well.
#              Further testing suggested that odd resoultion is going to fail when even is fine.
#              It is tested with single or multi process, the result is the same, so it is not related to trunk problem.
#              To calculate auto resolution, the surface are of one trunk is calculated then divided by the number of row of the trunk.
#              It is tested that with trunk=9816*305 (3663 divided by 12 ), the avg km^2 per cell is ~ 0.6 (res 10)
#                                          =9816*1221  , km^2 ~ 0.848 (res 10)
#                                          =9816*3663  , km^2 ~ 1.4 (res 9)
#              In result, auto-resolution should use the whole extent for calculation, instead of just one trunk.
resolution=8
dataset_methodA['cell_ids'].attrs= {
                        'grid_name': 'isea',
                        'resolution': resolution,
                        'aperture': 7,
                        'topology': 'h',
                        'mp': 12,
                        'trunk': 9816*10,
                        'epsg': dataset.attrs['assets']['gsdata']['proj:epsg']['epsg']
}
print('Step 4 completed')

# Step 5.
start=time.time()
dataset_methodA = dataset_methodA.set_xindex('cell_ids',xdggs.DGGSIndex)
end=time.time()
print(f'Step 5 completed {end-start}')
# Some performance note for Method A: 

# For pixel resoultion : x: 9816 y: 3663
# For Step 2 and Step3 Method A roughtly use 6GB RAM for processing. It tooks ~4-7mins with 12 cores for conversion and ~32mins for signle core

# For pixel resoultion : x: 43200 y: 21600
# For Step 2 Method A roughtly use 24GB RAM while processing, those memory usage will be release after finished  
# After that, if accessing the the cell_ids (ex Step 3), the memory usage will blows up to 29GB (max memory of notebook) and keep on consuming page memory

In [None]:
dataset_methodA=dataset_methodA.drop_dims(['lon','lat'])
dataset_methodA

In [None]:
# Number of unique idx
np.unique(dataset_methodA.cell_ids.data).shape

### Save and load with Zarr (re-instantiate index)

In [None]:
# Save to zarr with compression
# Currently, the result of auto resolution can't be propagete back to the dataset.
# the resolution has to update manually back to the variable's attrs
dataset_methodA['cell_ids'].attrs['resolution'] = dataset_methodA.xindexes.get('cell_ids')._resolution
compressor = zarr.Blosc(cname="zstd", clevel=3, shuffle=2)
dataset_methodA.to_zarr(f'{dataset.attrs['id']}_methodA_res8.zarr',encoding={"band_data": {"compressor": compressor},"cell_ids": {"compressor": compressor}})

In [2]:
# load back from zarr and re-instantiate the ISEA index
zarr_MethodA = xr.open_zarr('esa_cci_2004_clipped_resample_4326.tif_methodA_res8.zarr/')
zarr_MethodA=zarr_MethodA.dropna('cell_ids')
zarr_MethodA

Unnamed: 0,Array,Chunk
Bytes,78.87 MiB,498.80 kiB
Shape,"(10337342,)","(63847,)"
Dask graph,216 chunks in 3 graph layers,216 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 78.87 MiB 498.80 kiB Shape (10337342,) (63847,) Dask graph 216 chunks in 3 graph layers Data type float64 numpy.ndarray",10337342  1,

Unnamed: 0,Array,Chunk
Bytes,78.87 MiB,498.80 kiB
Shape,"(10337342,)","(63847,)"
Dask graph,216 chunks in 3 graph layers,216 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [3]:
# As the resoultion is specified in the variable attrs, after the auto resolution is calculated, 
# there is no way to put it back currently, so we have to manually set it.
zarr_MethodA=zarr_MethodA.drop_indexes('cell_ids').drop('band').set_xindex('cell_ids',xdggs.DGGSIndex)
zarr_MethodA

Unnamed: 0,Array,Chunk
Bytes,78.87 MiB,498.80 kiB
Shape,"(10337342,)","(63847,)"
Dask graph,216 chunks in 3 graph layers,216 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 78.87 MiB 498.80 kiB Shape (10337342,) (63847,) Dask graph 216 chunks in 3 graph layers Data type float64 numpy.ndarray",10337342  1,

Unnamed: 0,Array,Chunk
Bytes,78.87 MiB,498.80 kiB
Shape,"(10337342,)","(63847,)"
Dask graph,216 chunks in 3 graph layers,216 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [4]:
zarr_MethodA = zarr_MethodA.to_dataframe().reset_index()
print(zarr_MethodA.shape)
zarr_MethodA.to_parquet(f'./esa_cci_2004_clipped_resample_4326_parquet')

(10337342, 3)


### Select by lon,lat and Extent

In [None]:
# Some lon, lat reference.
#[(-54.169803910617055, 77.33133137161553),
#       (-54.169803910617055, 77.31750671697031),
#       (-54.169803910617055, 77.3036820623251) ]
#(longitude=[28.4,20.2],latitude=[57,60]
zarr_MethodA.dggs.sel_latlon(longitude=[77.33133137161553,77.3036820623251],latitude=[-54.169803910617055])

In [None]:
geojson = {
  "coordinates": [
    [[-2.4609375,56.5804494],
     [-1.7578125,51.2053036],
    [3.6914063,51.5343963],
      [ 3.1640625,57.0612533],
      [-2.4609375,56.2889971],
      [-2.4609375,56.5804494]]],
   "type": "Polygon"
}
start = time.time()
print(zarr_MethodA.dggs.polygon_for_extent(geojson,4326))
end = time.time()

print(end-start)

### Assigning back Lon,Lat by Hex center point 

In [None]:
# Generateing lat lon again with cell_ids
start = time.time()
zarr_MethodA = zarr_MethodA.dggs.assign_latlon_coords()
end = time.time()
print(end-start)
zarr_MethodA

In [None]:
zarr_MethodA.to_zarr(f'{dataset.attrs['id']}_methodA_res8_withLonLat.zarr',encoding={"band_data": {"compressor": compressor},
                                                                        "cell_ids": {"compressor": compressor},
                                                                        "latitude": {"compressor": compressor},
                                                                        "longitude": {"compressor": compressor}})

### Aggregate  and Plotting for Extent
1. Perform grouping on the whole dataset with cell_ids and calculate the mean for each group.
2. generate geometry (hex_geometry) for the selected extent (those hexagon is store as object inside the xarray, so it is not possible to save to zarr anymore.
4. convert the resulting xarray to geo pandas dataframe
5. plot those hexagon with geoviews

Dimension Summary :

1 variable x 35,956,008 cells reduce to 1 variable x 5,722,332 cells

In [2]:
# Load back the saved zarr data , without dask
zarr_MethodA = xr.open_zarr('esa_cci_2004_clipped_resample_4326.tif_methodA_res8.zarr',chunks={'cell_ids':'auto'})
zarr_MethodA=zarr_MethodA.drop_indexes(['cell_ids']).set_xindex('cell_ids',xdggs.DGGSIndex).drop_vars('band')
zarr_MethodA = zarr_MethodA.chunk({'cell_ids':'auto'}).compute()
zarr_MethodA

NameError: name 'xr' is not defined

In [3]:
#zarr_MethodA.coords['name'] = ('name'),list(zarr_MethodA.keys()) # make variable names as cooridnate, for flox reduce on mode, which only support blockwise
group = np.unique(zarr_MethodA.cell_ids.values) # for flox operation which require the final grouping lable , which is the unique cell id
client = Client(n_workers=6, threads_per_worker=2, memory_limit='2GB') # dask cluster
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 12,Total memory: 11.18 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45929,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 11.18 GiB

0,1
Comm: tcp://127.0.0.1:41059,Total threads: 2
Dashboard: http://127.0.0.1:35643/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34977,
Local directory: /tmp/dask-scratch-space/worker-fmw33a87,Local directory: /tmp/dask-scratch-space/worker-fmw33a87

0,1
Comm: tcp://127.0.0.1:33143,Total threads: 2
Dashboard: http://127.0.0.1:38883/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35907,
Local directory: /tmp/dask-scratch-space/worker-60fad636,Local directory: /tmp/dask-scratch-space/worker-60fad636

0,1
Comm: tcp://127.0.0.1:43011,Total threads: 2
Dashboard: http://127.0.0.1:41121/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45995,
Local directory: /tmp/dask-scratch-space/worker-ndn_xdmg,Local directory: /tmp/dask-scratch-space/worker-ndn_xdmg

0,1
Comm: tcp://127.0.0.1:39209,Total threads: 2
Dashboard: http://127.0.0.1:46149/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34947,
Local directory: /tmp/dask-scratch-space/worker-ky4c4dfu,Local directory: /tmp/dask-scratch-space/worker-ky4c4dfu

0,1
Comm: tcp://127.0.0.1:36793,Total threads: 2
Dashboard: http://127.0.0.1:33375/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39559,
Local directory: /tmp/dask-scratch-space/worker-790fb32v,Local directory: /tmp/dask-scratch-space/worker-790fb32v

0,1
Comm: tcp://127.0.0.1:34289,Total threads: 2
Dashboard: http://127.0.0.1:42773/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:41233,
Local directory: /tmp/dask-scratch-space/worker-ujl3vsvi,Local directory: /tmp/dask-scratch-space/worker-ujl3vsvi


In [4]:
zarr_MethodA

### Aggregate by Mean for the whole Extent (15s)  
15.2 s ± 2.24 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]:
%%timeit
# Step 2. Calculate the mean without parallelism
mean_data=fxr.xarray_reduce(zarr_MethodA,zarr_MethodA.cell_ids,dim='cell_ids',func="mean").compute()
mean_data

15.2 s ± 2.24 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Aggregate by Mdoe for the whole Extent (21mins)
21min 23s ± 6min 44s per loop (mean ± std. dev. of 4 runs, 1 loop each)

In [7]:
%%timeit -n 1 -r 4
# Step 2. Calculate the mean without parallelism
# It is find that for just single variable, the runtime of multiprocess is much more slower than just single core
#zarr_MethodA.coords['name'] = ('name'),['band']
#grp = np.unique(zarr_MethodA.cell_ids.values)
mode_data=fxr.xarray_reduce(zarr_MethodA,zarr_MethodA.cell_ids,dim='cell_ids',func="mode").compute()
mode_data

21min 23s ± 6min 44s per loop (mean ± std. dev. of 4 runs, 1 loop each)


### Aggregate by Median for the whole Extent (13mins)
13min 39s ± 37.2 s per loop (mean ± std. dev. of 4 runs, 1 loop each)

In [5]:
%%time
zarr_MethodA.coords['name'] = ('name'),['band']
#grp = np.unique(zarr_MethodA.cell_ids.values)
median_data=fxr.xarray_reduce(zarr_MethodA,zarr_MethodA.cell_ids,dim='cell_ids',func="median",expected_groups=group).compute()
median_data

CPU times: user 15min 48s, sys: 13.7 s, total: 16min 1s
Wall time: 15min


### Aggregate by Quantile(75%) for the whole Extent (15mins)
15min 24s ± 23.1 s per loop (mean ± std. dev. of 4 runs, 1 loop each)

In [9]:
%%timeit -n 1 -r 4
quantile75_data=fxr.xarray_reduce(zarr_MethodA,zarr_MethodA.cell_ids,dim='cell_ids',func="quantile",q=0.75).compute()
quantile75_data

15min 24s ± 23.1 s per loop (mean ± std. dev. of 4 runs, 1 loop each)


### Aggregate by max for the whole Extent (8.13s)
8.13 s ± 358 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [10]:
%%timeit
max_data=fxr.xarray_reduce(zarr_MethodA,zarr_MethodA.cell_ids,dim='cell_ids',func="max").compute()
max_data

8.13 s ± 358 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Generate Hexagon for selected extent

In [None]:
%%time
# Step 2 , generating hex gemorty for plotting with selected extend , with 12 cores
estonia = shapely.geometry.box(20.2,57.00, 28.4,60.0 )
geojson = {
  "coordinates": [ [ [i[0],i[1]] for  i in estonia.boundary.coords]],
   "type": "Polygon"
}
mean_data=mean_data.drop_indexes('cell_ids').set_xindex('cell_ids',xdggs.DGGSIndex)
mean_data = mean_data.dggs.geometry_for_extent(geojson,4326)

### Plotting Hexagon

In [None]:
#### convert to Geopanda Dataframe
mean_data= gpd.GeoDataFrame({'band_data':mean_data.band_data.values.T.reshape(-1),
                                'geometry':mean_data.hex_geometry.values},crs='epsg:4326')

In [None]:
hv.output(backend='bokeh')
start = time.time()
poly_plot = gv.Polygons(mean_data, vdims=["band_data"]).opts(alpha=0.6, color='band_data', cmap='RdYlBu_r',line_color=None).opts(width=600, height=600)
gv.tile_sources.OSM * poly_plot.opts(tools=['hover'])
#gv.Feature(graticules, group='Lines') * poly_plot.opts(tools=['hover'])

### Generate Hexagon for selected extent

In [None]:
%%time
estonia = shapely.geometry.box(20.2,57.00, 28.4,60.0 )
geojson = {
  "coordinates": [ [ [i[0],i[1]] for  i in estonia.boundary.coords]],
   "type": "Polygon"
}
mode_data = mode_data.drop_indexes(['cell_ids','band']).set_xindex('cell_ids',xdggs.DGGSIndex)
mode_data = mode_data.dggs.geometry_for_extent(estonia,4326)

### Plotting Hexagon

In [None]:
mode_data= gpd.GeoDataFrame({'band_data':mode_data.band_data.values.T.reshape(-1), 
                                'geometry':mode_data.hex_geometry.values.T.reshape(-1)},crs='epsg:4326')
hv.output(backend='bokeh')
start = time.time()
poly_plot = gv.Polygons(mode_data, vdims=["band_data"]).opts(alpha=0.6, color='band_data', cmap='RdYlBu_r',line_color=None).opts(width=600, height=600)
gv.tile_sources.OSM * poly_plot.opts(tools=['hover'])

### Method B

Method B is an experimental method that try to overcome the memory problem on large pixel resoultion. For instance, the issue that mentioned above in Step 3.
From experiment result, Method B used around 20GB RAM for (x: 43200 y: 21600), and 20GB of page memory. The total index calculation time is around 2hrs with 12 cores. 

Furthermore, with resolution (x: 9816 y: 3663), the use of xarray broadcast to create the lon,lat pair is much faster (~0.09s) in compare to Method A (Step 2. ~2s )

In [None]:
# Step 1.
dataset_methodB = dataset.rename({'x':'lon','y':'lat'})
print('Step 1 completed')

# Step 2
dataset_methodB['lat'].attrs= {
                        'grid_name': 'isea',
                        'resolution': 8,
                        'aperture': 7,
                        'topology': 'h',
                        'mp': 12,
                        'trunk': 9816*305,
                        'epsg': dataset.attrs['assets']['gsdata']['proj:epsg']['epsg']
}
print('Step 2 completed')
start=time.time()
dataset_methodB = dataset_methodB.stack(cell_ids=['lat','lon'],index_cls=xdggs.DGGSIndex)
end=time.time()
print(f'Step 3 completed {end-start}')

In [None]:
dataset_methodB

In [None]:
np.unique(dataset_methodB.cell_ids.data).shape

In [None]:
compressor = zarr.Blosc(cname="zstd", clevel=3, shuffle=2)
dataset_methodB.to_zarr(f'{dataset.attrs['id']}_methodB_res_8.zarr',encoding={"band_data": {"compressor": compressor},"cell_ids": {"compressor": compressor}})

In [None]:
zarr_MethodB = xr.open_zarr('esa_cci_2004_clipped_resample_4326.tif_methodB_res_8.zarr')

In [None]:
zarr_MethodB = zarr_MethodB.drop_indexes('cell_ids').set_xindex('cell_ids',xdggs.DGGSIndex)
zarr_MethodB.xindexes.get('cell_ids')._resolution = 8

In [None]:
zarr_MethodB

In [None]:
# Some lon, lat reference.
#[(-54.169803910617055, 77.33133137161553),
#       (-54.169803910617055, 77.31750671697031),
#       (-54.169803910617055, 77.3036820623251) ]

zarr_MethodB.dggs.sel_latlon(longitude=[77.33133137161553,77.3036820623251],latitude=[-54.169803910617055])

In [None]:
geojson = {
  "coordinates": [
    [[-2.4609375,56.5804494],
     [-1.7578125,51.2053036],
    [3.6914063,51.5343963],
      [ 3.1640625,57.0612533],
      [-2.4609375,56.2889971],
      [-2.4609375,56.5804494]]],
   "type": "Polygon"
}


In [None]:
start = time.time()
zarr_MethodB = zarr_MethodB.dggs.geometry_for_extent(geojson,'4326')
end = time.time()
print(end-start)
zarr_MethodB

In [None]:
notNaN = np.where(~np.isnan(zarr_MethodB.band_data.values)==True)[1]
zarr_MethodB=zarr_MethodB.sel(cell_ids=zarr_MethodB.cell_ids.data[notNaN])
zarr_MethodB=zarr_MethodB.drop_indexes('cell_ids')

In [None]:
zarr_MethodB= gpd.GeoDataFrame(zarr_MethodB['band_data'].T,geometry=zarr_MethodB.hex_geometry.values,crs='epsg:4326')

In [None]:
zarr_MethodB=zarr_MethodB.rename(columns={0:'band_data'})

In [None]:
hv.output(backend='bokeh')
start = time.time()
poly_plot = gv.Polygons(zarr_MethodB, vdims=["band_data"]).opts(alpha=0.2, color='band_data', cmap='RdYlBu_r',line_color=None).opts(width=600, height=600)
gv.tile_sources.OSM * poly_plot.opts(tools=['hover'])
#gv.Feature(graticules, group='Lines') * poly_plot.opts(tools=['hover'])