Skip to content

Commit

Permalink
Move gather/scatter code to their own functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Rombur committed Jan 23, 2024
1 parent c0ce7da commit 77edb6d
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 106 deletions.
237 changes: 132 additions & 105 deletions source/DataAssimilator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DataAssimilator::DataAssimilator(MPI_Comm const &global_communicator,
_local_communicator(local_communicator), _color(color)
{
_global_rank = dealii::Utilities::MPI::this_mpi_process(_global_communicator);
_global_comm_size =
dealii::Utilities::MPI::n_mpi_processes(_global_communicator);

if (_global_rank == 0)
{
Expand Down Expand Up @@ -92,6 +94,108 @@ void DataAssimilator::update_ensemble(
&augmented_state_ensemble,
std::vector<double> const &expt_data, dealii::SparseMatrix<double> const &R)
{

std::vector<dealii::LA::distributed::BlockVector<double>>
global_augmented_state_ensemble;
std::vector<unsigned int> local_n_ensemble_members(_global_comm_size);
std::vector<block_size_type> block_sizes(2, 0);

gather_ensemble_members(augmented_state_ensemble,
global_augmented_state_ensemble,
local_n_ensemble_members, block_sizes);

if (_global_rank == 0)
{
// Set some constants
_num_ensemble_members = global_augmented_state_ensemble.size();
_sim_size = block_sizes[0];
_parameter_size = block_sizes[1];

adamantine::ASSERT_THROW(_expt_size == expt_data.size(),
"Error: Unexpected experiment vector size.");

// Check if R is diagonal, needed for filling the noise vector
auto bandwidth = R.get_sparsity_pattern().bandwidth();
bool const R_is_diagonal = bandwidth == 0 ? true : false;

// Get the perturbed innovation, ( y+u - Hx )
// This is determined using the unaugmented state because the parameters
// are not observable
if (_global_rank == 0)
std::cout << "Getting the perturbed innovation..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_get_pert_inno");
#endif

int constexpr base_state = 0;
std::vector<dealii::Vector<double>> perturbed_innovation(
_num_ensemble_members);
for (unsigned int member = 0; member < _num_ensemble_members; ++member)
{
perturbed_innovation[member].reinit(_expt_size);
fill_noise_vector(perturbed_innovation[member], R, R_is_diagonal);
dealii::Vector<double> temporary =
calc_Hx(global_augmented_state_ensemble[member].block(base_state));

for (unsigned int i = 0; i < _expt_size; ++i)
{
perturbed_innovation[member][i] += expt_data[i] - temporary[i];
}
}

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_get_pert_inno");
#endif

// Apply the Kalman gain to update the augmented state ensemble
if (_global_rank == 0)
std::cout << "Applying the Kalman gain..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_apply_K");
#endif

// Apply the Kalman filter to the perturbed innovation, K ( y+u - Hx )
std::vector<dealii::LA::distributed::BlockVector<double>> forecast_shift =
apply_kalman_gain(global_augmented_state_ensemble, R,
perturbed_innovation);

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_apply_K");
#endif

// Update the ensemble, x = x + K ( y+u - Hx )
if (_global_rank == 0)
std::cout << "Updating the ensemble members..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_update_members");
#endif

for (unsigned int member = 0; member < _num_ensemble_members; ++member)
{
global_augmented_state_ensemble[member] += forecast_shift[member];
}

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_update_members");
#endif
}

scatter_ensemble_members(augmented_state_ensemble,
global_augmented_state_ensemble,
local_n_ensemble_members, block_sizes);
}

