Major datasets that are missing: Groundwater/Surface Flooding

I think we should use a sparce array, but this is proving difficult to work in with Dask

# File Setup

In [1]:
# from dask.distributed import Client
# Setup a dask scheduler remotely to help the processing
# ssh into server, and: dask-scheduler
# ssh into server again in another window, and dask-worker <scheuler address with port>
# in a new terminal/prompt locally, ssh -L 8786<scheduler address with port> user@server
# optional, yet another window, start workers locally to help: dask-worker localhost:8786
# Leave all open and then run:
# client = Client('tcp://localhost:8786')
# I'm getting errors here, which I'm pretty sure is because of a version mismatch between local and external dask environment
# For now, just run locally
# Connect to the local Dask scheduler
# client = Client('tcp://161.65.32.69:8786')

# Dashboard link to monitor performance
# print(client.dashboard_link)


In [2]:
from joblib import Parallel, delayed
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
import datetime
import pyproj
from scipy.interpolate import griddata
import scipy as sp
import os
from itertools import combinations
import dask.array as da
from concurrent.futures import ThreadPoolExecutor
from oceanum.datamesh import Connector
datamesh=Connector(token='3052e2bdd10904ae353ac54ed205df32bfcc20e2')
# import sparse

In [3]:
# Define the current and target CRS using EPSG codes
current_crs = pyproj.CRS("EPSG:2193")  
target_crs = pyproj.CRS("EPSG:4326")

transformer = pyproj.Transformer.from_crs(current_crs, target_crs, always_xy=True,allow_ballpark=False)

# Load and preprocess Datasets

### DEM

In [4]:
xr_dem = datamesh.query({'datasource':'linz-wellington_2013-2014-dem_1m-2193',"geofilter":{"type":"bbox","geom":[174.536322,-41.442487,175.124777,-41.049083]}})
xr_dem = xr_dem.band_data



In [5]:
dem_values = xr_dem.values
fill_value = np.min(dem_values)

dem_values[dem_values==fill_value] = np.nan
xr_dem.values = dem_values

# Sparse array seems to not work later on with Dask
# xr_dem.data = xr_dem.chunk(100).data.map_blocks(sparse.COO.from_numpy)
xr_dem = xr_dem.astype(np.float32).chunk(100)

In [6]:
# Create a regular grid of coordinates
res = 4000# low res to make the initial calcs fast
x_regular = np.linspace(xr_dem['x'].min(), xr_dem['x'].max(), res)
y_regular = np.linspace(xr_dem['y'].min(), xr_dem['y'].max(), res)

# Interpolate data onto regular grid
data_interpolated = xr_dem[0,:,:].interp(y=y_regular, x=x_regular)

# Now you can use the interpolated data for transformation
xr_new_DEM = xr.DataArray(data_interpolated,coords={"y":y_regular,"x":x_regular},dims=["y","x"])

# Change to regular lat-lon coords
new_x,new_y = transformer.transform(x_regular,y_regular)
xr_new_DEM = xr_new_DEM.assign_coords(x=new_x,y=new_y)

# Chunk to save memory
xr_new_DEM = xr_new_DEM.chunk(100)

In [7]:
# Float 32 to save some memory
xr_new_DEM['x'] = xr_new_DEM.x.astype(np.float32)
xr_new_DEM['y'] = xr_new_DEM.y.astype(np.float32)

In [8]:
xr_new_DEM

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 17 graph layers,1600 chunks in 17 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 61.04 MiB 39.06 kiB Shape (4000, 4000) (100, 100) Dask graph 1600 chunks in 17 graph layers Data type float32 numpy.ndarray",4000  4000,

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 17 graph layers,1600 chunks in 17 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [9]:
# Create a grid from the DEM to use for the other variables
x = xr_new_DEM.x.astype(np.float32)
y = xr_new_DEM.y.astype(np.float32)
X, Y = np.meshgrid(x, y)

In [10]:
del data_interpolated
del dem_values
del xr_dem

### Seismic Data

