Enable collective MPI IO. #623

Merged
merged 7 commits into from
Oct 28, 2022
13 changes: 13 additions & 0 deletions include/highfive/H5PropertyList.hpp
@@ -300,6 +300,19 @@ class CreateIntermediateGroup {
const bool _create;
};

#ifdef H5_HAVE_PARALLEL
class UseCollectiveIO {
public:
explicit UseCollectiveIO(bool enable = true)
: _enable(enable) {}

private:
friend DataTransferProps;
void apply(hid_t hid) const;
bool _enable;
};
#endif

} // namespace HighFive

#include "bits/H5PropertyList_misc.hpp"
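The new property is used like the existing ones: register it on a DataTransferProps and pass that list to the read/write call. A minimal usage sketch (assuming an already opened, MPI-enabled dataset, as in the example program further down in this PR):

auto xfer_props = HighFive::DataTransferProps{};
xfer_props.add(HighFive::UseCollectiveIO{});          // request collective transfers
// xfer_props.add(HighFive::UseCollectiveIO{false});  // would request independent I/O instead
dataset.write(data, xfer_props);                      // uses the new overloads below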
8 changes: 8 additions & 0 deletions include/highfive/bits/H5PropertyList_misc.hpp
@@ -140,6 +140,14 @@ inline void CreateIntermediateGroup::apply(const hid_t hid) const {
}
}

#ifdef H5_HAVE_PARALLEL
inline void UseCollectiveIO::apply(const hid_t hid) const {
if (H5Pset_dxpl_mpio(hid, _enable ? H5FD_MPIO_COLLECTIVE : H5FD_MPIO_INDEPENDENT) < 0) {
HDF5ErrMapper::ToException<PropertyException>("Error setting H5Pset_dxpl_mpio.");
}
}
#endif

} // namespace HighFive

#endif // H5PROPERTY_LIST_HPP
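The add()/apply() pairing is the extension point: DataTransferProps::add() hands the raw HDF5 property-list id to the property's private apply(), so other transfer-level settings can be wrapped the same way. As a purely hypothetical illustration of the pattern (SetTransferBuffer is not part of this PR; H5Pset_buffer is a standard HDF5 call that sets the type-conversion buffer size on a dataset-transfer list):

class SetTransferBuffer {
  public:
    explicit SetTransferBuffer(size_t size)
        : _size(size) {}

  private:
    friend DataTransferProps;
    void apply(const hid_t hid) const {
        // Forward to the underlying HDF5 call, mapping failures to HighFive exceptions.
        if (H5Pset_buffer(hid, _size, nullptr, nullptr) < 0) {
            HDF5ErrMapper::ToException<PropertyException>("Error setting H5Pset_buffer.");
        }
    }
    size_t _size;
};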
17 changes: 12 additions & 5 deletions include/highfive/bits/H5Slice_traits.hpp
@@ -15,6 +15,8 @@
#include "H5_definitions.hpp"
#include "H5Utils.hpp"

#include "../H5PropertyList.hpp"

namespace HighFive {

class ElementSet {
@@ -282,7 +284,7 @@ class SliceTraits {
Selection select(const ElementSet& elements) const;

template <typename T>
T read() const;
T read(const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Read the entire dataset into a buffer
@@ -294,7 +296,7 @@
/// responsibility to ensure that the right amount of space has been
/// allocated.
template <typename T>
void read(T& array) const;
void read(T& array, const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Read the entire dataset into a raw buffer
@@ -305,7 +307,9 @@
/// \param array: A buffer containing enough space for the data
/// \param dtype: The type of the data, in case it cannot be automatically guessed
template <typename T>
void read(T* array, const DataType& dtype = DataType()) const;
void read(T* array,
const DataType& dtype = DataType(),
const DataTransferProps& xfer_props = DataTransferProps()) const;

///
/// Write the entire N-dimension buffer to this dataset
@@ -315,7 +319,7 @@
/// The array type can be an N-pointer or an N-vector (e.g. int**, a two-
/// dimensional integer array)
template <typename T>
void write(const T& buffer);
void write(const T& buffer, const DataTransferProps& xfer_props = DataTransferProps());

///
/// Write from a raw buffer into this dataset
@@ -326,8 +330,11 @@
/// default conventions.
/// \param buffer: A buffer containing the data to be written
/// \param dtype: The type of the data, in case it cannot be automatically guessed
/// \param xfer_props: The HDF5 data transfer properties, e.g. collective MPI-IO.
template <typename T>
void write_raw(const T* buffer, const DataType& dtype = DataType());
void write_raw(const T* buffer,
const DataType& dtype = DataType(),
const DataTransferProps& xfer_props = DataTransferProps());

protected:
inline Selection select_impl(const HyperSlab& hyperslab, const DataSpace& memspace) const;
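Every new xfer_props parameter is defaulted to an empty DataTransferProps, so existing call sites compile unchanged and collective I/O stays opt-in. A sketch of both forms (dataset and xfer_props as set up above, element type assumed to be double):

auto v = dataset.read<std::vector<double>>();        // unchanged call, independent I/O
dataset.read(v, xfer_props);                         // collective read
dataset.write(v, xfer_props);                        // collective write
dataset.write_raw(v.data(), HighFive::create_datatype<double>(), xfer_props);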
28 changes: 16 additions & 12 deletions include/highfive/bits/H5Slice_traits_misc.hpp
@@ -160,16 +160,16 @@ inline Selection SliceTraits<Derivate>::select(const ElementSet& elements) const

template <typename Derivate>
template <typename T>
inline T SliceTraits<Derivate>::read() const {
inline T SliceTraits<Derivate>::read(const DataTransferProps& xfer_props) const {
T array;
read(array);
read(array, xfer_props);
return array;
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::read(T& array) const {
inline void SliceTraits<Derivate>::read(T& array, const DataTransferProps& xfer_props) const {
const auto& slice = static_cast<const Derivate&>(*this);
const DataSpace& mem_space = slice.getMemSpace();
const details::BufferInfo<T> buffer_info(slice.getDataType(), [slice]() -> std::string {
@@ -184,20 +184,22 @@ inline void SliceTraits<Derivate>::read(T& array) const {
}
auto dims = mem_space.getDimensions();
auto r = details::data_converter::get_reader<T>(dims, array);
read(r.get_pointer(), buffer_info.data_type);
read(r.get_pointer(), buffer_info.data_type, xfer_props);
// re-arrange results
r.unserialize();
auto t = create_datatype<typename details::inspector<T>::base_type>();
auto c = t.getClass();
if (c == DataTypeClass::VarLen) {
(void) H5Dvlen_reclaim(t.getId(), mem_space.getId(), H5P_DEFAULT, r.get_pointer());
(void) H5Dvlen_reclaim(t.getId(), mem_space.getId(), xfer_props.getId(), r.get_pointer());
}
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::read(T* array, const DataType& dtype) const {
inline void SliceTraits<Derivate>::read(T* array,
const DataType& dtype,
const DataTransferProps& xfer_props) const {
static_assert(!std::is_const<T>::value,
"read() requires a non-const structure to read data into");
const auto& slice = static_cast<const Derivate&>(*this);
@@ -211,16 +213,16 @@ inline void SliceTraits<Derivate>::read(T* array, const DataType& dtype) const {
mem_datatype.getId(),
details::get_memspace_id(slice),
slice.getSpace().getId(),
H5P_DEFAULT,
xfer_props.getId(),
static_cast<void*>(array)) < 0) {
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Read: ");
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Read.");
}
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::write(const T& buffer) {
inline void SliceTraits<Derivate>::write(const T& buffer, const DataTransferProps& xfer_props) {
const auto& slice = static_cast<const Derivate&>(*this);
const DataSpace& mem_space = slice.getMemSpace();
const details::BufferInfo<T> buffer_info(slice.getDataType(), [slice]() -> std::string {
@@ -234,13 +236,15 @@ inline void SliceTraits<Derivate>::write(const T& buffer) {
throw DataSpaceException(ss.str());
}
auto w = details::data_converter::serialize<T>(buffer);
write_raw(w.get_pointer(), buffer_info.data_type);
write_raw(w.get_pointer(), buffer_info.data_type, xfer_props);
}


template <typename Derivate>
template <typename T>
inline void SliceTraits<Derivate>::write_raw(const T* buffer, const DataType& dtype) {
inline void SliceTraits<Derivate>::write_raw(const T* buffer,
const DataType& dtype,
const DataTransferProps& xfer_props) {
using element_type = typename details::inspector<T>::base_type;
const auto& slice = static_cast<const Derivate&>(*this);
const auto& mem_datatype = dtype.empty() ? create_and_check_datatype<element_type>() : dtype;
@@ -249,7 +253,7 @@ inline void SliceTraits<Derivate>::write_raw(const T* buffer, const DataType& dt
mem_datatype.getId(),
details::get_memspace_id(slice),
slice.getSpace().getId(),
H5P_DEFAULT,
xfer_props.getId(),
static_cast<const void*>(buffer)) < 0) {
HDF5ErrMapper::ToException<DataSetException>("Error during HDF5 Write: ");
}
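Under the hood the only change is which dataset-transfer property list reaches H5Dread/H5Dwrite: H5P_DEFAULT before, xfer_props.getId() now. In plain HDF5 terms the collective write path is roughly the following (a sketch with placeholder ids, not code from this PR):

hid_t dxpl = H5Pcreate(H5P_DATASET_XFER);             // dataset-transfer property list
H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);         // what UseCollectiveIO::apply() sets
herr_t status = H5Dwrite(dset_id, H5T_NATIVE_DOUBLE,  // dset_id and the space ids are placeholders
                         mem_space_id, file_space_id, dxpl, buffer);
if (status < 0) { /* handle the error */ }
H5Pclose(dxpl);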
22 changes: 20 additions & 2 deletions src/examples/parallel_hdf5_write_dataset.cpp
@@ -52,10 +52,28 @@ int main(int argc, char** argv) {

// Each MPI rank wants to write two values derived from its rank
// into its associated row
int data[1][2] = {{mpi_rank, mpi_rank}};
double data[1][2] = {{mpi_rank * 1.0, mpi_rank * 2.0}};

auto xfer_props = DataTransferProps{};
xfer_props.add(UseCollectiveIO{});

// write it to the associated mpi_rank
dataset.select({std::size_t(mpi_rank), 0}, {1, 2}).write(data);
dataset.select({std::size_t(mpi_rank), 0}, {1, 2}).write(data, xfer_props);

// Currently, HighFive doesn't wrap retrieving information from property lists.
// Therefore, one needs to use HDF5 directly. For example, to check whether
// collective MPI-IO operations were actually used, and if not why, one may:
uint32_t local_cause = 0, global_cause = 0;
auto err = H5Pget_mpio_no_collective_cause(xfer_props.getId(), &local_cause, &global_cause);
if (err < 0) {
throw std::runtime_error("Failed to check mpio_no_collective_cause.");
}
if (local_cause || global_cause) {
std::cout
<< "The operation was successful, but couldn't use collective MPI-IO. local cause: "
<< local_cause << " global cause:" << global_cause << std::endl;
}


} catch (Exception& err) {
// catch and print any HDF5 error
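The local_cause/global_cause values returned by H5Pget_mpio_no_collective_cause are bitmasks of H5D_mpio_no_collective_cause_t flags, so they can be decoded further than the example does. A sketch for two common flags (flag names as in the HDF5 headers; the exact set varies across HDF5 versions):

if (local_cause & H5D_MPIO_SET_INDEPENDENT) {
    std::cout << "independent I/O was explicitly requested on this rank\n";
}
if (local_cause & H5D_MPIO_DATATYPE_CONVERSION) {
    std::cout << "a datatype conversion prevented collective I/O\n";
}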
68 changes: 51 additions & 17 deletions tests/unit/tests_high_five_parallel.cpp
@@ -39,6 +39,15 @@ struct MpiFixture {
int size;
};

void check_was_collective(const DataTransferProps& xfer_props) {
uint32_t local_cause = 0, global_cause = 0;
if (H5Pget_mpio_no_collective_cause(xfer_props.getId(), &local_cause, &global_cause) < 0) {
throw std::runtime_error("Failed to check mpio_no_collective_cause.");
}
CHECK(local_cause == 0);
CHECK(global_cause == 0);
}

template <typename T>
void selectionArraySimpleTestParallel() {
int mpi_rank, mpi_size;
@@ -54,7 +63,8 @@ void selectionArraySimpleTestParallel() {
const auto offset_x = static_cast<size_t>(mpi_rank);
const auto count_x = static_cast<size_t>(mpi_size - mpi_rank);

const std::string DATASET_NAME("dset");
const std::string d1_name("dset1");
const std::string d2_name("dset2");

Vector values(size_x);

@@ -66,31 +76,55 @@
adam.add(MPIOFileAccess(MPI_COMM_WORLD, MPI_INFO_NULL));
File file(filename.str(), File::ReadWrite | File::Create | File::Truncate, adam);

DataSet dataset = file.createDataSet<T>(DATASET_NAME, DataSpace::From(values));
DataSet d1 = file.createDataSet<T>(d1_name, DataSpace::From(values));
if (mpi_rank == 0) {
d1.write(values);
}

DataSet d2 = file.createDataSet<T>(d2_name, DataSpace::From(values));
auto xfer_props = DataTransferProps{};
xfer_props.add(UseCollectiveIO{});

{
auto offset = std::vector<size_t>{static_cast<size_t>(mpi_rank)};
auto count = std::vector<size_t>{1ul};
auto slice = d2.select(offset, count);

dataset.write(values);
auto local_values = Vector(count[0]);
local_values[0] = values[offset[0]];

// Write collectively, each MPI rank writes one slab.
slice.write(local_values, xfer_props);
check_was_collective(xfer_props);
}

file.flush();

// read it back
Vector result;
std::vector<size_t> offset;
offset.push_back(offset_x);
std::vector<size_t> size;
size.push_back(count_x);
// -- read it back

auto check_result = [&values, offset_x, count_x](const Vector& result) {
CHECK(result.size() == count_x);

Selection slice = dataset.select(offset, size);
for (size_t i = offset_x; i < count_x; ++i) {
CHECK(values[i + offset_x] == result[i]);
}
};

CHECK(slice.getSpace().getDimensions()[0] == size_x);
CHECK(slice.getMemSpace().getDimensions()[0] == count_x);
auto make_slice = [size_x, offset_x, count_x](DataSet& dataset) {
auto slice = dataset.select(std::vector<size_t>{offset_x}, std::vector<size_t>{count_x});

slice.read(result);
CHECK(slice.getSpace().getDimensions()[0] == size_x);
CHECK(slice.getMemSpace().getDimensions()[0] == count_x);

CHECK(result.size() == count_x);
return slice;
};

for (size_t i = offset_x; i < count_x; ++i) {
CHECK(values[i + offset_x] == result[i]);
}
auto s1 = make_slice(d1);
check_result(s1.template read<Vector>());

auto s2 = make_slice(d2);
check_result(s2.template read<Vector>(xfer_props));
check_was_collective(xfer_props);
}

TEMPLATE_LIST_TEST_CASE("mpiSelectionArraySimple", "[template]", numerical_test_types) {
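One semantic point the test layout reflects: with UseCollectiveIO set, the transfer becomes a collective call, so every rank in the communicator must reach the read/write (possibly with an empty selection). That is why d1, written only by rank 0, is written without the collective property, while the collective write to d2 is issued by all ranks. A sketch of the pattern to avoid, since the other ranks never enter the call and the program would hang:

if (mpi_rank == 0) {
    dataset.write(data, xfer_props);  // collective write issued on one rank only -> deadlock
}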