void DataAssimilator::gather_ensemble_members(
std::vector<dealii::LA::distributed::BlockVector<double>>
&augmented_state_ensemble,
std::vector<dealii::LA::distributed::BlockVector<double>>
&global_augmented_state_ensemble,
std::vector<unsigned int> &local_n_ensemble_members,
std::vector<block_size_type> &block_sizes)
{
// We need to gather the augmented_state_ensemble from the other processors.
// BlockVector is a complex structure with its own communicator and so we
// cannot simple use dealii's gather to perform the communication. Instead, we
Expand Down Expand Up @@ -127,12 +231,9 @@ void DataAssimilator::update_ensemble(
auto local_indexsets_block_1 = dealii::Utilities::MPI::gather(
_local_communicator,
augmented_state_ensemble[0].block(1).locally_owned_elements());
// The local processor zero has all the data. Reorder the data before sending
// it to the global processor zero
// The local processor zero has all the data. Reorder the data before
// sending it to the global processor zero
std::vector<std::vector<std::vector<double>>> reordered_local_block_data;
using block_size_type =
dealii::LA::distributed::BlockVector<double>::size_type;
std::vector<block_size_type> block_sizes(2, 0);
if (dealii::Utilities::MPI::this_mpi_process(_local_communicator) == 0)
{
reordered_local_block_data.resize(n_local_ensemble_members,
Expand Down Expand Up @@ -172,12 +273,6 @@ void DataAssimilator::update_ensemble(
auto all_local_block_data = dealii::Utilities::MPI::gather(
_global_communicator, reordered_local_block_data);

std::vector<dealii::LA::distributed::BlockVector<double>>
global_augmented_state_ensemble;
unsigned int const global_comm_size =
dealii::Utilities::MPI::n_mpi_processes(_global_communicator);
std::vector<unsigned int> local_n_ensemble_members(global_comm_size);

if (_global_rank == 0)
{
// Build the new BlockVector
Expand All @@ -204,99 +299,32 @@ void DataAssimilator::update_ensemble(
}
}
}

// Set some constants
_num_ensemble_members = global_augmented_state_ensemble.size();
_sim_size = block_sizes[0];
_parameter_size = block_sizes[1];

adamantine::ASSERT_THROW(_expt_size == expt_data.size(),
"Error: Unexpected experiment vector size.");

// Check if R is diagonal, needed for filling the noise vector
auto bandwidth = R.get_sparsity_pattern().bandwidth();
bool const R_is_diagonal = bandwidth == 0 ? true : false;

// Get the perturbed innovation, ( y+u - Hx )
// This is determined using the unaugmented state because the parameters are
// not observable
if (_global_rank == 0)
std::cout << "Getting the perturbed innovation..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_get_pert_inno");
#endif

int constexpr base_state = 0;
std::vector<dealii::Vector<double>> perturbed_innovation(
_num_ensemble_members);
for (unsigned int member = 0; member < _num_ensemble_members; ++member)
{
perturbed_innovation[member].reinit(_expt_size);
fill_noise_vector(perturbed_innovation[member], R, R_is_diagonal);
dealii::Vector<double> temporary =
calc_Hx(global_augmented_state_ensemble[member].block(base_state));

for (unsigned int i = 0; i < _expt_size; ++i)
{
perturbed_innovation[member][i] += expt_data[i] - temporary[i];
}
}

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_get_pert_inno");
#endif

// Apply the Kalman gain to update the augmented state ensemble
if (_global_rank == 0)
std::cout << "Applying the Kalman gain..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_apply_K");
#endif

// Apply the Kalman filter to the perturbed innovation, K ( y+u - Hx )
std::vector<dealii::LA::distributed::BlockVector<double>> forecast_shift =
apply_kalman_gain(global_augmented_state_ensemble, R,
perturbed_innovation);

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_apply_K");
#endif

// Update the ensemble, x = x + K ( y+u - Hx )
if (_global_rank == 0)
std::cout << "Updating the ensemble members..." << std::endl;

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_BEGIN("da_update_members");
#endif

for (unsigned int member = 0; member < _num_ensemble_members; ++member)
{
global_augmented_state_ensemble[member] += forecast_shift[member];
}

#ifdef ADAMANTINE_WITH_CALIPER
CALI_MARK_END("da_update_members");
#endif
}
}