In [11]:
# Long term Jack should put results straight on datamesh, but this is still needed
array_2pc = np.load('for_DataMesh/for_DataMesh/2perc_disps_sites_c_MDEz_uniform.npy')
array_10pc = np.load('for_DataMesh/for_DataMesh/10perc_disps_sites_c_MDEz_uniform.npy')

df_2pc = pd.DataFrame(array_2pc)
df_2pc.columns = ['ID','Lon','Lat','Uplift','Subsidence','absVLM']
lon_lats = [transformer.transform(x,y) for x,y in zip(df_2pc.Lon,df_2pc.Lat)]
df_2pc['Lon'] = [x[0] for x in lon_lats]
df_2pc['Lat'] = [x[1] for x in lon_lats]

df_10pc = pd.DataFrame(array_10pc)
df_10pc.columns = ['ID','Lon','Lat','Uplift','Subsidence','absVLM']
lon_lats = [transformer.transform(x,y) for x,y in zip(df_10pc.Lon,df_10pc.Lat)]
df_10pc['Lon'] = [x[0] for x in lon_lats]
df_10pc['Lat'] = [x[1] for x in lon_lats]

array_2pc = griddata((df_2pc['Lon'], df_2pc['Lat']), df_2pc.absVLM, (X, Y), method='linear')
array_10pc = griddata((df_10pc['Lon'], df_10pc['Lat']), df_10pc.absVLM, (X, Y), method='linear')
array_none = array_10pc.copy()
array_none[:,:] = 0

# Change data type
array_2pc = array_2pc.astype(np.float32)
array_10pc = array_10pc.astype(np.float32)
array_none = array_none.astype(np.float32)

# Put all into larger array
seismic_array = np.empty((array_2pc.shape[0],array_2pc.shape[1],3))
seismic_array[:,:,0] = array_none
seismic_array[:,:,1] = array_2pc
seismic_array[:,:,2] = array_10pc


In [12]:
# Put all into an xarray
xr_seismic = xr.DataArray(seismic_array,coords=[x,y,[0,2,10]],dims=['x','y','exc_prob']).astype(np.float32)

In [13]:
xr_seismic = xr_seismic.chunk(100)

In [14]:
xr_seismic

Unnamed: 0,Array,Chunk
Bytes,183.11 MiB,117.19 kiB
Shape,"(4000, 4000, 3)","(100, 100, 3)"
Dask graph,1600 chunks in 1 graph layer,1600 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 183.11 MiB 117.19 kiB Shape (4000, 4000, 3) (100, 100, 3) Dask graph 1600 chunks in 1 graph layer Data type float32 numpy.ndarray",3  4000  4000,

Unnamed: 0,Array,Chunk
Bytes,183.11 MiB,117.19 kiB
Shape,"(4000, 4000, 3)","(100, 100, 3)"
Dask graph,1600 chunks in 1 graph layer,1600 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [15]:
del seismic_array
del array_2pc
del array_10pc
del array_none

### Storm Surge (Replace with Total Water Level)

DataSet: https://data.4tu.nl/articles/_/13392314/1

In [16]:
# xr_storm_surge = xr.open_dataset('COAST-RP.nc')
# xr_100rp = xr_storm_surge['storm_tide_rp_0100']

In [17]:
# well_y = -41.274678
# well_x = 174.854143

In [18]:
# df_storm_surge = xr_storm_surge.to_dataframe()

In [19]:
# df_storm_surge.loc[:,'euclidean'] = (df_storm_surge.station_x_coordinate-well_x)**2+(df_storm_surge.station_y_coordinate-well_y)**2
# df_storm_surge = df_storm_surge.sort_values('euclidean').reset_index(drop=True)
# storm_tide_rps_dict = df_storm_surge.loc[0,[x for x in df_storm_surge.columns if 'storm_tide_rp' in x]].to_dict()

In [20]:
# storm_tide_rps_dict = {k.split('_')[-1]:v for k,v in storm_tide_rps_dict.items()}

