Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Commit

Permalink
fix cryptic error
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanraba committed Apr 27, 2021
1 parent 3b29833 commit 8fd2452
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions cngi/dio/write_vis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
1. zarr.consolidate_metadata(outfile) is very slow for a zarr group (datatset) with many chunks (there is a python for loop that checks each file). We might have to implement our own version. This is also important for cngi.dio.append_zarr
'''

def write_vis(mxds, outfile, chunks_on_disk={}, partition=None, consolidated=True, compressor=None, graph_name='write_zarr'):
def write_vis(mxds, outfile, chunks_on_disk=None, partition=None, consolidated=True, compressor=None, graph_name='write_zarr'):
"""
Write xarray dataset to zarr format on disk. When chunks_on_disk is not specified the chunking in the input dataset is used.
When chunks_on_disk is specified that dataset is saved using that chunking.
Parameters
----------
mxds : xarray.core.dataset.Dataset
Expand All @@ -45,6 +46,7 @@ def write_vis(mxds, outfile, chunks_on_disk={}, partition=None, consolidated=Tru
graph_name : string
The time taken to execute the graph and save the dataset is measured and saved as an attribute in the zarr file.
The graph_name is the label for this timing information.
Returns
-------
"""
Expand All @@ -58,8 +60,7 @@ def write_vis(mxds, outfile, chunks_on_disk={}, partition=None, consolidated=Tru

if compressor is None:
compressor = Blosc(cname='zstd', clevel=2, shuffle=0)



if partition is None:
partition = list(mxds.attrs.keys())
partition = list(np.atleast_1d(partition))
Expand All @@ -70,31 +71,26 @@ def write_vis(mxds, outfile, chunks_on_disk={}, partition=None, consolidated=Tru
for xds_name in partition:
if "xds" in xds_name:
xds_outfile = outfile + '/' + xds_name
#Check if disk chunking is specified
if bool(chunks_on_disk):
xds_for_disk = mxds.attrs[xds_name].chunk(chunks=chunks_on_disk)
else:
xds_for_disk = mxds.attrs[xds_name]
xds_for_disk = mxds.attrs[xds_name]
if chunks_on_disk is not None:
xds_for_disk = xds_for_disk.chunk(chunks=chunks_on_disk)
else:
xds_outfile = outfile + '/global/' + xds_name
xds_for_disk = mxds.attrs[xds_name]

try:
#Create compression encoding for each datavariable
encoding = dict(zip(list(xds_for_disk.data_vars), cycle([{'compressor': compressor}])))
start = time.time()
#Consolidated is set to False so that the timing information is included in the consolidate metadata.
xr.Dataset.to_zarr(xds_for_disk, store=xds_outfile, mode='w', encoding=encoding,consolidated=False)
time_to_calc_and_store = time.time() - start
print('Time to store and execute graph for ', xds_name, graph_name, time_to_calc_and_store)
#Add timing information
dataset_group = zarr.open_group(xds_outfile,mode='a')
dataset_group.attrs[graph_name+'_time'] = time_to_calc_and_store
# Create compression encoding for each datavariable
encoding = dict(zip(list(xds_for_disk.data_vars), cycle([{'compressor': compressor}])))
start = time.time()

# Consolidated is set to False so that the timing information is included in the consolidate metadata.
xr.Dataset.to_zarr(xds_for_disk, store=xds_outfile, mode='w', encoding=encoding,consolidated=False)
time_to_calc_and_store = time.time() - start
print('Time to store and execute graph for ', xds_name, graph_name, time_to_calc_and_store)

#Add timing information
dataset_group = zarr.open_group(xds_outfile, mode='a')
dataset_group.attrs[graph_name+'_time'] = time_to_calc_and_store

if consolidated == True:
#Consolidate metadata
zarr.consolidate_metadata(xds_outfile)
except:
print('Could not save ', xds_name)
if consolidated == True:
zarr.consolidate_metadata(xds_outfile)

0 comments on commit 8fd2452

Please sign in to comment.