Skip to content

Commit

Permalink
Thread local memory allocation for BuildHist (#6358)
Browse files Browse the repository at this point in the history
* thread mem locality

* fix apply

* cleanup

* fix lint

* fix tests

* simple try

* fix

* fix

* apply comments

* fix comments

* fix

* apply simple comment

Co-authored-by: ShvetsKS <kirill.shvets@intel.com>
  • Loading branch information
ShvetsKS and ShvetsKS committed Nov 25, 2020
1 parent 4dbbeb6 commit 956beea
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 47 deletions.
66 changes: 43 additions & 23 deletions src/common/hist_util.h
Expand Up @@ -407,9 +407,14 @@ class HistCollection {
// access histogram for i-th node
GHistRowT operator[](bst_uint nid) const {
constexpr uint32_t kMax = std::numeric_limits<uint32_t>::max();
CHECK_NE(row_ptr_[nid], kMax);
GradientPairT* ptr =
const_cast<GradientPairT*>(dmlc::BeginPtr(data_) + row_ptr_[nid]);
const size_t id = row_ptr_[nid];
CHECK_NE(id, kMax);
GradientPairT* ptr = nullptr;
if (contiguous_allocation_) {
ptr = const_cast<GradientPairT*>(data_[0].data() + nbins_*id);
} else {
ptr = const_cast<GradientPairT*>(data_[id].data());
}
return {ptr, nbins_};
}

Expand Down Expand Up @@ -438,21 +443,37 @@ class HistCollection {
}
CHECK_EQ(row_ptr_[nid], kMax);

if (data_.size() < nbins_ * (nid + 1)) {
data_.resize(nbins_ * (nid + 1));
if (data_.size() < (nid + 1)) {
data_.resize((nid + 1));
}

row_ptr_[nid] = nbins_ * n_nodes_added_;
row_ptr_[nid] = n_nodes_added_;
n_nodes_added_++;
}
// Allocate the histogram buffer of node `nid` on first use (the original
// author describes this as thread-local allocation). If the buffer located
// through row_ptr_[nid] is still empty, it is sized to nbins_ entries, each
// initialized from {0, 0}; otherwise nothing happens.
void AllocateData(bst_uint nid) {
  auto& node_hist = data_[row_ptr_[nid]];
  if (node_hist.empty()) {
    node_hist.resize(nbins_, {0, 0});
  }
}
// Switch the collection to contiguous storage: data_[0] is grown to hold
// nbins_ entries for every slot currently in data_, so all node histograms
// live in one common buffer (per the original note, this is needed for a
// single Allreduce call). Once the flag is set, operator[] locates a node's
// histogram at data_[0].data() + nbins_ * row_ptr_[nid].
// Intended to be called after the histogram rows have been added.
void AllocateAllData() {
  const size_t new_size = nbins_ * data_.size();
  contiguous_allocation_ = true;
  // No rows have been added yet: there is no data_[0] to grow, and indexing
  // it would be an out-of-bounds access.
  if (data_.empty()) {
    return;
  }
  if (data_[0].size() != new_size) {
    data_[0].resize(new_size);
  }
}

private:
/*! \brief number of all bins over all features */
uint32_t nbins_ = 0;
/*! \brief amount of active nodes in hist collection */
uint32_t n_nodes_added_ = 0;
/*! \brief flag to identify contiguous memory allocation */
bool contiguous_allocation_ = false;

std::vector<GradientPairT> data_;
std::vector<std::vector<GradientPairT>> data_;

/*! \brief row_ptr_[nid] locates bin for histogram of node nid */
std::vector<size_t> row_ptr_;
Expand Down Expand Up @@ -481,7 +502,6 @@ class ParallelGHistBuilder {
const std::vector<GHistRowT>& targeted_hists) {
hist_buffer_.Init(nbins_);
tid_nid_to_hist_.clear();
hist_memory_.clear();
threads_to_nids_map_.clear();

targeted_hists_ = targeted_hists;
Expand All @@ -504,8 +524,11 @@ class ParallelGHistBuilder {
CHECK_LT(nid, nodes_);
CHECK_LT(tid, nthreads_);

size_t idx = tid_nid_to_hist_.at({tid, nid});
GHistRowT hist = hist_memory_[idx];
int idx = tid_nid_to_hist_.at({tid, nid});
if (idx >= 0) {
hist_buffer_.AllocateData(idx);
}
GHistRowT hist = idx == -1 ? targeted_hists_[nid] : hist_buffer_[idx];

if (!hist_was_used_[tid * nodes_ + nid]) {
InitilizeHistByZeroes(hist, 0, hist.size());
Expand All @@ -526,8 +549,9 @@ class ParallelGHistBuilder {
for (size_t tid = 0; tid < nthreads_; ++tid) {
if (hist_was_used_[tid * nodes_ + nid]) {
is_updated = true;
const size_t idx = tid_nid_to_hist_.at({tid, nid});
GHistRowT src = hist_memory_[idx];

int idx = tid_nid_to_hist_.at({tid, nid});
GHistRowT src = idx == -1 ? targeted_hists_[nid] : hist_buffer_[idx];

if (dst.data() != src.data()) {
IncrementHist(dst, src, begin, end);
Expand Down Expand Up @@ -589,23 +613,18 @@ class ParallelGHistBuilder {
}

void MatchNodeNidPairToHist() {
size_t hist_total = 0;
size_t hist_allocated_additionally = 0;

for (size_t nid = 0; nid < nodes_; ++nid) {
bool first_hist = true;
for (size_t tid = 0; tid < nthreads_; ++tid) {
if (threads_to_nids_map_[tid * nodes_ + nid]) {
if (first_hist) {
hist_memory_.push_back(targeted_hists_[nid]);
tid_nid_to_hist_[{tid, nid}] = -1;
first_hist = false;
} else {
hist_memory_.push_back(hist_buffer_[hist_allocated_additionally]);
hist_allocated_additionally++;
tid_nid_to_hist_[{tid, nid}] = hist_allocated_additionally++;
}
// map pair {tid, nid} to index of allocated histogram from hist_memory_
tid_nid_to_hist_[{tid, nid}] = hist_total++;
CHECK_EQ(hist_total, hist_memory_.size());
}
}
}
Expand All @@ -630,10 +649,11 @@ class ParallelGHistBuilder {
std::vector<bool> threads_to_nids_map_;
/*! \brief Contains histograms for final results */
std::vector<GHistRowT> targeted_hists_;
/*! \brief Allocated memory for histograms used for construction */
std::vector<GHistRowT> hist_memory_;
/*! \brief map pair {tid, nid} to index of allocated histogram from hist_memory_ */
std::map<std::pair<size_t, size_t>, size_t> tid_nid_to_hist_;
/*!
* \brief Maps pair {tid, nid} to the index of its histogram in hist_buffer_.
* The value -1 is reserved: it means the histogram is targeted_hists_[nid] instead.
*/
std::map<std::pair<size_t, size_t>, int> tid_nid_to_hist_;
};

/*!
Expand Down
46 changes: 28 additions & 18 deletions src/common/row_set.h
Expand Up @@ -11,6 +11,7 @@
#include <algorithm>
#include <vector>
#include <utility>
#include <memory>

namespace xgboost {
namespace common {
Expand Down Expand Up @@ -150,24 +151,33 @@ class PartitionBuilder {
}
}

// Allocate the memory block for task `id` if it has not been allocated yet.
// Should be called for each specific task before its buffers are used.
void AllocateForTask(size_t id) {
  if (!mem_blocks_[id]) {
    // A plain `new` reports failure by throwing std::bad_alloc, never by
    // returning nullptr, so no null check is needed afterwards.
    // `new BlockInfo` is kept on purpose instead of std::make_shared:
    // make_shared value-initializes, which would presumably zero-fill the
    // block's buffers (assuming BlockInfo has no user-provided constructor).
    mem_blocks_[id].reset(new BlockInfo);
  }
}

common::Span<size_t> GetLeftBuffer(int nid, size_t begin, size_t end) {
const size_t task_idx = GetTaskIdx(nid, begin);
return { mem_blocks_.at(task_idx).Left(), end - begin };
return { mem_blocks_.at(task_idx)->Left(), end - begin };
}

common::Span<size_t> GetRightBuffer(int nid, size_t begin, size_t end) {
const size_t task_idx = GetTaskIdx(nid, begin);
return { mem_blocks_.at(task_idx).Right(), end - begin };
return { mem_blocks_.at(task_idx)->Right(), end - begin };
}

void SetNLeftElems(int nid, size_t begin, size_t end, size_t n_left) {
size_t task_idx = GetTaskIdx(nid, begin);
mem_blocks_.at(task_idx).n_left = n_left;
mem_blocks_.at(task_idx)->n_left = n_left;
}

void SetNRightElems(int nid, size_t begin, size_t end, size_t n_right) {
size_t task_idx = GetTaskIdx(nid, begin);
mem_blocks_.at(task_idx).n_right = n_right;
mem_blocks_.at(task_idx)->n_right = n_right;
}


Expand All @@ -185,13 +195,13 @@ class PartitionBuilder {
for (size_t i = 0; i < blocks_offsets_.size()-1; ++i) {
size_t n_left = 0;
for (size_t j = blocks_offsets_[i]; j < blocks_offsets_[i+1]; ++j) {
mem_blocks_[j].n_offset_left = n_left;
n_left += mem_blocks_[j].n_left;
mem_blocks_[j]->n_offset_left = n_left;
n_left += mem_blocks_[j]->n_left;
}
size_t n_right = 0;
for (size_t j = blocks_offsets_[i]; j < blocks_offsets_[i+1]; ++j) {
mem_blocks_[j].n_offset_right = n_left + n_right;
n_right += mem_blocks_[j].n_right;
mem_blocks_[j]->n_offset_right = n_left + n_right;
n_right += mem_blocks_[j]->n_right;
}
left_right_nodes_sizes_[i] = {n_left, n_right};
}
Expand All @@ -200,21 +210,21 @@ class PartitionBuilder {
void MergeToArray(int nid, size_t begin, size_t* rows_indexes) {
size_t task_idx = GetTaskIdx(nid, begin);

size_t* left_result = rows_indexes + mem_blocks_[task_idx].n_offset_left;
size_t* right_result = rows_indexes + mem_blocks_[task_idx].n_offset_right;
size_t* left_result = rows_indexes + mem_blocks_[task_idx]->n_offset_left;
size_t* right_result = rows_indexes + mem_blocks_[task_idx]->n_offset_right;

const size_t* left = mem_blocks_[task_idx].Left();
const size_t* right = mem_blocks_[task_idx].Right();
const size_t* left = mem_blocks_[task_idx]->Left();
const size_t* right = mem_blocks_[task_idx]->Right();

std::copy_n(left, mem_blocks_[task_idx].n_left, left_result);
std::copy_n(right, mem_blocks_[task_idx].n_right, right_result);
std::copy_n(left, mem_blocks_[task_idx]->n_left, left_result);
std::copy_n(right, mem_blocks_[task_idx]->n_right, right_result);
}

protected:
// Translate (node index, first row of the range) into the flat task index
// used to address mem_blocks_: the node's starting offset in blocks_offsets_
// plus the number of whole BlockSize-sized blocks that precede `begin`.
size_t GetTaskIdx(int nid, size_t begin) {
  const size_t block_in_node = begin / BlockSize;
  return blocks_offsets_[nid] + block_in_node;
}

protected:
struct BlockInfo{
size_t n_left;
size_t n_right;
Expand All @@ -230,12 +240,12 @@ class PartitionBuilder {
return &right_data_[0];
}
private:
alignas(128) size_t left_data_[BlockSize];
alignas(128) size_t right_data_[BlockSize];
size_t left_data_[BlockSize];
size_t right_data_[BlockSize];
};
std::vector<std::pair<size_t, size_t>> left_right_nodes_sizes_;
std::vector<size_t> blocks_offsets_;
std::vector<BlockInfo> mem_blocks_;
std::vector<std::shared_ptr<BlockInfo>> mem_blocks_;
size_t max_n_tasks_ = 0;
};

Expand Down
11 changes: 9 additions & 2 deletions src/tree/updater_quantile_hist.cc
Expand Up @@ -182,8 +182,10 @@ void DistributedHistSynchronizer<GradientSumT>::SyncHistograms(BuilderT* builder
}
});
builder->builder_monitor_.Start("SyncHistogramsAllreduce");

builder->histred_.Allreduce(builder->hist_[starting_index].data(),
builder->hist_builder_.GetNumBins() * sync_count);

builder->builder_monitor_.Stop("SyncHistogramsAllreduce");

ParallelSubtractionHist(builder, space, builder->nodes_for_explicit_hist_build_, p_tree);
Expand Down Expand Up @@ -232,7 +234,7 @@ void BatchHistRowsAdder<GradientSumT>::AddHistRows(BuilderT *builder,
for (auto const& node : builder->nodes_for_subtraction_trick_) {
builder->hist_.AddHistRow(node.nid);
}

builder->hist_.AllocateAllData();
builder->builder_monitor_.Stop("AddHistRows");
}

Expand Down Expand Up @@ -268,6 +270,8 @@ void DistributedHistRowsAdder<GradientSumT>::AddHistRows(BuilderT *builder,
builder->hist_local_worker_.AddHistRow(nid);
}
}
builder->hist_.AllocateAllData();
builder->hist_local_worker_.AllocateAllData();
(*sync_count) = std::max(1, n_left);
builder->builder_monitor_.Stop("AddHistRows");
}
Expand Down Expand Up @@ -1166,7 +1170,7 @@ template <typename GradientSumT>
void QuantileHistMaker::Builder<GradientSumT>::ApplySplit(const std::vector<ExpandEntry> nodes,
const GHistIndexMatrix& gmat,
const ColumnMatrix& column_matrix,
const HistCollection<GradientSumT>&,
const HistCollection<GradientSumT>& hist,
RegTree* p_tree) {
builder_monitor_.Start("ApplySplit");
// 1. Find split condition for each split
Expand All @@ -1189,7 +1193,10 @@ void QuantileHistMaker::Builder<GradientSumT>::ApplySplit(const std::vector<Expa
// 2.3 Split elements of row_set_collection_ to left and right child-nodes for each node
// Store results in intermediate buffers from partition_builder_
common::ParallelFor2d(space, this->nthread_, [&](size_t node_in_set, common::Range1d r) {
size_t begin = r.begin();
const int32_t nid = nodes[node_in_set].nid;
const size_t task_id = partition_builder_.GetTaskIdx(node_in_set, begin);
partition_builder_.AllocateForTask(task_id);
switch (column_matrix.GetTypeSize()) {
case common::kUint8BinsTypeSize:
PartitionKernel<uint8_t>(node_in_set, nid, r,
Expand Down
4 changes: 2 additions & 2 deletions tests/cpp/common/test_hist_util.cc
Expand Up @@ -35,7 +35,7 @@ void ParallelGHistBuilderReset() {
for(size_t inode = 0; inode < kNodesExtended; inode++) {
collection.AddHistRow(inode);
}

collection.AllocateAllData();
ParallelGHistBuilder<GradientSumT> hist_builder;
hist_builder.Init(kBins);
std::vector<GHistRow<GradientSumT>> target_hist(kNodes);
Expand Down Expand Up @@ -91,7 +91,7 @@ void ParallelGHistBuilderReduceHist(){
for(size_t inode = 0; inode < kNodes; inode++) {
collection.AddHistRow(inode);
}

collection.AllocateAllData();
ParallelGHistBuilder<GradientSumT> hist_builder;
hist_builder.Init(kBins);
std::vector<GHistRow<GradientSumT>> target_hist(kNodes);
Expand Down
2 changes: 2 additions & 0 deletions tests/cpp/common/test_partition_builder.cc
Expand Up @@ -32,6 +32,8 @@ TEST(PartitionBuilder, BasicTest) {
for(size_t j = 0; j < tasks[nid]; ++j) {
size_t begin = kBlockSize*j;
size_t end = kBlockSize*(j+1);
const size_t id = builder.GetTaskIdx(nid, begin);
builder.AllocateForTask(id);

auto left = builder.GetLeftBuffer(nid, begin, end);
auto right = builder.GetRightBuffer(nid, begin, end);
Expand Down
7 changes: 5 additions & 2 deletions tests/cpp/tree/test_quantile_hist.cc
Expand Up @@ -274,6 +274,7 @@ class QuantileHistMock : public QuantileHistMaker {
RealImpl::InitData(gmat, gpair, fmat, tree);
GHistIndexBlockMatrix dummy;
this->hist_.AddHistRow(nid);
this->hist_.AllocateAllData();
this->BuildHist(gpair, this->row_set_collection_[nid],
gmat, dummy, this->hist_[nid]);

Expand Down Expand Up @@ -315,7 +316,7 @@ class QuantileHistMock : public QuantileHistMaker {

RealImpl::InitData(gmat, row_gpairs, *dmat, tree);
this->hist_.AddHistRow(0);

this->hist_.AllocateAllData();
this->BuildHist(row_gpairs, this->row_set_collection_[0],
gmat, quantile_index_block, this->hist_[0]);

Expand Down Expand Up @@ -411,7 +412,7 @@ class QuantileHistMock : public QuantileHistMaker {
cm.Init(gmat, 0.0);
RealImpl::InitData(gmat, row_gpairs, *dmat, tree);
this->hist_.AddHistRow(0);

this->hist_.AllocateAllData();
RealImpl::InitNewNode(0, gmat, row_gpairs, *dmat, tree);

const size_t num_row = dmat->Info().num_row_;
Expand Down Expand Up @@ -449,6 +450,8 @@ class QuantileHistMock : public QuantileHistMaker {
RealImpl::partition_builder_.Init(1, 1, [&](size_t node_in_set) {
return 1;
});
const size_t task_id = RealImpl::partition_builder_.GetTaskIdx(0, 0);
RealImpl::partition_builder_.AllocateForTask(task_id);
this->template PartitionKernel<uint8_t>(0, 0, common::Range1d(0, kNRows),
split, cm, tree);
RealImpl::partition_builder_.CalculateRowOffsets();
Expand Down

0 comments on commit 956beea

Please sign in to comment.