Skip to content

Commit

Permalink
Correct error when a parallel application writes different amounts of…
Browse files Browse the repository at this point in the history
… data to

an unlimited-dimension variable, and different processes don't agree on the
whether to extend the underlying HDF5 dataset, or don't agree on the amount
to extend the dataset.
  • Loading branch information
qkoziol committed Aug 19, 2013
1 parent dae5e2f commit 3cdce9e
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 29 deletions.
4 changes: 4 additions & 0 deletions include/nc4internal.h
Expand Up @@ -254,6 +254,10 @@ typedef struct NC_HDF5_FILE_INFO
{
NC* controller;
hid_t hdfid;
#ifdef USE_PARALLEL
MPI_Comm comm; /* Copy of MPI Communicator used to open the file */
MPI_Info info; /* Copy of MPI Information Object used to open the file */
#endif
int flags;
int cmode;
int nvars;
Expand Down
3 changes: 2 additions & 1 deletion include/netcdf.h
Expand Up @@ -379,8 +379,9 @@ by the desired type. */
#define NC_ENOTBUILT (-128) /**< Attempt to use feature that was not turned on when netCDF was built. */
#define NC_EDISKLESS (-129) /**< Error in using diskless access. */
#define NC_ECANTEXTEND (-130) /**< Attempt to extend dataset during ind. I/O operation. */
#define NC_EMPI (-131) /**< MPI operation failed. */

#define NC4_LAST_ERROR (-130)
#define NC4_LAST_ERROR (-131)

/* This is used in netCDF-4 files for dimensions without coordinate
* vars. */
Expand Down
66 changes: 59 additions & 7 deletions libsrc4/nc4file.c
Expand Up @@ -221,7 +221,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
FILE *fp;
int retval = NC_NOERR;
NC_HDF5_FILE_INFO_T* nc4_info = NULL;
#ifndef USE_PARALLEL
#ifdef USE_PARALLEL
int comm_duped = 0; /* Whether the MPI Communicator was duplicated */
int info_duped = 0; /* Whether the MPI Info object was duplicated */
#else /* !USE_PARALLEL */
int persist = 0; /* Should diskless try to persist its data into file?*/
#endif

Expand Down Expand Up @@ -294,6 +297,22 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0)
BAIL(NC_EPARINIT);
}

/* Keep copies of the MPI Comm & Info objects */
if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm))
BAIL(NC_EMPI);
comm_duped++;
if (MPI_INFO_NULL != info)
{
if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info))
BAIL(NC_EMPI);
info_duped++;
}
else
{
/* No dup, just copy it. */
nc4_info->info = info;
}
}
#else /* only set cache for non-parallel... */
if(cmode & NC_DISKLESS) {
Expand Down Expand Up @@ -353,6 +372,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info,
return NC_NOERR;

exit: /*failure exit*/
#ifdef USE_PARALLEL
if (comm_duped) MPI_Comm_free(&nc4_info->comm);
if (info_duped) MPI_Info_free(&nc4_info->info);
#endif
#ifdef EXTRA_TESTS
num_plists--;
#endif
Expand Down Expand Up @@ -2302,6 +2325,10 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,
H5F_ACC_RDWR : H5F_ACC_RDONLY;
int retval;
NC_HDF5_FILE_INFO_T* nc4_info = NULL;
#ifdef USE_PARALLEL
int comm_duped = 0; /* Whether the MPI Communicator was duplicated */
int info_duped = 0; /* Whether the MPI Info object was duplicated */
#endif /* !USE_PARALLEL */

LOG((3, "nc4_open_file: path %s mode %d", path, mode));
assert(path && nc);
Expand Down Expand Up @@ -2349,6 +2376,22 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,
if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0)
BAIL(NC_EPARINIT);
}

