7 changes: 6 additions & 1 deletion docs/CHANGELOG.asciidoc
@@ -30,6 +30,11 @@

== {es} version 7.7.0

=== New Features

* Add instrumentation to report statistics related to data frame analytics jobs, i.e.
progress, memory usage, etc. (See {ml-pull}906[#906].)

=== Enhancements

* Improve initialization of learn rate for better and more stable results in regression
@@ -39,7 +44,7 @@ and classification. (See {ml-pull}948[#948].)

=== New Features

* Add feature importance values to classification and regression results (using tree
SHapley Additive exPlanation, or SHAP). (See {ml-pull}857[#857].)

=== Enhancements
100 changes: 100 additions & 0 deletions include/api/CDataFrameAnalysisInstrumentation.h
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h
#define INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h

#include <core/CProgramCounters.h>
#include <core/CRapidJsonConcurrentLineWriter.h>

#include <maths/CDataFrameAnalysisInstrumentationInterface.h>

#include <api/ImportExport.h>

#include <atomic>
#include <cstdint>

namespace ml {
namespace api {

//! \brief Instrumentation class for collecting data frame analysis job statistics.
//!
//! DESCRIPTION:\n
//! Responsible for collecting data frame analysis job statistics, i.e. memory usage,
//! progress, parameters, quality of results. The class also implements the functionality to
//! write the state at different iterations into the results pipe.
class API_EXPORT CDataFrameAnalysisInstrumentation
: public maths::CDataFrameAnalysisInstrumentationInterface {

public:
CDataFrameAnalysisInstrumentation();

virtual ~CDataFrameAnalysisInstrumentation() = default;

//! Adds \p delta to the memory usage statistics.
void updateMemoryUsage(std::int64_t delta) override;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
//! at the end of the analysis is equal to one.
//! \note This is converted to an integer - so we can atomically add - by
//! scaling by 1024. Therefore, this shouldn't be called with values less
//! than 0.001. In fact, it is unlikely that such high resolution is needed
//! and typically this would be called significantly less frequently.
void updateProgress(double fractionalProgress) override;
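//! Record that the analysis has finished.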
void setToFinished();

//! \return True if the running analysis has finished.
bool finished() const;

//! \return The progress of the analysis in the range [0,1] being an estimate
//! of the proportion of total work complete for a single run.
double progress() const;

//! Reset variables related to the job progress.
void resetProgress();

//! Set pointer to the writer object.
void writer(core::CRapidJsonConcurrentLineWriter* writer);

//! Trigger the next step of the job. This will initiate writing the job state
//! to the results pipe.
void nextStep(std::uint32_t step) override;

//! \return The peak memory usage.
std::int64_t memory() const;

protected:
virtual counter_t::ECounterTypes memoryCounterType() = 0;

private:
void writeProgress(std::uint32_t step);
void writeMemory(std::uint32_t step);
void writeState(std::uint32_t step);

private:
std::atomic_bool m_Finished;
std::atomic_size_t m_FractionalProgress;
std::atomic<std::int64_t> m_Memory;
core::CRapidJsonConcurrentLineWriter* m_Writer;
};

class API_EXPORT CDataFrameOutliersInstrumentation final
: public CDataFrameAnalysisInstrumentation {
protected:
counter_t::ECounterTypes memoryCounterType() override;
};

class API_EXPORT CDataFrameTrainBoostedTreeInstrumentation final
: public CDataFrameAnalysisInstrumentation {
protected:
counter_t::ECounterTypes memoryCounterType() override;
};
}
}

#endif // INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h
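
The scaling-by-1024 note on updateProgress above suggests the following minimal sketch of how fractional progress can be accumulated atomically; the class and member names here are illustrative assumptions, not the actual implementation:

#include <algorithm>
#include <atomic>
#include <cstddef>

// Minimal sketch: store progress as integer "ticks" of 1/1024 so that
// concurrent contributions can be combined with a single atomic fetch_add.
class CProgressSketch {
public:
    void updateProgress(double fractionalProgress) {
        m_FractionalProgress.fetch_add(
            static_cast<std::size_t>(std::max(fractionalProgress, 0.0) * 1024.0 + 0.5),
            std::memory_order_relaxed);
    }

    // Estimated proportion of work done, clamped to [0,1] because callers only
    // guarantee that their contributions sum to one at the end of the analysis.
    double progress() const {
        return std::min(static_cast<double>(m_FractionalProgress.load()) / 1024.0, 1.0);
    }

private:
    std::atomic_size_t m_FractionalProgress{0};
};
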
33 changes: 7 additions & 26 deletions include/api/CDataFrameAnalysisRunner.h
@@ -11,12 +11,12 @@
#include <core/CProgramCounters.h>
#include <core/CStatePersistInserter.h>

#include <api/CDataFrameAnalysisInstrumentation.h>
#include <api/CInferenceModelDefinition.h>
#include <api/ImportExport.h>

#include <rapidjson/fwd.h>

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
@@ -135,26 +135,23 @@ class API_EXPORT CDataFrameAnalysisRunner {
//! This waits until the analysis has finished and joins the thread.
void waitToFinish();

//! \return True if the running analysis has finished.
bool finished() const;

//! \return The progress of the analysis in the range [0,1] being an estimate
//! of the proportion of total work complete for a single run.
double progress() const;

//! \return A serialisable definition of the trained model.
virtual TInferenceModelDefinitionUPtr
inferenceModelDefinition(const TStrVec& fieldNames, const TStrVecVec& categoryNames) const;

//! \return Reference to the analysis instrumentation.
virtual const CDataFrameAnalysisInstrumentation& instrumentation() const = 0;
//! \return Reference to the analysis instrumentation.
virtual CDataFrameAnalysisInstrumentation& instrumentation() = 0;

protected:
using TMemoryMonitor = std::function<void(std::int64_t)>;
using TStatePersister =
std::function<void(std::function<void(core::CStatePersistInserter&)>)>;

protected:
const CDataFrameAnalysisSpecification& spec() const;
TProgressRecorder progressRecorder();
TMemoryMonitor memoryMonitor(counter_t::ECounterTypes counter);

std::size_t estimateMemoryUsage(std::size_t totalNumberRows,
std::size_t partitionNumberRows,
std::size_t numberColumns) const;
@@ -169,27 +166,11 @@
std::size_t partitionNumberRows,
std::size_t numberColumns) const = 0;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
//! at the end of the analysis is equal to one.
//! \note This is converted to an integer - so we can atomically add - by
//! scaling by 1024. Therefore, this shouldn't be called with values less
//! than 0.001. In fact, it is unlikely that such high resolution is needed
//! and typically this would be called significantly less frequently.
void recordProgress(double fractionalProgress);
void setToFinished();

private:
const CDataFrameAnalysisSpecification& m_Spec;

std::size_t m_NumberPartitions = 0;
std::size_t m_MaximumNumberRowsPerPartition = 0;

std::atomic_bool m_Finished;
std::atomic_size_t m_FractionalProgress;
std::atomic<std::int64_t> m_Memory;

std::thread m_Runner;
};

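
Since finished() and progress() moved off the runner, call sites now read them through the instrumentation accessor. A minimal sketch of the new call pattern; the helper function is hypothetical:

#include <api/CDataFrameAnalysisRunner.h>

// Hypothetical helper: completion and progress are read via the runner's
// instrumentation object rather than from the runner itself.
bool analysisComplete(const ml::api::CDataFrameAnalysisRunner& runner) {
    return runner.instrumentation().finished() &&
           runner.instrumentation().progress() >= 1.0;
}
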
2 changes: 1 addition & 1 deletion include/api/CDataFrameAnalysisSpecification.h
@@ -187,7 +187,7 @@ class API_EXPORT CDataFrameAnalysisSpecification {
TDataSearcherUPtr restoreSearcher() const;

//! Get pointer to the analysis runner.
const CDataFrameAnalysisRunner* runner();
CDataFrameAnalysisRunner* runner();

private:
void initializeRunner(const rapidjson::Value& jsonAnalysis);
9 changes: 8 additions & 1 deletion include/api/CDataFrameOutliersRunner.h
@@ -8,8 +8,8 @@
#define INCLUDED_ml_api_CDataFrameOutliersRunner_h

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisInstrumentation.h>
#include <api/CDataFrameAnalysisRunner.h>

#include <api/ImportExport.h>

#include <rapidjson/fwd.h>
@@ -32,6 +32,11 @@ class API_EXPORT CDataFrameOutliersRunner final : public CDataFrameAnalysisRunne
CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec,
const CDataFrameAnalysisParameters& parameters);

//! \return Reference to the analysis state.
const CDataFrameAnalysisInstrumentation& instrumentation() const override;
//! \return Reference to the analysis state.
CDataFrameAnalysisInstrumentation& instrumentation() override;

//! This is not intended to be called directly: use CDataFrameOutliersRunnerFactory.
CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec);

@@ -75,6 +80,8 @@ class API_EXPORT CDataFrameOutliersRunner final : public CDataFrameAnalysisRunne
//! The fraction of true outliers among the points.
double m_OutlierFraction = 0.05;
//@}

CDataFrameOutliersInstrumentation m_Instrumentation;
};

//! \brief Makes a core::CDataFrame outlier analysis runner.
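
The new accessors most plausibly just expose the m_Instrumentation member declared above; a sketch of what the definitions might look like in CDataFrameOutliersRunner.cc, which is not part of this diff, so treat it as an assumption (the boosted tree runner's accessors presumably mirror it):

#include <api/CDataFrameOutliersRunner.h>

namespace ml {
namespace api {

// Assumed definitions: the overrides simply return the runner's own
// instrumentation object.
const CDataFrameAnalysisInstrumentation&
CDataFrameOutliersRunner::instrumentation() const {
    return m_Instrumentation;
}

CDataFrameAnalysisInstrumentation& CDataFrameOutliersRunner::instrumentation() {
    return m_Instrumentation;
}
}
}
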
7 changes: 7 additions & 0 deletions include/api/CDataFrameTrainBoostedTreeRunner.h
@@ -9,6 +9,7 @@

#include <maths/CBasicStatistics.h>

#include <api/CDataFrameAnalysisInstrumentation.h>
#include <api/CDataFrameAnalysisRunner.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/ImportExport.h>
@@ -63,6 +64,11 @@ class API_EXPORT CDataFrameTrainBoostedTreeRunner : public CDataFrameAnalysisRun

std::size_t topShapValues() const;

//! \return Reference to the analysis state.
const CDataFrameAnalysisInstrumentation& instrumentation() const override;
//! \return Reference to the analysis state.
CDataFrameAnalysisInstrumentation& instrumentation() override;

protected:
using TBoostedTreeUPtr = std::unique_ptr<maths::CBoostedTree>;
using TLossFunctionUPtr = std::unique_ptr<maths::boosted_tree::CLoss>;
@@ -106,6 +112,7 @@ class API_EXPORT CDataFrameTrainBoostedTreeRunner : public CDataFrameAnalysisRun
std::string m_PredictionFieldName;
TBoostedTreeFactoryUPtr m_BoostedTreeFactory;
TBoostedTreeUPtr m_BoostedTree;
CDataFrameTrainBoostedTreeInstrumentation m_Instrumentation;
};
}
}
2 changes: 1 addition & 1 deletion include/core/CProgramCounters.h
@@ -118,7 +118,7 @@ enum ECounterTypes {
//! The estimated peak memory usage for training a predictive model
E_DFTPMEstimatedPeakMemoryUsage = 24,

//! The peak memory usage of outlier detection in bytes
//! The peak memory usage for training a predictive model in bytes
E_DFTPMPeakMemoryUsage = 25,

//! The time in ms to train the model
2 changes: 0 additions & 2 deletions include/maths/CBoostedTree.h
@@ -446,8 +446,6 @@ class MATHS_EXPORT CBoostedTree final : public CDataFramePredictiveModel {

private:
CBoostedTree(core::CDataFrame& frame,
TProgressCallback recordProgress,
TMemoryUsageCallback recordMemoryUsage,
TTrainingStateCallback recordTrainingState,
TImplUPtr&& impl);

5 changes: 5 additions & 0 deletions include/maths/CBoostedTreeFactory.h
@@ -10,6 +10,7 @@
#include <core/CDataFrame.h>

#include <maths/CBoostedTree.h>
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>
#include <maths/CLinearAlgebra.h>
#include <maths/ImportExport.h>

@@ -38,6 +39,7 @@ class MATHS_EXPORT CBoostedTreeFactory final {
using TMemoryUsageCallback = CBoostedTree::TMemoryUsageCallback;
using TTrainingStateCallback = CBoostedTree::TTrainingStateCallback;
using TLossFunctionUPtr = CBoostedTree::TLossFunctionUPtr;
using TAnalysisInstrumentationPtr = CDataFrameAnalysisInstrumentationInterface*;

public:
//! Construct a boosted tree object from parameters.
@@ -96,6 +98,8 @@ class MATHS_EXPORT CBoostedTreeFactory final {
//! Set the number of training examples we need per feature we'll include.
CBoostedTreeFactory& topShapValues(std::size_t topShapValues);

//! Set pointer to the analysis instrumentation.
CBoostedTreeFactory& analysisInstrumentation(TAnalysisInstrumentationPtr instrumentation);
//! Set the callback function for progress monitoring.
CBoostedTreeFactory& progressCallback(TProgressCallback callback);
//! Set the callback function for memory monitoring.
@@ -215,6 +219,7 @@ class MATHS_EXPORT CBoostedTreeFactory final {
TVector m_LogLeafWeightPenaltyMultiplierSearchInterval;
TVector m_SoftDepthLimitSearchInterval;
TVector m_LogEtaSearchInterval;
TAnalysisInstrumentationPtr m_Instrumentation;
TProgressCallback m_RecordProgress = noopRecordProgress;
TMemoryUsageCallback m_RecordMemoryUsage = noopRecordMemoryUsage;
TTrainingStateCallback m_RecordTrainingState = noopRecordTrainingState;
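
The new analysisInstrumentation setter presumably follows the factory's fluent-setter convention and stores the non-owning pointer in m_Instrumentation; a sketch of a plausible definition (the real one lives in CBoostedTreeFactory.cc and is not shown in this diff):

#include <maths/CBoostedTreeFactory.h>

namespace ml {
namespace maths {

// Assumed fluent setter: record the instrumentation pointer and return *this
// so it chains like the factory's other setters.
CBoostedTreeFactory&
CBoostedTreeFactory::analysisInstrumentation(TAnalysisInstrumentationPtr instrumentation) {
    m_Instrumentation = instrumentation;
    return *this;
}
}
}
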
16 changes: 9 additions & 7 deletions include/maths/CBoostedTreeImpl.h
@@ -19,6 +19,7 @@
#include <maths/CBasicStatistics.h>
#include <maths/CBoostedTree.h>
#include <maths/CBoostedTreeHyperparameters.h>
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>
#include <maths/CDataFrameCategoryEncoder.h>
#include <maths/CDataFrameUtils.h>
#include <maths/CLinearAlgebraEigen.h>
@@ -65,33 +66,33 @@ class MATHS_EXPORT CBoostedTreeImpl final {
using TRegularization = CBoostedTreeRegularization<double>;
using TSizeVec = std::vector<std::size_t>;
using TSizeRange = boost::integer_range<std::size_t>;
using TAnalysisInstrumentationPtr = CDataFrameAnalysisInstrumentationInterface*;

public:
static const double MINIMUM_RELATIVE_GAIN_PER_SPLIT;

public:
CBoostedTreeImpl(std::size_t numberThreads, TLossFunctionUPtr loss);
CBoostedTreeImpl(std::size_t numberThreads,
TLossFunctionUPtr loss,
TAnalysisInstrumentationPtr instrumentation = nullptr);

~CBoostedTreeImpl();

CBoostedTreeImpl& operator=(const CBoostedTreeImpl&) = delete;
CBoostedTreeImpl& operator=(CBoostedTreeImpl&&);

//! Train the model on the values in \p frame.
void train(core::CDataFrame& frame,
const TProgressCallback& recordProgress,
const TMemoryUsageCallback& recordMemoryUsage,
const TTrainingStateCallback& recordTrainStateCallback);
void train(core::CDataFrame& frame, const TTrainingStateCallback& recordTrainStateCallback);

//! Write the predictions of the best trained model to \p frame.
//!
//! \note Must be called only if a trained model is available.
void predict(core::CDataFrame& frame, const TProgressCallback& /*recordProgress*/) const;
void predict(core::CDataFrame& frame) const;

//! Compute SHAP values using the best trained model and write them to \p frame.
//!
//! \note Must be called only if a trained model is available.
void computeShapValues(core::CDataFrame& frame, const TProgressCallback&);
void computeShapValues(core::CDataFrame& frame);

//! Get the model produced by training if it has been run.
const TNodeVecVec& trainedModel() const;
@@ -580,6 +581,7 @@ class MATHS_EXPORT CBoostedTreeImpl final {
std::size_t m_FirstShapColumnIndex = 0;
std::size_t m_LastShapColumnIndex = 0;
std::size_t m_NumberInputColumns = 0;
TAnalysisInstrumentationPtr m_Instrumentation; // no persist/restore

private:
friend class CBoostedTreeFactory;
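
With the per-call progress and memory callbacks removed, CBoostedTreeImpl presumably reports through the injected interface instead. An illustrative sketch of the kind of call site this enables, assuming a non-null instrumentation reference (the real call sites live in CBoostedTreeImpl.cc):

#include <maths/CDataFrameAnalysisInstrumentationInterface.h>

#include <cstdint>

// Illustrative helper: report one training round's memory delta and progress
// through the instrumentation interface rather than via separate callbacks.
void reportRound(ml::maths::CDataFrameAnalysisInstrumentationInterface& instrumentation,
                 std::int64_t memoryDelta,
                 double fractionOfTotalWork) {
    instrumentation.updateMemoryUsage(memoryDelta);
    instrumentation.updateProgress(fractionOfTotalWork);
}
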