In [21]:
# storm_tide_rps_dict

### VLM

In [22]:
df_vlms = pd.read_csv('Welly_VLM_2018-2023_100m.txt',delimiter='\t')
df_vlms = df_vlms.astype(np.float32)

In [23]:
df_vlms = df_vlms.rename(columns={'  0.000000':'lon','  0.000000.1':'lat'})

In [24]:
df_vlms.set_index(['lat','lon'],inplace=True)
df_vlms.columns = df_vlms.columns.astype(float)
df_vlms = df_vlms.reset_index()

In [25]:
cols = [x for x in df_vlms.columns if x not in ['lon','lat']]

In [26]:
# Function to perform interpolation for a given year
def interpolate_year(year):
    Z_year = griddata((df_vlms['lon'].astype(np.float32), df_vlms['lat'].astype(np.float32)), df_vlms[year].astype(np.float32), (X, Y), method='linear')
    return np.nan_to_num(Z_year, nan=0).astype(np.float32)

# Function to process a chunk of years
def process_chunk(chunk):
    with ThreadPoolExecutor() as executor:
        return list(executor.map(interpolate_year, chunk))

# Determine chunk size and number of chunks
chunk_size = 10  # Adjust based on available memory
chunks = [cols[i:i + chunk_size] for i in range(0, len(cols), chunk_size)]

# Initialize list to collect results
all_chunks_results = []

# Process each chunk
for chunk in chunks:
    chunk_results = process_chunk(chunk)
    all_chunks_results.append(np.stack(chunk_results, axis=-1))

# Concatenate all chunks along the time dimension
xr_vlm_grid = np.concatenate(all_chunks_results, axis=-1)

# Create DataArray
xr_vlm_grid = xr.DataArray(xr_vlm_grid, coords={"x": np.array(x), "y": np.array(y), "years": cols}, dims=['x', 'y', 'years'])

# Interpolate NaN values
xr_vlm_grid = (xr_vlm_grid.chunk(dict(y=-1)).interpolate_na(dim='y', method='linear') + xr_vlm_grid.chunk(dict(x=-1)).interpolate_na(dim='x', method='linear')) / 2

# rechunk
xr_vlm_grid = xr_vlm_grid.chunk(100)

In [27]:
# Average rates over time
xr_vlm_grid = xr_vlm_grid.mean('years')

In [28]:
xr_vlm_grid

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 19 graph layers,1600 chunks in 19 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 61.04 MiB 39.06 kiB Shape (4000, 4000) (100, 100) Dask graph 1600 chunks in 19 graph layers Data type float32 numpy.ndarray",4000  4000,

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 19 graph layers,1600 chunks in 19 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [29]:
del all_chunks_results
del df_vlms

### SLR

In [30]:
# Initialise years variables with a dataset
# this is only medium confidence, I haven't done low confidence
xr_slr = sp.io.loadmat('Ian_relabelled_sites/total_volc_noVLMssp585_medium_confidence_values.mat')
years = np.unique(xr_slr['years'])
num_years = len(years)

file_names = os.listdir("Ian_relabelled_sites/")
file_names = [x for x in file_names if ('total' in x)&('medium' in x)&('_noVLM' in x)]

quantiles = [0.17,0.5,0.83]
del xr_slr

In [31]:
# import dask
# with dask.config.set(**{'array.slicing.split_large_chunks': True}):
#     array.reshape(shape)

In [32]:
# Initialize a Dask array to store the results
slr_array = da.zeros(
    (len(x), len(y), len(np.unique(sp.io.loadmat(f'Ian_relabelled_sites/{file_names[0]}')['years'])), len(quantiles), len(file_names)),
    dtype=np.float32, chunks=(len(x), len(y), 1, 1, 1)
)