/* Keep copies of the MPI Comm & Info objects */
if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm))
BAIL(NC_EMPI);
comm_duped++;
if (MPI_INFO_NULL != info)
{
if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info))
BAIL(NC_EMPI);
info_duped++;
}
else
{
/* No dup, just copy it. */
nc4_info->info = info;
}
}
#else /* only set cache for non-parallel. */
if (H5Pset_cache(fapl_id, 0, nc4_chunk_cache_nelems, nc4_chunk_cache_size,
Expand Down Expand Up @@ -2397,11 +2440,15 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm,

return NC_NOERR;

exit:
if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id);
exit:
#ifdef USE_PARALLEL
if (comm_duped) MPI_Comm_free(&nc4_info->comm);
if (info_duped) MPI_Info_free(&nc4_info->info);
#endif
#ifdef EXTRA_TESTS
num_plists--;
#endif
if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id);
if (!nc4_info) return retval;
close_netcdf4_file(nc4_info,1); /* treat like abort*/
#if 0
Expand Down Expand Up @@ -3115,6 +3162,15 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort)
}
else
{
#ifdef USE_PARALLEL
/* Free the MPI Comm & Info objects, if we opened the file in parallel */
if(h5->parallel)
{
MPI_Comm_free(&h5->comm);
if(MPI_INFO_NULL != h5->info)
MPI_Info_free(&h5->info);
}
#endif
if (H5Fclose(h5->hdfid) < 0)
{
int nobjs;
Expand All @@ -3133,10 +3189,6 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort)
retval = NC_EHDFERR; goto done;
}
}
#if 0
if (H5garbage_collect() < 0)
{retval = NC_EHDFERR; goto done;
#endif
}

done:
Expand Down
34 changes: 27 additions & 7 deletions libsrc4/nc4hdf.c
Expand Up @@ -572,7 +572,7 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
hid_t file_spaceid = 0, mem_spaceid = 0, xfer_plistid = 0;
size_t file_type_size;

hsize_t *xtend_size = NULL, count[NC_MAX_VAR_DIMS];
hsize_t xtend_size[NC_MAX_VAR_DIMS] , count[NC_MAX_VAR_DIMS];
hsize_t fdims[NC_MAX_VAR_DIMS], fmaxdims[NC_MAX_VAR_DIMS];
hsize_t start[NC_MAX_VAR_DIMS];
int need_to_extend = 0;
Expand Down Expand Up @@ -748,8 +748,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
it to that size. */
if (var->ndims)
{
if (!(xtend_size = malloc(var->ndims * sizeof(hsize_t))))
BAIL(NC_ENOMEM);
for (d2 = 0; d2 < var->ndims; d2++)
{
if ((retval = nc4_find_dim(grp, var->dimids[d2], &dim, NULL)))
Expand All @@ -776,14 +774,37 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
}
}

#ifdef USE_PARALLEL
/* Check if anyone wants to extend */
if (h5->parallel && NC_COLLECTIVE == var->parallel_access)
{
/* Form consensus opinion among all processes about whether to perform
* collective I/O
*/
if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &need_to_extend, 1, MPI_INT, MPI_BOR, h5->comm))
BAIL(NC_EMPI);
}
#endif /* USE_PARALLEL */

/* If we need to extend it, we also need a new file_spaceid
to reflect the new size of the space. */
if (need_to_extend)
{
LOG((4, "extending dataset"));
#ifdef USE_PARALLEL
if (h5->parallel && NC_COLLECTIVE != var->parallel_access)
BAIL(NC_ECANTEXTEND);
if (h5->parallel)
{
if(NC_COLLECTIVE != var->parallel_access)
BAIL(NC_ECANTEXTEND);

/* Reach consensus about dimension sizes to extend to */
/* (Note: Somewhat hackish, with the use of MPI_BYTE, but MPI_MAX is
* correct with this usage, as long as it's not executed on
* heterogenous systems)
*/
if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &xtend_size, (var->ndims * sizeof(hsize_t)), MPI_BYTE, MPI_MAX, h5->comm))
BAIL(NC_EMPI);
}
#endif /* USE_PARALLEL */
if (H5Dset_extent(var->hdf_datasetid, xtend_size) < 0)
BAIL(NC_EHDFERR);
Expand Down Expand Up @@ -850,7 +871,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp,
#ifndef HDF5_CONVERT
if (need_to_convert) free(bufr);
#endif
if (xtend_size) free(xtend_size);

