Commit 1df7ad0
Enable collective MPI IO. (#623)
Restructured version of #621, which added the ability to specify a transfer property on read and write operations. This is needed to be able to request collective IO.

Co-authored-by: Rob Latham <rlatham@gmail.com>
1uc and roblatham00 committed Oct 28, 2022
1 parent f3c95a8 commit 1df7ad0
Showing 6 changed files with 120 additions and 36 deletions.
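In user code the new feature reads as follows; this is a minimal sketch distilled from the updated example further down (the file name "collective.h5" and the dataset shape are placeholders, not part of this commit):

#include <highfive/H5File.hpp>
#include <mpi.h>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int mpi_rank, mpi_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

    using namespace HighFive;

    // Open the file for parallel access.
    FileAccessProps fapl;
    fapl.add(MPIOFileAccess(MPI_COMM_WORLD, MPI_INFO_NULL));
    File file("collective.h5", File::ReadWrite | File::Create | File::Truncate, fapl);

    // One row of two doubles per rank.
    auto dataset = file.createDataSet<double>(
        "dset", DataSpace(std::vector<size_t>{std::size_t(mpi_size), 2}));

    // Ask HDF5 to perform this transfer collectively.
    auto xfer_props = DataTransferProps{};
    xfer_props.add(UseCollectiveIO{});

    double data[1][2] = {{mpi_rank * 1.0, mpi_rank * 2.0}};
    dataset.select({std::size_t(mpi_rank), 0}, {1, 2}).write(data, xfer_props);

    MPI_Finalize();
    return 0;
}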
13 changes: 13 additions & 0 deletions include/highfive/H5PropertyList.hpp
@@ -300,6 +300,19 @@ class CreateIntermediateGroup {
const bool _create;
};

#ifdef H5_HAVE_PARALLEL
class UseCollectiveIO {
public:
explicit UseCollectiveIO(bool enable = true)
: _enable(enable) {}

private:
friend DataTransferProps;
void apply(hid_t hid) const;
bool _enable;
};
#endif

} // namespace HighFive

#include "bits/H5PropertyList_misc.hpp"
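Note the H5_HAVE_PARALLEL guard: UseCollectiveIO is only declared when HDF5 itself was built with MPI support, so user code that must also build against serial HDF5 can mirror the guard. A small sketch:

HighFive::DataTransferProps xfer_props;
#ifdef H5_HAVE_PARALLEL
xfer_props.add(HighFive::UseCollectiveIO{});
#endif
// ... pass xfer_props to read()/write(); with serial HDF5 it remains the default, non-collective behaviour.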
8 changes: 8 additions & 0 deletions include/highfive/bits/H5PropertyList_misc.hpp
@@ -140,6 +140,14 @@ inline void CreateIntermediateGroup::apply(const hid_t hid) const {
}
}

#ifdef H5_HAVE_PARALLEL
inline void UseCollectiveIO::apply(const hid_t hid) const {
if (H5Pset_dxpl_mpio(hid, _enable ? H5FD_MPIO_COLLECTIVE : H5FD_MPIO_INDEPENDENT) < 0) {
HDF5ErrMapper::ToException<PropertyException>("Error setting H5Pset_dxpl_mpio.");
}
}
#endif

} // namespace HighFive

#endif // H5PROPERTY_LIST_HPP
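For reference, apply() above is a thin wrapper over the HDF5 C API; setting the same transfer mode by hand would look roughly like this (a sketch assuming a parallel HDF5 build, not code from this commit; error handling elided):

hid_t dxpl = H5Pcreate(H5P_DATASET_XFER);        // dataset transfer property list
H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);    // or H5FD_MPIO_INDEPENDENT
// ... pass dxpl where H5P_DEFAULT would otherwise go in H5Dread()/H5Dwrite() ...
H5Pclose(dxpl);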
17 changes: 12 additions & 5 deletions include/highfive/bits/H5Slice_traits.hpp
@@ -15,6 +15,8 @@
#include "H5_definitions.hpp"
#include "H5Utils.hpp"

#include "../H5PropertyList.hpp"

namespace HighFive {

class ElementSet {
@@ -282,7 +284,7 @@ class SliceTraits {
Selection select(const ElementSet& elements) const;

template <typename T>
T read() const;
T read(const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Read the entire dataset into a buffer
@@ -294,7 +296,7 @@
/// responsibility to ensure that the right amount of space has been
/// allocated.
template <typename T>
void read(T& array) const;
void read(T& array, const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Read the entire dataset into a raw buffer
@@ -305,7 +307,9 @@
/// \param array: A buffer containing enough space for the data
/// \param dtype: The type of the data, in case it cannot be automatically guessed
template <typename T>
void read(T* array, const DataType& dtype = DataType()) const;
void read(T* array,
const DataType& dtype = DataType(),
const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Write the integrality N-dimension buffer to this dataset
@@ -315,7 +319,7 @@
/// The array type can be a N-pointer or a N-vector ( e.g int** integer two
/// dimensional array )
template <typename T>
void write(const T& buffer);
void write(const T& buffer, const DataTransferProps& xfer_props = DataTransferProps());

///
/// Write from a raw buffer into this dataset
@@ -326,8 +330,11 @@
/// default conventions.
/// \param buffer: A buffer containing the data to be written
/// \param dtype: The type of the data, in case it cannot be automatically guessed
/// \param xfer_props: The HDF5 data transfer properties, e.g. collective MPI-IO.
template <typename T>
void write_raw(const T* buffer, const DataType& dtype = DataType());
void write_raw(const T* buffer,
const DataType& dtype = DataType(),
const DataTransferProps& xfer_props = DataTransferProps());

protected:
inline Selection select_impl(const HyperSlab& hyperslab, const DataSpace& memspace) const;
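Because every new xfer_props parameter defaults to an empty DataTransferProps, existing call sites compile and behave as before; opting into collective transfers is purely additive. A sketch, with dset being any open DataSet:

std::vector<double> values;

// Unchanged call: a default-constructed DataTransferProps behaves like H5P_DEFAULT.
dset.read(values);

// Opt-in collective transfer:
HighFive::DataTransferProps xfer_props;
xfer_props.add(HighFive::UseCollectiveIO{});
dset.read(values, xfer_props);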
28 changes: 16 additions & 12 deletions include/highfive/bits/H5Slice_traits_misc.hpp
@@ -160,16 +160,16 @@ inline Selection SliceTraits<Derivate>::select(const ElementSet& elements) const

template <typename Derivate>
template <typename T>
inline T SliceTraits<Derivate>::read() const {
inline T SliceTraits<Derivate>::read(const DataTransferProps& xfer_props) const {
T array;
read(array);
read(array, xfer_props);
return array;
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::read(T& array) const {
inline void SliceTraits<Derivate>::read(T& array, const DataTransferProps& xfer_props) const {
const auto& slice = static_cast<const Derivate&>(*this);
const DataSpace& mem_space = slice.getMemSpace();
const details::BufferInfo<T> buffer_info(slice.getDataType(), [slice]() -> std::string {
@@ -184,20 +184,22 @@ inline void SliceTraits<Derivate>::read(T& array) const {
}
auto dims = mem_space.getDimensions();
auto r = details::data_converter::get_reader<T>(dims, array);
read(r.get_pointer(), buffer_info.data_type);
read(r.get_pointer(), buffer_info.data_type, xfer_props);
// re-arrange results
r.unserialize();
auto t = create_datatype<typename details::inspector<T>::base_type>();
auto c = t.getClass();
if (c == DataTypeClass::VarLen) {
(void) H5Dvlen_reclaim(t.getId(), mem_space.getId(), H5P_DEFAULT, r.get_pointer());
(void) H5Dvlen_reclaim(t.getId(), mem_space.getId(), xfer_props.getId(), r.get_pointer());
}
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::read(T* array, const DataType& dtype) const {
inline void SliceTraits<Derivate>::read(T* array,
const DataType& dtype,
const DataTransferProps& xfer_props) const {
static_assert(!std::is_const<T>::value,
"read() requires a non-const structure to read data into");
const auto& slice = static_cast<const Derivate&>(*this);
@@ -211,16 +213,16 @@ inline void SliceTraits<Derivate>::read(T* array, const DataType& dtype) const {
mem_datatype.getId(),
details::get_memspace_id(slice),
slice.getSpace().getId(),
H5P_DEFAULT,
xfer_props.getId(),
static_cast<void*>(array)) < 0) {
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Read: ");
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Read.");
}
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::write(const T& buffer) {
inline void SliceTraits<Derivate>::write(const T& buffer, const DataTransferProps& xfer_props) {
const auto& slice = static_cast<const Derivate&>(*this);
const DataSpace& mem_space = slice.getMemSpace();
const details::BufferInfo<T> buffer_info(slice.getDataType(), [slice]() -> std::string {
@@ -234,13 +236,15 @@ inline void SliceTraits<Derivate>::write(const T& buffer) {
throw DataSpaceException(ss.str());
}
auto w = details::data_converter::serialize<T>(buffer);
write_raw(w.get_pointer(), buffer_info.data_type);
write_raw(w.get_pointer(), buffer_info.data_type, xfer_props);
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::write_raw(const T* buffer, const DataType& dtype) {
inline void SliceTraits<Derivate>::write_raw(const T* buffer,
const DataType& dtype,
const DataTransferProps& xfer_props) {
using element_type = typename details::inspector<T>::base_type;
const auto& slice = static_cast<const Derivate&>(*this);
const auto& mem_datatype = dtype.empty() ? create_and_check_datatype<element_type>() : dtype;
@@ -249,7 +253,7 @@ inline void SliceTraits<Derivate>::write_raw(const T* buffer, const DataType& dt
mem_datatype.getId(),
details::get_memspace_id(slice),
slice.getSpace().getId(),
H5P_DEFAULT,
xfer_props.getId(),
static_cast<const void*>(buffer)) < 0) {
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Write: ");
}
22 changes: 20 additions & 2 deletions src/examples/parallel_hdf5_write_dataset.cpp
@@ -53,10 +53,28 @@ int main(int argc, char** argv) {

// Each node wants to write its own rank two times in
// its associated row
int data[1][2] = {{mpi_rank, mpi_rank}};
double data[1][2] = {{mpi_rank * 1.0, mpi_rank * 2.0}};

auto xfer_props = DataTransferProps{};
xfer_props.add(UseCollectiveIO{});

// write it to the associated mpi_rank
dataset.select({std::size_t(mpi_rank), 0}, {1, 2}).write(data);
dataset.select({std::size_t(mpi_rank), 0}, {1, 2}).write(data, xfer_props);

// Currently, HighFive doesn't wrap retrieving information from property lists.
// Therefore, one needs to use HDF5 directly. For example, to see if and why
// collective MPI-IO operations were used, one may:
uint32_t local_cause = 0, global_cause = 0;
auto err = H5Pget_mpio_no_collective_cause(xfer_props.getId(), &local_cause, &global_cause);
if (err < 0) {
throw std::runtime_error("Failed to check mpio_no_collective_cause.");
}
if (local_cause || global_cause) {
std::cout
<< "The operation was successful, but couldn't use collective MPI-IO. local cause: "
<< local_cause << " global cause: " << global_cause << std::endl;
}


} catch (Exception& err) {
// catch and print any HDF5 error
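The same H5Pget_mpio_no_collective_cause check is factored into check_was_collective() in the test update below; for application code it could be wrapped as a predicate along these lines (a sketch, the helper name is illustrative):

#include <cstdint>
#include <stdexcept>
#include <highfive/H5PropertyList.hpp>

// True when the preceding transfer was performed collectively on all ranks.
bool was_collective(const HighFive::DataTransferProps& xfer_props) {
    uint32_t local_cause = 0, global_cause = 0;
    if (H5Pget_mpio_no_collective_cause(xfer_props.getId(), &local_cause, &global_cause) < 0) {
        throw std::runtime_error("Failed to check mpio_no_collective_cause.");
    }
    return local_cause == 0 && global_cause == 0;
}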
68 changes: 51 additions & 17 deletions tests/unit/tests_high_five_parallel.cpp
@@ -39,6 +39,15 @@ struct MpiFixture {
int size;
};

void check_was_collective(const DataTransferProps& xfer_props) {
uint32_t local_cause = 0, global_cause = 0;
if (H5Pget_mpio_no_collective_cause(xfer_props.getId(), &local_cause, &global_cause) < 0) {
throw std::runtime_error("Failed to check mpio_no_collective_cause.");
}
CHECK(local_cause == 0);
CHECK(global_cause == 0);
}

template <typename T>
void selectionArraySimpleTestParallel() {
int mpi_rank, mpi_size;
@@ -54,7 +63,8 @@ void selectionArraySimpleTestParallel() {
const auto offset_x = static_cast<size_t>(mpi_rank);
const auto count_x = static_cast<size_t>(mpi_size - mpi_rank);

const std::string DATASET_NAME("dset");
const std::string d1_name("dset1");
const std::string d2_name("dset2");

Vector values(size_x);

@@ -66,31 +76,55 @@
fapl.add(MPIOFileAccess(MPI_COMM_WORLD, MPI_INFO_NULL));
File file(filename.str(), File::ReadWrite | File::Create | File::Truncate, fapl);

DataSet dataset = file.createDataSet<T>(DATASET_NAME, DataSpace::From(values));
DataSet d1 = file.createDataSet<T>(d1_name, DataSpace::From(values));
if (mpi_rank == 0) {
d1.write(values);
}

DataSet d2 = file.createDataSet<T>(d2_name, DataSpace::From(values));
auto xfer_props = DataTransferProps{};
xfer_props.add(UseCollectiveIO{});

{
auto offset = std::vector<size_t>{static_cast<size_t>(mpi_rank)};
auto count = std::vector<size_t>{1ul};
auto slice = d2.select(offset, count);

dataset.write(values);
auto local_values = Vector(count[0]);
local_values[0] = values[offset[0]];

// Write collectively, each MPI rank writes one slab.
slice.write(local_values, xfer_props);
check_was_collective(xfer_props);
}

file.flush();

// read it back
Vector result;
std::vector<size_t> offset;
offset.push_back(offset_x);
std::vector<size_t> size;
size.push_back(count_x);
// -- read it back

auto check_result = [&values, offset_x, count_x](const Vector& result) {
CHECK(result.size() == count_x);

Selection slice = dataset.select(offset, size);
for (size_t i = offset_x; i < count_x; ++i) {
CHECK(values[i + offset_x] == result[i]);
}
};

CHECK(slice.getSpace().getDimensions()[0] == size_x);
CHECK(slice.getMemSpace().getDimensions()[0] == count_x);
auto make_slice = [size_x, offset_x, count_x](DataSet& dataset) {
auto slice = dataset.select(std::vector<size_t>{offset_x}, std::vector<size_t>{count_x});

slice.read(result);
CHECK(slice.getSpace().getDimensions()[0] == size_x);
CHECK(slice.getMemSpace().getDimensions()[0] == count_x);

CHECK(result.size() == count_x);
return slice;
};

for (size_t i = offset_x; i < count_x; ++i) {
CHECK(values[i + offset_x] == result[i]);
}
auto s1 = make_slice(d1);
check_result(s1.template read<Vector>());

auto s2 = make_slice(d2);
check_result(s2.template read<Vector>(xfer_props));
check_was_collective(xfer_props);
}

TEMPLATE_LIST_TEST_CASE("mpiSelectionArraySimple", "[template]", numerical_test_types) {
