diff --git a/include/nc4internal.h b/include/nc4internal.h index 63fd6652cd..1de0f416a8 100644 --- a/include/nc4internal.h +++ b/include/nc4internal.h @@ -254,6 +254,10 @@ typedef struct NC_HDF5_FILE_INFO { NC* controller; hid_t hdfid; +#ifdef USE_PARALLEL + MPI_Comm comm; /* Copy of MPI Communicator used to open the file */ + MPI_Info info; /* Copy of MPI Information Object used to open the file */ +#endif int flags; int cmode; int nvars; diff --git a/include/netcdf.h b/include/netcdf.h index 385b1e3167..98aad527ad 100644 --- a/include/netcdf.h +++ b/include/netcdf.h @@ -379,8 +379,9 @@ by the desired type. */ #define NC_ENOTBUILT (-128) /**< Attempt to use feature that was not turned on when netCDF was built. */ #define NC_EDISKLESS (-129) /**< Error in using diskless access. */ #define NC_ECANTEXTEND (-130) /**< Attempt to extend dataset during ind. I/O operation. */ +#define NC_EMPI (-131) /**< MPI operation failed. */ -#define NC4_LAST_ERROR (-130) +#define NC4_LAST_ERROR (-131) /* This is used in netCDF-4 files for dimensions without coordinate * vars. */ diff --git a/libsrc4/nc4file.c b/libsrc4/nc4file.c index e792b558f7..2d9388393a 100644 --- a/libsrc4/nc4file.c +++ b/libsrc4/nc4file.c @@ -221,7 +221,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info, FILE *fp; int retval = NC_NOERR; NC_HDF5_FILE_INFO_T* nc4_info = NULL; -#ifndef USE_PARALLEL +#ifdef USE_PARALLEL + int comm_duped = 0; /* Whether the MPI Communicator was duplicated */ + int info_duped = 0; /* Whether the MPI Info object was duplicated */ +#else /* !USE_PARALLEL */ int persist = 0; /* Should diskless try to persist its data into file?*/ #endif @@ -294,6 +297,22 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info, if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0) BAIL(NC_EPARINIT); } + + /* Keep copies of the MPI Comm & Info objects */ + if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm)) + BAIL(NC_EMPI); + comm_duped++; + if (MPI_INFO_NULL != info) + { + if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info)) + BAIL(NC_EMPI); + info_duped++; + } + else + { + /* No dup, just copy it. */ + nc4_info->info = info; + } } #else /* only set cache for non-parallel... */ if(cmode & NC_DISKLESS) { @@ -353,6 +372,10 @@ nc4_create_file(const char *path, int cmode, MPI_Comm comm, MPI_Info info, return NC_NOERR; exit: /*failure exit*/ +#ifdef USE_PARALLEL + if (comm_duped) MPI_Comm_free(&nc4_info->comm); + if (info_duped) MPI_Info_free(&nc4_info->info); +#endif #ifdef EXTRA_TESTS num_plists--; #endif @@ -2302,6 +2325,10 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm, H5F_ACC_RDWR : H5F_ACC_RDONLY; int retval; NC_HDF5_FILE_INFO_T* nc4_info = NULL; +#ifdef USE_PARALLEL + int comm_duped = 0; /* Whether the MPI Communicator was duplicated */ + int info_duped = 0; /* Whether the MPI Info object was duplicated */ +#endif /* !USE_PARALLEL */ LOG((3, "nc4_open_file: path %s mode %d", path, mode)); assert(path && nc); @@ -2349,6 +2376,22 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm, if (H5Pset_fapl_mpiposix(fapl_id, comm, 0) < 0) BAIL(NC_EPARINIT); } + + /* Keep copies of the MPI Comm & Info objects */ + if (MPI_SUCCESS != MPI_Comm_dup(comm, &nc4_info->comm)) + BAIL(NC_EMPI); + comm_duped++; + if (MPI_INFO_NULL != info) + { + if (MPI_SUCCESS != MPI_Info_dup(info, &nc4_info->info)) + BAIL(NC_EMPI); + info_duped++; + } + else + { + /* No dup, just copy it. */ + nc4_info->info = info; + } } #else /* only set cache for non-parallel. */ if (H5Pset_cache(fapl_id, 0, nc4_chunk_cache_nelems, nc4_chunk_cache_size, @@ -2397,11 +2440,15 @@ nc4_open_file(const char *path, int mode, MPI_Comm comm, return NC_NOERR; - exit: - if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id); +exit: +#ifdef USE_PARALLEL + if (comm_duped) MPI_Comm_free(&nc4_info->comm); + if (info_duped) MPI_Info_free(&nc4_info->info); +#endif #ifdef EXTRA_TESTS num_plists--; #endif + if (fapl_id != H5P_DEFAULT) H5Pclose(fapl_id); if (!nc4_info) return retval; close_netcdf4_file(nc4_info,1); /* treat like abort*/ #if 0 @@ -3115,6 +3162,15 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort) } else { +#ifdef USE_PARALLEL + /* Free the MPI Comm & Info objects, if we opened the file in parallel */ + if(h5->parallel) + { + MPI_Comm_free(&h5->comm); + if(MPI_INFO_NULL != h5->info) + MPI_Info_free(&h5->info); + } +#endif if (H5Fclose(h5->hdfid) < 0) { int nobjs; @@ -3133,10 +3189,6 @@ close_netcdf4_file(NC_HDF5_FILE_INFO_T *h5, int abort) retval = NC_EHDFERR; goto done; } } -#if 0 - if (H5garbage_collect() < 0) - {retval = NC_EHDFERR; goto done; -#endif } done: diff --git a/libsrc4/nc4hdf.c b/libsrc4/nc4hdf.c index 439f3783fc..1d59df1fb4 100644 --- a/libsrc4/nc4hdf.c +++ b/libsrc4/nc4hdf.c @@ -572,7 +572,7 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp, hid_t file_spaceid = 0, mem_spaceid = 0, xfer_plistid = 0; size_t file_type_size; - hsize_t *xtend_size = NULL, count[NC_MAX_VAR_DIMS]; + hsize_t xtend_size[NC_MAX_VAR_DIMS] , count[NC_MAX_VAR_DIMS]; hsize_t fdims[NC_MAX_VAR_DIMS], fmaxdims[NC_MAX_VAR_DIMS]; hsize_t start[NC_MAX_VAR_DIMS]; int need_to_extend = 0; @@ -748,8 +748,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp, it to that size. */ if (var->ndims) { - if (!(xtend_size = malloc(var->ndims * sizeof(hsize_t)))) - BAIL(NC_ENOMEM); for (d2 = 0; d2 < var->ndims; d2++) { if ((retval = nc4_find_dim(grp, var->dimids[d2], &dim, NULL))) @@ -776,14 +774,37 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp, } } +#ifdef USE_PARALLEL + /* Check if anyone wants to extend */ + if (h5->parallel && NC_COLLECTIVE == var->parallel_access) + { + /* Form consensus opinion among all processes about whether to perform + * collective I/O + */ + if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &need_to_extend, 1, MPI_INT, MPI_BOR, h5->comm)) + BAIL(NC_EMPI); + } +#endif /* USE_PARALLEL */ + /* If we need to extend it, we also need a new file_spaceid to reflect the new size of the space. */ if (need_to_extend) { LOG((4, "extending dataset")); #ifdef USE_PARALLEL - if (h5->parallel && NC_COLLECTIVE != var->parallel_access) - BAIL(NC_ECANTEXTEND); + if (h5->parallel) + { + if(NC_COLLECTIVE != var->parallel_access) + BAIL(NC_ECANTEXTEND); + + /* Reach consensus about dimension sizes to extend to */ + /* (Note: Somewhat hackish, with the use of MPI_BYTE, but MPI_MAX is + * correct with this usage, as long as it's not executed on + * heterogenous systems) + */ + if(MPI_SUCCESS != MPI_Allreduce(MPI_IN_PLACE, &xtend_size, (var->ndims * sizeof(hsize_t)), MPI_BYTE, MPI_MAX, h5->comm)) + BAIL(NC_EMPI); + } #endif /* USE_PARALLEL */ if (H5Dset_extent(var->hdf_datasetid, xtend_size) < 0) BAIL(NC_EHDFERR); @@ -850,7 +871,6 @@ nc4_put_vara(NC *nc, int ncid, int varid, const size_t *startp, #ifndef HDF5_CONVERT if (need_to_convert) free(bufr); #endif - if (xtend_size) free(xtend_size); /* If there was an error return it, otherwise return any potential range error value. If none, return NC_NOERR as usual.*/ @@ -2415,7 +2435,7 @@ write_dim(NC_DIM_INFO_T *dim, NC_GRP_INFO_T *grp, int write_dimid) } } } - if (H5Dextend(v1->hdf_datasetid, new_size) < 0) + if (H5Dset_extent(v1->hdf_datasetid, new_size) < 0) BAIL(NC_EHDFERR); free(new_size); } diff --git a/nc_test4/tst_parallel3.c b/nc_test4/tst_parallel3.c index 703d408f7e..2adc3d1abd 100644 --- a/nc_test4/tst_parallel3.c +++ b/nc_test4/tst_parallel3.c @@ -74,7 +74,7 @@ int main(int argc, char **argv) MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); - if (mpi_rank == 1) + if (mpi_rank == 0) printf("\n*** Testing more advanced parallel access.\n"); for (i = 0; i < 16; i++){ @@ -101,56 +101,68 @@ int main(int argc, char **argv) sprintf(file_name, "%s/%s", TEMP_LARGE, FILE_NAME); /* Test NetCDF4 with MPI-IO driver */ - if (mpi_rank == 1) + if (mpi_rank == 0) printf("*** Testing parallel IO for raw-data with MPI-IO (driver)..."); if(test_pio(NC_INDEPENDENT)!=0) ERR; if(test_pio(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) SUMMARIZE_ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) printf("*** Testing parallel IO for meta-data with MPI-IO (driver)..."); if(test_pio_attr(NC_INDEPENDENT)!=0) ERR; if(test_pio_attr(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) SUMMARIZE_ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) printf("*** Testing parallel IO for different hyperslab selections with MPI-IO (driver)..."); if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR; if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) SUMMARIZE_ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) + printf("*** Testing parallel IO for extending variables with MPI-IO (driver)..."); + if(test_pio_extend(NC_COLLECTIVE)!=0) ERR; + if (mpi_rank == 0) + SUMMARIZE_ERR; + + if (mpi_rank == 0) printf("*** Testing parallel IO for raw-data with MPIPOSIX-IO (driver)..."); facc_type = NC_NETCDF4|NC_MPIPOSIX; facc_type_open = NC_MPIPOSIX; if(test_pio(NC_INDEPENDENT)!=0) ERR; if(test_pio(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) SUMMARIZE_ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) printf("*** Testing parallel IO for meta-data with MPIPOSIX-IO (driver)..."); if(test_pio_attr(NC_INDEPENDENT)!=0) ERR; if(test_pio_attr(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) SUMMARIZE_ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) printf("*** Testing parallel IO for different hyperslab selections " "with MPIPOSIX-IO (driver)..."); if(test_pio_hyper(NC_INDEPENDENT)!=0)ERR; if(test_pio_hyper(NC_COLLECTIVE)!=0) ERR; - if (mpi_rank == 1) + if (mpi_rank == 0) + SUMMARIZE_ERR; + + if (mpi_rank == 0) + printf("*** Testing parallel IO for extending variables with MPIPOSIX-IO (driver)..."); + if(test_pio_extend(NC_COLLECTIVE)!=0) ERR; + if (mpi_rank == 0) SUMMARIZE_ERR; /* if(!getenv_all(MPI_COMM_WORLD,0,"NETCDF4_NOCLEANUP")) */ remove(file_name); MPI_Finalize(); - if (mpi_rank == 1) + if (mpi_rank == 0) FINAL_RESULTS; return 0; } @@ -686,6 +698,52 @@ int test_pio_hyper(int flag){ return 0; } +/* test extending variables */ +int test_pio_extend(int flag){ + int rank, procs; + int ncFile; + int ncDimPart; + int ncDimVrtx; + int ncVarVrtx; + int dimsVrtx[2]; + size_t start[2]; + size_t count[2]; + int vertices[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &procs); + + /* Create netcdf file */ + if (nc_create_par("test.nc", NC_NETCDF4 | NC_MPIIO, MPI_COMM_WORLD, MPI_INFO_NULL, &ncFile)) ERR; + + /* Create netcdf dimensions */ + if (nc_def_dim(ncFile, "partitions", procs, &ncDimPart)) ERR; + if (nc_def_dim(ncFile, "vertices", NC_UNLIMITED, &ncDimVrtx)) ERR; + + /* Create netcdf variables */ + dimsVrtx[0] = ncDimPart; + dimsVrtx[1] = ncDimVrtx; + if (nc_def_var(ncFile, "vertex", NC_INT, 2, dimsVrtx, &ncVarVrtx)) ERR; + + /* Start writing data */ + if (nc_enddef(ncFile)) ERR; + + /* Set access mode */ + if (nc_var_par_access(ncFile, ncVarVrtx, flag)) ERR; + + /* Write vertices */ + start[0] = rank; + start[1] = 0; + count[0] = 1; + count[1] = rank; + if (nc_put_vara_int(ncFile, ncVarVrtx, start, count, vertices)) ERR; + + /* Close netcdf file */ + if (nc_close(ncFile)) ERR; + + return 0; +} + /*------------------------------------------------------------------------- * Function: getenv_all *