/* If there was an error return it, otherwise return any potential
range error value. If none, return NC_NOERR as usual.*/
Expand Down Expand Up @@ -2415,7 +2435,7 @@ write_dim(NC_DIM_INFO_T *dim, NC_GRP_INFO_T *grp, int write_dimid)
}
}
}
if (H5Dextend(v1->hdf_datasetid, new_size) < 0)
if (H5Dset_extent(v1->hdf_datasetid, new_size) < 0)
BAIL(NC_EHDFERR);
free(new_size);
}
Expand Down
86 changes: 72 additions & 14 deletions nc_test4/tst_parallel3.c
Expand Up @@ -74,7 +74,7 @@ int main(int argc, char **argv)
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("\n*** Testing more advanced parallel access.\n");

for (i = 0; i < 16; i++){
Expand All @@ -101,56 +101,68 @@ int main(int argc, char **argv)
sprintf(file_name, "%s/%s", TEMP_LARGE, FILE_NAME);

/* Test NetCDF4 with MPI-IO driver */
if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for raw-data with MPI-IO (driver)...");
if(test_pio(NC_INDEPENDENT)!=0) ERR;
if(test_pio(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for meta-data with MPI-IO (driver)...");
if(test_pio_attr(NC_INDEPENDENT)!=0) ERR;
if(test_pio_attr(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for different hyperslab selections with MPI-IO (driver)...");
if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR;
if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for extending variables with MPI-IO (driver)...");
if(test_pio_extend(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 0)
printf("*** Testing parallel IO for raw-data with MPIPOSIX-IO (driver)...");
facc_type = NC_NETCDF4|NC_MPIPOSIX;
facc_type_open = NC_MPIPOSIX;
if(test_pio(NC_INDEPENDENT)!=0) ERR;
if(test_pio(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for meta-data with MPIPOSIX-IO (driver)...");
if(test_pio_attr(NC_INDEPENDENT)!=0) ERR;
if(test_pio_attr(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 1)
if (mpi_rank == 0)
printf("*** Testing parallel IO for different hyperslab selections "
"with MPIPOSIX-IO (driver)...");
if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR;
if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 1)
if (mpi_rank == 0)
SUMMARIZE_ERR;

if (mpi_rank == 0)
printf("*** Testing parallel IO for extending variables with MPIPOSIX-IO (driver)...");
if(test_pio_extend(NC_COLLECTIVE)!=0) ERR;
if (mpi_rank == 0)
SUMMARIZE_ERR;

/* if(!getenv_all(MPI_COMM_WORLD,0,"NETCDF4_NOCLEANUP")) */
remove(file_name);
MPI_Finalize();

if (mpi_rank == 1)
if (mpi_rank == 0)
FINAL_RESULTS;
return 0;
}
Expand Down Expand Up @@ -686,6 +698,52 @@ int test_pio_hyper(int flag){
return 0;
}

/* test extending variables */
int test_pio_extend(int flag){
int rank, procs;
int ncFile;
int ncDimPart;
int ncDimVrtx;
int ncVarVrtx;
int dimsVrtx[2];
size_t start[2];
size_t count[2];
int vertices[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &procs);

/* Create netcdf file */
if (nc_create_par("test.nc", NC_NETCDF4 | NC_MPIIO, MPI_COMM_WORLD, MPI_INFO_NULL, &ncFile)) ERR;

/* Create netcdf dimensions */
if (nc_def_dim(ncFile, "partitions", procs, &ncDimPart)) ERR;
if (nc_def_dim(ncFile, "vertices", NC_UNLIMITED, &ncDimVrtx)) ERR;

/* Create netcdf variables */
dimsVrtx[0] = ncDimPart;
dimsVrtx[1] = ncDimVrtx;
if (nc_def_var(ncFile, "vertex", NC_INT, 2, dimsVrtx, &ncVarVrtx)) ERR;

/* Start writing data */
if (nc_enddef(ncFile)) ERR;

/* Set access mode */
if (nc_var_par_access(ncFile, ncVarVrtx, flag)) ERR;

/* Write vertices */
start[0] = rank;
start[1] = 0;
count[0] = 1;
count[1] = rank;
if (nc_put_vara_int(ncFile, ncVarVrtx, start, count, vertices)) ERR;

/* Close netcdf file */
if (nc_close(ncFile)) ERR;

return 0;
}

/*-------------------------------------------------------------------------
* Function: getenv_all
*
Expand Down

0 comments on commit 3cdce9e

Please sign in to comment.