Skip to content

Commit

Permalink
Fix single row prediction performance in a multi-threaded environment (
Browse files Browse the repository at this point in the history
  • Loading branch information
Ten0 committed Mar 18, 2024
1 parent 43f021b commit 0a3e1a5
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 62 deletions.
4 changes: 4 additions & 0 deletions include/LightGBM/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const int kDefaultNumLeaves = 31;

struct Config {
public:
Config() {}
explicit Config(std::unordered_map<std::string, std::string> parameters_map) {
Set(parameters_map);
}
std::string ToString() const;
/*!
* \brief Get string value by specific name of key
Expand Down
159 changes: 97 additions & 62 deletions src/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ yamc::shared_lock<yamc::alternate::shared_mutex> lock(&mtx);
const int PREDICTOR_TYPES = 4;

// Single row predictor to abstract away caching logic
class SingleRowPredictor {
class SingleRowPredictorInner {
public:
PredictFunction predict_function;
int64_t num_pred_in_one_row;

SingleRowPredictor(int predict_type, Boosting* boosting, const Config& config, int start_iter, int num_iter) {
SingleRowPredictorInner(int predict_type, Boosting* boosting, const Config& config, int start_iter, int num_iter) {
bool is_predict_leaf = false;
bool is_raw_score = false;
bool predict_contrib = false;
Expand All @@ -86,7 +86,7 @@ class SingleRowPredictor {
num_total_model_ = boosting->NumberOfTotalModel();
}

~SingleRowPredictor() {}
~SingleRowPredictorInner() {}

bool IsPredictorEqual(const Config& config, int iter, Boosting* boosting) {
return early_stop_ == config.pred_early_stop &&
Expand All @@ -105,6 +105,60 @@ class SingleRowPredictor {
int num_total_model_;
};

/*!
* \brief Object to store resources meant for single-row Fast Predict methods.
*
* For legacy reasons this is called `FastConfig` in the public C API.
*
* Meant to be used by the *Fast* predict methods only.
* It stores the configuration and prediction resources for reuse across predictions.
*/
struct SingleRowPredictor {
public:
SingleRowPredictor(yamc::alternate::shared_mutex *booster_mutex,
const char *parameters,
const int data_type,
const int32_t num_cols,
int predict_type,
Boosting *boosting,
int start_iter,
int num_iter) : config(Config::Str2Map(parameters)), data_type(data_type), num_cols(num_cols), single_row_predictor_inner(predict_type, boosting, config, start_iter, num_iter), booster_mutex(booster_mutex) {
if (!config.predict_disable_shape_check && num_cols != boosting->MaxFeatureIdx() + 1) {
Log::Fatal("The number of features in data (%d) is not the same as it was in training data (%d).\n"\
"You can set ``predict_disable_shape_check=true`` to discard this error, but please be aware what you are doing.", num_cols, boosting->MaxFeatureIdx() + 1);
}
}

void Predict(std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
double* out_result, int64_t* out_len) const {
UNIQUE_LOCK(single_row_predictor_mutex)
yamc::shared_lock<yamc::alternate::shared_mutex> booster_shared_lock(booster_mutex);

auto one_row = get_row_fun(0);
single_row_predictor_inner.predict_function(one_row, out_result);

*out_len = single_row_predictor_inner.num_pred_in_one_row;
}

public:
Config config;
const int data_type;
const int32_t num_cols;

private:
SingleRowPredictorInner single_row_predictor_inner;

// Prevent the booster from being modified while we have a predictor relying on it during prediction
yamc::alternate::shared_mutex *booster_mutex;

// If several threads try to predict at the same time using the same SingleRowPredictor
// we want them to still provide correct values, so the mutex is necessary due to the shared
// resources in the predictor.
// However the recommended approach is to instantiate one SingleRowPredictor per thread,
// to avoid contention here.
mutable yamc::alternate::shared_mutex single_row_predictor_mutex;
};

class Booster {
public:
explicit Booster(const char* filename) {
Expand Down Expand Up @@ -374,15 +428,26 @@ class Booster {
boosting_->RollbackOneIter();
}

void SetSingleRowPredictor(int start_iteration, int num_iteration, int predict_type, const Config& config) {
void SetSingleRowPredictorInner(int start_iteration, int num_iteration, int predict_type, const Config& config) {
UNIQUE_LOCK(mutex_)
if (single_row_predictor_[predict_type].get() == nullptr ||
!single_row_predictor_[predict_type]->IsPredictorEqual(config, num_iteration, boosting_.get())) {
single_row_predictor_[predict_type].reset(new SingleRowPredictor(predict_type, boosting_.get(),
single_row_predictor_[predict_type].reset(new SingleRowPredictorInner(predict_type, boosting_.get(),
config, start_iteration, num_iteration));
}
}

std::unique_ptr<SingleRowPredictor> InitSingleRowPredictor(int predict_type, int start_iteration, int num_iteration, int data_type, int32_t num_cols, const char *parameters) {
// Workaround https://github.com/microsoft/LightGBM/issues/6142 by locking here
// This is only a workaround because if predictors are initialized differently it may still behave incorrectly,
// and because multiple racing Predictor initializations through LGBM_BoosterPredictForMat suffers from that same issue of Predictor init writing things in the booster.
// Once #6142 is fixed (predictor doesn't write in the Booster as should have been the case since 1c35c3b9ede9adab8ccc5fd7b4b2b6af188a79f0), this line can be removed.
UNIQUE_LOCK(mutex_)

return std::unique_ptr<SingleRowPredictor>(new SingleRowPredictor(
&mutex_, parameters, data_type, num_cols, predict_type, boosting_.get(), start_iteration, num_iteration));
}

void PredictSingleRow(int predict_type, int ncol,
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
const Config& config,
Expand Down Expand Up @@ -815,7 +880,7 @@ class Booster {
private:
const Dataset* train_data_;
std::unique_ptr<Boosting> boosting_;
std::unique_ptr<SingleRowPredictor> single_row_predictor_[PREDICTOR_TYPES];
std::unique_ptr<SingleRowPredictorInner> single_row_predictor_[PREDICTOR_TYPES];

/*! \brief All configs */
Config config_;
Expand Down Expand Up @@ -850,6 +915,7 @@ using LightGBM::Log;
using LightGBM::Network;
using LightGBM::Random;
using LightGBM::ReduceScatterFunction;
using LightGBM::SingleRowPredictor;

// some help functions used to convert data

Expand Down Expand Up @@ -2163,35 +2229,15 @@ int LGBM_BoosterCalcNumPredict(BoosterHandle handle,
API_END();
}

/*!
* \brief Object to store resources meant for single-row Fast Predict methods.
*
* Meant to be used as a basic struct by the *Fast* predict methods only.
* It stores the configuration resources for reuse during prediction.
*
* Even the row function is stored. We score the instance at the same memory
* address all the time. One just replaces the feature values at that address
* and scores again with the *Fast* methods.
*/
struct FastConfig {
FastConfig(Booster *const booster_ptr,
const char *parameter,
const int predict_type_,
const int data_type_,
const int32_t num_cols) : booster(booster_ptr), predict_type(predict_type_), data_type(data_type_), ncol(num_cols) {
config.Set(Config::Str2Map(parameter));
}

Booster* const booster;
Config config;
const int predict_type;
const int data_type;
const int32_t ncol;
};

// Naming: In future versions of LightGBM, public API named around `FastConfig` should be made named around
// `SingleRowPredictor`, because it is specific to single row prediction, and doesn't actually hold only config.
// For now this is kept as `FastConfig` for backwards compatibility.
// At the same time, one should consider removing the old non-fast single row public API that stores its Predictor
// in the Booster, because that will enable removing these Predictors from the Booster, and associated initialization
// code.
int LGBM_FastConfigFree(FastConfigHandle fastConfig) {
API_BEGIN();
delete reinterpret_cast<FastConfig*>(fastConfig);
delete reinterpret_cast<SingleRowPredictor*>(fastConfig);
API_END();
}

Expand Down Expand Up @@ -2339,7 +2385,7 @@ int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
OMP_SET_NUM_THREADS(config.num_threads);
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
ref_booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, config);
ref_booster->SetSingleRowPredictorInner(start_iteration, num_iteration, predict_type, config);
ref_booster->PredictSingleRow(predict_type, static_cast<int32_t>(num_col), get_row_fun, config, out_result, out_len);
API_END();
}
Expand All @@ -2359,18 +2405,14 @@ int LGBM_BoosterPredictForCSRSingleRowFastInit(BoosterHandle handle,
Log::Fatal("The number of columns should be smaller than INT32_MAX.");
}

auto fastConfig_ptr = std::unique_ptr<FastConfig>(new FastConfig(
reinterpret_cast<Booster*>(handle),
parameter,
predict_type,
data_type,
static_cast<int32_t>(num_col)));
Booster* ref_booster = reinterpret_cast<Booster*>(handle);

OMP_SET_NUM_THREADS(fastConfig_ptr->config.num_threads);
std::unique_ptr<SingleRowPredictor> single_row_predictor =
ref_booster->InitSingleRowPredictor(start_iteration, num_iteration, predict_type, data_type, static_cast<int32_t>(num_col), parameter);

fastConfig_ptr->booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, fastConfig_ptr->config);
OMP_SET_NUM_THREADS(single_row_predictor->config.num_threads);

*out_fastConfig = fastConfig_ptr.release();
*out_fastConfig = single_row_predictor.release();
API_END();
}

Expand All @@ -2384,10 +2426,9 @@ int LGBM_BoosterPredictForCSRSingleRowFast(FastConfigHandle fastConfig_handle,
int64_t* out_len,
double* out_result) {
API_BEGIN();
FastConfig *fastConfig = reinterpret_cast<FastConfig*>(fastConfig_handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, fastConfig->data_type, nindptr, nelem);
fastConfig->booster->PredictSingleRow(fastConfig->predict_type, fastConfig->ncol,
get_row_fun, fastConfig->config, out_result, out_len);
SingleRowPredictor *single_row_predictor = reinterpret_cast<SingleRowPredictor*>(fastConfig_handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, single_row_predictor->data_type, nindptr, nelem);
single_row_predictor->Predict(get_row_fun, out_result, out_len);
API_END();
}

Expand Down Expand Up @@ -2502,7 +2543,7 @@ int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
OMP_SET_NUM_THREADS(config.num_threads);
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, ncol, data_type, is_row_major);
ref_booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, config);
ref_booster->SetSingleRowPredictorInner(start_iteration, num_iteration, predict_type, config);
ref_booster->PredictSingleRow(predict_type, ncol, get_row_fun, config, out_result, out_len);
API_END();
}
Expand All @@ -2516,18 +2557,14 @@ int LGBM_BoosterPredictForMatSingleRowFastInit(BoosterHandle handle,
const char* parameter,
FastConfigHandle *out_fastConfig) {
API_BEGIN();
auto fastConfig_ptr = std::unique_ptr<FastConfig>(new FastConfig(
reinterpret_cast<Booster*>(handle),
parameter,
predict_type,
data_type,
ncol));
Booster* ref_booster = reinterpret_cast<Booster*>(handle);

OMP_SET_NUM_THREADS(fastConfig_ptr->config.num_threads);
std::unique_ptr<SingleRowPredictor> single_row_predictor =
ref_booster->InitSingleRowPredictor(predict_type, start_iteration, num_iteration, data_type, ncol, parameter);

fastConfig_ptr->booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, fastConfig_ptr->config);
OMP_SET_NUM_THREADS(single_row_predictor->config.num_threads);

*out_fastConfig = fastConfig_ptr.release();
*out_fastConfig = single_row_predictor.release();
API_END();
}

Expand All @@ -2536,12 +2573,10 @@ int LGBM_BoosterPredictForMatSingleRowFast(FastConfigHandle fastConfig_handle,
int64_t* out_len,
double* out_result) {
API_BEGIN();
FastConfig *fastConfig = reinterpret_cast<FastConfig*>(fastConfig_handle);
SingleRowPredictor *single_row_predictor = reinterpret_cast<SingleRowPredictor*>(fastConfig_handle);
// Single row in row-major format:
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, fastConfig->ncol, fastConfig->data_type, 1);
fastConfig->booster->PredictSingleRow(fastConfig->predict_type, fastConfig->ncol,
get_row_fun, fastConfig->config,
out_result, out_len);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, single_row_predictor->num_cols, single_row_predictor->data_type, 1);
single_row_predictor->Predict(get_row_fun, out_result, out_len);
API_END();
}

Expand Down

0 comments on commit 0a3e1a5

Please sign in to comment.