def process_file(k, file):
    print(file)
    data = sp.io.loadmat(f'Ian_relabelled_sites/{file}')
    years = np.unique(data['years'])
    lats = {loc: lat for loc, lat in zip(data['locations'].squeeze(), data['lat'].squeeze())}
    lons = {loc: lon for loc, lon in zip(data['locations'].squeeze(), data['lon'].squeeze())}

    slr_da = xr.DataArray(
        data['sea_level_change'],
        coords={
            'locations': data['locations'].squeeze(),
            'years': data['years'].squeeze(),
            'quantiles': data['quantiles'].squeeze()
        },
        dims=['locations', 'years', 'quantiles']
    ).sel(quantiles=quantiles).to_dataframe('slr')

    slr_da['Lat'] = slr_da.index.get_level_values('locations').map(lats)
    slr_da['Lon'] = slr_da.index.get_level_values('locations').map(lons)

    del data, lats, lons
    
    # Filter the DataFrame for the Wellington region
    slr_da = slr_da[
        (slr_da.Lon > np.min(x.values)) &
        (slr_da.Lat > np.min(y.values)) &
        (slr_da.Lon < np.max(x.values)) &
        (slr_da.Lat < np.max(y.values))
    ].reset_index().drop(columns='locations').astype(np.float32)

    # Convert DataFrame columns to NumPy arrays
    years_arr = slr_da['years'].to_numpy()
    quantiles_arr = slr_da['quantiles'].to_numpy()
    lon_arr = slr_da['Lon'].to_numpy()
    lat_arr = slr_da['Lat'].to_numpy()
    slr_arr = slr_da['slr'].to_numpy()

    def process_year_quantile(year, quantile):
        mask = (years_arr == year) & (quantiles_arr == quantile)
        if np.any(mask):
            grid = griddata((lon_arr[mask], lat_arr[mask]), slr_arr[mask], (X, Y), method='linear').astype(np.float32)
            return grid
        return None

    grids = Parallel(n_jobs=-1)(
        delayed(process_year_quantile)(year, quantile)
        for year in years
        for quantile in quantiles
    )

    del slr_da, years_arr, quantiles_arr, lon_arr, lat_arr, slr_arr

    # Filter out None results and stack into a Dask array
    valid_grids = [grid for grid in grids if grid is not None]
    if valid_grids:
        stacked_grids = da.stack(valid_grids, axis=-1).rechunk((len(x)/20, len(y)/20, 1, 1))
        
        slr_array[:, :, :, :, k] = stacked_grids.reshape((len(x), len(y), num_years, len(quantiles)))
        del stacked_grids
    return k

# Process each file
for k, file in enumerate(file_names):
    process_file(k, file)

# Create an xarray DataArray
xr_slr = xr.DataArray(
    slr_array,
    coords={"x": np.array(x), "y": np.array(y), "years": years, "quantiles": quantiles, "scenarios": file_names},
    dims=['x', 'y', 'years', 'quantiles', 'scenarios']
)
del slr_array

# Fill missing values
xr_slr_filled = xr_slr.ffill('x').bfill('x').ffill('y').bfill('y')
del xr_slr

# rechunking
xr_slr_filled = xr_slr_filled.chunk(100)

total_volc_noVLMssp119_medium_confidence_values.mat
total_volc_noVLMssp126_medium_confidence_values.mat
total_volc_noVLMssp245_medium_confidence_values.mat
total_volc_noVLMssp370_medium_confidence_values.mat
total_volc_noVLMssp585_medium_confidence_values.mat


In [33]:
xr_slr_filled

Unnamed: 0,Array,Chunk
Bytes,12.52 GiB,8.01 MiB
Shape,"(4000, 4000, 14, 3, 5)","(100, 100, 14, 3, 5)"
Dask graph,1600 chunks in 22 graph layers,1600 chunks in 22 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 12.52 GiB 8.01 MiB Shape (4000, 4000, 14, 3, 5) (100, 100, 14, 3, 5) Dask graph 1600 chunks in 22 graph layers Data type float32 numpy.ndarray",4000  4000  5  3  14,