void DataAssimilator::scatter_ensemble_members(
std::vector<dealii::LA::distributed::BlockVector<double>>
&augmented_state_ensemble,
std::vector<dealii::LA::distributed::BlockVector<double>> const
&global_augmented_state_ensemble,
std::vector<unsigned int> const &local_n_ensemble_members,
std::vector<block_size_type> const &block_sizes)
{
// Scatter global_augmented_state_ensemble to augmented_state_ensemble.
// First we split the data to the root of the local communicators and then,
// the data is moved inside each local communicator.
// deal.II has isend and irecv functions but we cannot them. Using these
// functions will result in a deadlock because the future returned by isend is
// blocking until we call the get function of the future returned by irecv.
// functions will result in a deadlock because the future returned by isend
// is blocking until we call the get function of the future returned by
// irecv.

std::vector<char> packed_recv_buffer;
if (_global_rank == 0)
{
unsigned int const n_local_roots = all_local_block_data.size();
std::vector<std::vector<char>> packed_send_buffers(n_local_roots);
std::vector<MPI_Request> mpi_requests(n_local_roots);
std::vector<std::vector<char>> packed_send_buffers(_global_comm_size);
std::vector<MPI_Request> mpi_requests(_global_comm_size);
unsigned int global_member_id = 0;
unsigned int local_root_id = 0;
for (unsigned int i = 0; i < global_comm_size; ++i)
for (int i = 0; i < _global_comm_size; ++i)
{
unsigned int const local_size = local_n_ensemble_members[i];
std::vector<std::vector<double>> send_buffer(local_size);
Expand All @@ -313,12 +341,9 @@ void DataAssimilator::update_ensemble(
++global_member_id;
}
// Pack and send the data to the local root rank
packed_send_buffers[local_root_id] = dealii::Utilities::pack(send_buffer);
MPI_Isend(packed_send_buffers[local_root_id].data(),
packed_send_buffers[local_root_id].size(), MPI_CHAR, i, 0,
_global_communicator, &mpi_requests[local_root_id]);

++local_root_id;
packed_send_buffers[i] = dealii::Utilities::pack(send_buffer);
MPI_Isend(packed_send_buffers[i].data(), packed_send_buffers[i].size(),
MPI_CHAR, i, 0, _global_communicator, &mpi_requests[i]);
}

// Receive the data. First, call MPI_Probe to get the size of the message.
Expand Down Expand Up @@ -353,8 +378,8 @@ void DataAssimilator::update_ensemble(
packed_recv_buffer);

// The local root ranks have all the data, now we need to update
// augmented_state_ensemble. This communication is easier to do than the other
// communications because we can use deal.II's built-in functions.
// augmented_state_ensemble. This communication is easier to do than the
// other communications because we can use deal.II's built-in functions.
for (unsigned int m = 0; m < augmented_state_ensemble.size(); ++m)
{
for (unsigned int b = 0; b < 2; ++b)
Expand Down Expand Up @@ -431,8 +456,9 @@ DataAssimilator::apply_kalman_gain(
dealii::Vector<double> temporary = op_K * entry;

// Copy into a distributed block vector, this is the only place where
// the mismatch matters, using dealii::Vector for the experimental data
// and dealii::LA::distributed::BlockVector for the simulation data.
// the mismatch matters, using dealii::Vector for the experimental
// data and dealii::LA::distributed::BlockVector for the simulation
// data.
dealii::LA::distributed::BlockVector<double> output_member(block_sizes);
for (unsigned int i = 0; i < augmented_state_size; ++i)
{
Expand Down Expand Up @@ -522,7 +548,8 @@ void DataAssimilator::update_covariance_sparsity_pattern(
if (_global_rank == 0)
{
// We need IndexSet to build the sparsity pattern. Since the data
// assimilation is done is serial, the IndexSet just contains everything.
// assimilation is done is serial, the IndexSet just contains
// everything.
dealii::IndexSet parallel_partitioning =
dealii::complete_index_set(augmented_state_size);
parallel_partitioning.compress();
Expand Down
45 changes: 44 additions & 1 deletion source/DataAssimilator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,33 @@ public:
const unsigned int parameter_size);

private:
using block_size_type =
dealii::LA::distributed::BlockVector<double>::size_type;

/**
* Gather the augmented ensemble members on the global root node in order to
* apply the Kalman gain.
*/
void gather_ensemble_members(
std::vector<dealii::LA::distributed::BlockVector<double>>
&augmented_state_ensemble,
std::vector<dealii::LA::distributed::BlockVector<double>>
&global_augmented_state_ensemble,
std::vector<unsigned int> &local_n_ensemble_members,
std::vector<block_size_type> &block_sizes);

/**
* Scatter the @p global_augmented_state_ensemble back into @p
* augmented_state_ensemble.
*/
void scatter_ensemble_members(
std::vector<dealii::LA::distributed::BlockVector<double>>
&augmented_state_ensemble,
std::vector<dealii::LA::distributed::BlockVector<double>> const
&global_augmented_state_ensemble,
std::vector<unsigned int> const &local_n_ensemble_members,
std::vector<block_size_type> const &block_sizes);

/**
* This calculates the Kalman gain and applies it to the perturbed innovation.
*/
Expand Down Expand Up @@ -160,13 +187,29 @@ private:
std::vector<dealii::LA::distributed::BlockVector<double>> const
&vec_ensemble) const;

// TODO
/**
* Global MPI communicator.
*/
MPI_Comm _global_communicator;

/**
* Local MPI communicator split off the global MPI communicator.
*/
MPI_Comm _local_communicator;

/**
* Rank of the process when using the global MPI communicator.
*/
int _global_rank = -1;

/**
* Number of processes in the gglbal MPI communicator.
*/
int _global_comm_size = -1;

/**
* Color associated with the local
*/
int _color = -1;

/**
Expand Down

0 comments on commit 77edb6d

Please sign in to comment.