Unnamed: 0,Array,Chunk
Bytes,12.52 GiB,8.01 MiB
Shape,"(4000, 4000, 14, 3, 5)","(100, 100, 14, 3, 5)"
Dask graph,1600 chunks in 22 graph layers,1600 chunks in 22 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


# Combine into a single dataset

In [34]:
xr_wellington = xr.Dataset({
    'DEM':xr_new_DEM,
    'seismic':xr_seismic,
    'vlm':xr_vlm_grid,
    'slr':xr_slr_filled
})
del xr_new_DEM
del xr_seismic
del xr_vlm_grid
del xr_slr_filled

In [35]:
xr_wellington

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 17 graph layers,1600 chunks in 17 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 61.04 MiB 39.06 kiB Shape (4000, 4000) (100, 100) Dask graph 1600 chunks in 17 graph layers Data type float32 numpy.ndarray",4000  4000,

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 17 graph layers,1600 chunks in 17 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,183.11 MiB,117.19 kiB
Shape,"(4000, 4000, 3)","(100, 100, 3)"
Dask graph,1600 chunks in 1 graph layer,1600 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 183.11 MiB 117.19 kiB Shape (4000, 4000, 3) (100, 100, 3) Dask graph 1600 chunks in 1 graph layer Data type float32 numpy.ndarray",3  4000  4000,

Unnamed: 0,Array,Chunk
Bytes,183.11 MiB,117.19 kiB
Shape,"(4000, 4000, 3)","(100, 100, 3)"
Dask graph,1600 chunks in 1 graph layer,1600 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 19 graph layers,1600 chunks in 19 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 61.04 MiB 39.06 kiB Shape (4000, 4000) (100, 100) Dask graph 1600 chunks in 19 graph layers Data type float32 numpy.ndarray",4000  4000,

Unnamed: 0,Array,Chunk
Bytes,61.04 MiB,39.06 kiB
Shape,"(4000, 4000)","(100, 100)"
Dask graph,1600 chunks in 19 graph layers,1600 chunks in 19 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,12.52 GiB,8.01 MiB
Shape,"(4000, 4000, 14, 3, 5)","(100, 100, 14, 3, 5)"
Dask graph,1600 chunks in 22 graph layers,1600 chunks in 22 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 12.52 GiB 8.01 MiB Shape (4000, 4000, 14, 3, 5) (100, 100, 14, 3, 5) Dask graph 1600 chunks in 22 graph layers Data type float32 numpy.ndarray",4000  4000  5  3  14,

Unnamed: 0,Array,Chunk
Bytes,12.52 GiB,8.01 MiB
Shape,"(4000, 4000, 14, 3, 5)","(100, 100, 14, 3, 5)"
Dask graph,1600 chunks in 22 graph layers,1600 chunks in 22 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


# Save to the Mesh

In [36]:

# Save with compression and parallel processing
encoding = {var: {'zlib': True, 'complevel': 5} for var in xr_wellington[['DEM','seismic','vlm','slr']]}

# Use the compute argument to leverage parallel processing
xr_wellington.to_netcdf('wellington_case_study_combined_data2.nc',encoding=encoding, compute=True)

MemoryError: Unable to allocate 9.06 GiB for an array with shape (4000, 152, 4000) and data type float32

In [None]:
# datamesh.write_datasource(datasource_id="wellington_casestudy_combined_data",
#                           name="Wellington Case Study Combined Data",
#                           description="Wellington Case Study Combined Data",
#                           data=xr_wellington,
#                           coordinates={'x':'x','y':'y','t':'years'},
#                           tstart=xr_wellington.years.values[0],
#                           tend=xr_wellington.years.values[-1],
#                           geom={'type':'array','coordinates':[[
#                               [xr_wellington.x.min(),xr_wellington.y.min()],
#                               [xr_wellington.x.max(),xr_wellington.y.min()],
#                               [xr_wellington.x.min(),xr_wellington.y.max()],
#                               [xr_wellington.x.max(),xr_wellington.y.max()]
#                           ]]},
#                          tags=['demo', 'wellington','lower hutt'],
#                          )