From 597bb8f257fceb0ed74ac67fe357d70ad4ee8a2b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 26 Jan 2018 12:11:51 +0100 Subject: [PATCH 01/10] factor out minimumSeasonalVarianceScale (#531) Re-factor minimumSeasonalVarianceScale to make it available as method. This is required in order to restore models from a file stream, more precisely it is required as a parameter for maths::CModelParams which is required for maths::SModelRestoreParams --- include/model/CCountingModelFactory.h | 3 +++ include/model/CEventRateModelFactory.h | 3 +++ include/model/CEventRatePopulationModelFactory.h | 3 +++ include/model/CMetricModelFactory.h | 3 +++ include/model/CMetricPopulationModelFactory.h | 3 +++ include/model/CModelFactory.h | 3 +++ lib/model/CCountingModelFactory.cc | 6 ++++++ lib/model/CEventRateModelFactory.cc | 10 +++++++++- lib/model/CEventRatePopulationModelFactory.cc | 10 +++++++++- lib/model/CMetricModelFactory.cc | 10 +++++++++- lib/model/CMetricPopulationModelFactory.cc | 10 +++++++++- 11 files changed, 60 insertions(+), 4 deletions(-) diff --git a/include/model/CCountingModelFactory.h b/include/model/CCountingModelFactory.h index 736e5406e9..e7c0b429e0 100644 --- a/include/model/CCountingModelFactory.h +++ b/include/model/CCountingModelFactory.h @@ -126,6 +126,9 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. 
virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRateModelFactory.h b/include/model/CEventRateModelFactory.h index e20b8f25e3..7f1611d6b8 100644 --- a/include/model/CEventRateModelFactory.h +++ b/include/model/CEventRateModelFactory.h @@ -136,6 +136,9 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRatePopulationModelFactory.h b/include/model/CEventRatePopulationModelFactory.h index 9f3225e40d..36aec0d78e 100644 --- a/include/model/CEventRatePopulationModelFactory.h +++ b/include/model/CEventRatePopulationModelFactory.h @@ -138,6 +138,9 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CMetricModelFactory.h b/include/model/CMetricModelFactory.h index 23b2f15f83..2717dd71bb 100644 --- a/include/model/CMetricModelFactory.h +++ b/include/model/CMetricModelFactory.h @@ -139,6 +139,9 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. 
virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CMetricPopulationModelFactory.h b/include/model/CMetricPopulationModelFactory.h index e4b1984451..5b3aafff69 100644 --- a/include/model/CMetricPopulationModelFactory.h +++ b/include/model/CMetricPopulationModelFactory.h @@ -138,6 +138,9 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CModelFactory.h b/include/model/CModelFactory.h index 5cc96d86d6..d49067f695 100644 --- a/include/model/CModelFactory.h +++ b/include/model/CModelFactory.h @@ -345,6 +345,9 @@ class MODEL_EXPORT CModelFactory { //! component. std::size_t componentSize() const; + // Get the minimum seasonal variance scale, specific to the model + virtual double minimumSeasonalVarianceScale() const = 0; + protected: using TMultivariatePriorPtrVec = std::vector; using TOptionalSearchKey = boost::optional; diff --git a/lib/model/CCountingModelFactory.cc b/lib/model/CCountingModelFactory.cc index 3c83fee99c..83083b18b5 100644 --- a/lib/model/CCountingModelFactory.cc +++ b/lib/model/CCountingModelFactory.cc @@ -178,5 +178,11 @@ CCountingModelFactory::TStrCRefVec CCountingModelFactory::partitioningFields() c } return result; } +double CCountingModelFactory::minimumSeasonalVarianceScale() const +{ + // unused, return something + return 0.0; +} + } } diff --git a/lib/model/CEventRateModelFactory.cc b/lib/model/CEventRateModelFactory.cc index 52b0bab56f..84efec3ff6 100644 --- a/lib/model/CEventRateModelFactory.cc +++ b/lib/model/CEventRateModelFactory.cc @@ -68,7 +68,10 @@ CAnomalyDetectorModel* CEventRateModelFactory::makeModel(const SModelInitializat return new 
CEventRateModel(this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true), + this->defaultFeatureModels(features, + dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), + true), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), this->defaultCategoricalPrior(), @@ -286,6 +289,11 @@ void CEventRateModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) m_BucketResultsDelay = bucketResultsDelay; } +double CEventRateModelFactory::minimumSeasonalVarianceScale() const +{ + return 0.4; +} + CEventRateModelFactory::TStrCRefVec CEventRateModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(2); diff --git a/lib/model/CEventRatePopulationModelFactory.cc b/lib/model/CEventRatePopulationModelFactory.cc index 4489483833..b29e0b3808 100644 --- a/lib/model/CEventRatePopulationModelFactory.cc +++ b/lib/model/CEventRatePopulationModelFactory.cc @@ -68,7 +68,10 @@ CAnomalyDetectorModel* CEventRatePopulationModelFactory::makeModel(const SModelI return new CEventRatePopulationModel(this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false), + this->defaultFeatureModels(features, + dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), + false), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); @@ -300,5 +303,10 @@ CEventRatePopulationModelFactory::TStrCRefVec CEventRatePopulationModelFactory:: } return result; } + +double CEventRatePopulationModelFactory::minimumSeasonalVarianceScale() const +{ + return 1.0; +} } } diff --git a/lib/model/CMetricModelFactory.cc b/lib/model/CMetricModelFactory.cc index 2a3c9c799b..22334ee25f 100644 --- a/lib/model/CMetricModelFactory.cc +++ b/lib/model/CMetricModelFactory.cc @@ -67,7 +67,10 @@ CAnomalyDetectorModel* CMetricModelFactory::makeModel(const SModelInitialization return new 
CMetricModel(this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true), + this->defaultFeatureModels(features, + dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), + true), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); @@ -287,6 +290,11 @@ void CMetricModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) { m_BucketResultsDelay = bucketResultsDelay; } +double CMetricModelFactory::minimumSeasonalVarianceScale() const +{ + return 0.4; +} + CMetricModelFactory::TStrCRefVec CMetricModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(2); diff --git a/lib/model/CMetricPopulationModelFactory.cc b/lib/model/CMetricPopulationModelFactory.cc index 74b029a160..5dd88d8035 100644 --- a/lib/model/CMetricPopulationModelFactory.cc +++ b/lib/model/CMetricPopulationModelFactory.cc @@ -66,7 +66,10 @@ CAnomalyDetectorModel* CMetricPopulationModelFactory::makeModel(const SModelInit return new CMetricPopulationModel(this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false), + this->defaultFeatureModels(features, + dataGatherer->bucketLength(), + this->minimumSeasonalVarianceScale(), + false), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); @@ -284,6 +287,11 @@ void CMetricPopulationModelFactory::bucketResultsDelay(std::size_t bucketResults m_BucketResultsDelay = bucketResultsDelay; } +double CMetricPopulationModelFactory::minimumSeasonalVarianceScale() const +{ + return 1.0; +} + CMetricPopulationModelFactory::TStrCRefVec CMetricPopulationModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(3); From 0eea33b38d390142c0b281e5111dd3db711abf36 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 23 Mar 2018 15:35:56 +0100 Subject: [PATCH 02/10] implement forecast temporary persistence to scale out 
for larger models --- bin/autodetect/Makefile | 1 + include/api/CForecastRunner.h | 18 ++ include/core/RestoreMacros.h | 14 ++ include/model/CAnomalyDetector.h | 3 +- include/model/CForecastDataSink.h | 8 +- include/model/CForecastModelPersist.h | 130 ++++++++++++ lib/api/CForecastRunner.cc | 118 ++++++++++- lib/api/dump_state/Makefile | 1 + lib/model/CAnomalyDetector.cc | 54 ++++- lib/model/CForecastDataSink.cc | 15 +- lib/model/CForecastModelPersist.cc | 195 ++++++++++++++++++ lib/model/Makefile | 2 + .../unittest/CForecastModelPersistTest.cc | 140 +++++++++++++ .../unittest/CForecastModelPersistTest.h | 29 +++ lib/model/unittest/Main.cc | 3 +- lib/model/unittest/Makefile | 2 + 16 files changed, 706 insertions(+), 27 deletions(-) create mode 100644 include/model/CForecastModelPersist.h create mode 100644 lib/model/CForecastModelPersist.cc create mode 100644 lib/model/unittest/CForecastModelPersistTest.cc create mode 100644 lib/model/unittest/CForecastModelPersistTest.h diff --git a/bin/autodetect/Makefile b/bin/autodetect/Makefile index a0fa8da061..cf5a227f4b 100644 --- a/bin/autodetect/Makefile +++ b/bin/autodetect/Makefile @@ -21,6 +21,7 @@ INSTALL_DIR=$(CPP_PLATFORM_HOME)/bin ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_BOOST_PROGRAMOPTIONS_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 1e0c4ae923..22801353d4 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -79,6 +80,14 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! max memory allowed to use for forecast models static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB + //! 
max memory allowed to use for forecast models persisting to disk + static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000; // 500MB + + // Note: This value is lower than on X-pack side to prevent side-effects, + // if you change this value also change the limit on X-pack side. + //! minimum disk space required for disk persistence + static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296; // 4GB + //! minimum time between stat updates to prevent to many updates in a short time static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s @@ -91,6 +100,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const std::string ERROR_NO_CREATE_TIME; static const std::string ERROR_BAD_MEMORY_STATUS; static const std::string ERROR_MEMORY_LIMIT; + static const std::string ERROR_MEMORY_LIMIT_DISK; + static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS; static const std::string ERROR_NO_SUPPORTED_FUNCTIONS; static const std::string WARNING_DURATION_LIMIT; @@ -109,6 +120,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper; using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries; using TForecastResultSeriesVec = std::vector; + using TMathsModelPtr = boost::shared_ptr; using TStrUSet = boost::unordered_set; @@ -194,6 +206,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! A collection storing important messages from forecasting TStrUSet s_Messages; + + //! A directory to persist models on disk + std::string s_TemporaryFolder; }; private: @@ -206,6 +221,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! Check for new jobs, blocks while waiting bool tryGetJob(SForecast& forecastJob); + //!
check for sufficient disk space + bool sufficientAvailableDiskSpace(const boost::filesystem::path &path); + //! pushes new jobs into the internal 'queue' (thread boundary) bool push(SForecast& forecastJob); diff --git a/include/core/RestoreMacros.h b/include/core/RestoreMacros.h index 20e344c1de..9030270b17 100644 --- a/include/core/RestoreMacros.h +++ b/include/core/RestoreMacros.h @@ -48,6 +48,20 @@ namespace core { continue; \ } +#define RESTORE_ENUM(tag, target, enumtype, restoreSuccess) \ + if (name == tag) \ + { \ + int value; \ + if (core::CStringUtils::stringToType(traverser.value(), value) == false) \ + { \ + LOG_ERROR("Failed to restore " #tag ", got " << traverser.value()); \ + return false; \ + } \ + target = enumtype(value); \ + restoreSuccess = true; \ + continue; \ + } + #define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \ if (name == tag) { \ setup; \ diff --git a/include/model/CAnomalyDetector.h b/include/model/CAnomalyDetector.h index e6819faa3f..3188bd7d9d 100644 --- a/include/model/CAnomalyDetector.h +++ b/include/model/CAnomalyDetector.h @@ -237,7 +237,8 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable { CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const; //! Generate maths models for forecasting - CForecastDataSink::SForecastResultSeries getForecastModels() const; + CForecastDataSink::SForecastResultSeries getForecastModels(bool persistOnDisk = false, + const std::string &persistenceFolder = EMPTY_STRING) const; //! Remove dead models, i.e. those models that have more-or-less //! reverted back to their non-informative state. BE CAREFUL WHEN diff --git a/include/model/CForecastDataSink.h b/include/model/CForecastDataSink.h index 73335fcb17..a864f4a272 100644 --- a/include/model/CForecastDataSink.h +++ b/include/model/CForecastDataSink.h @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -47,7 +48,7 @@ namespace model { //! to change (e.g. 
the json writing should not happen in this class). class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { public: - using TMathsModelPtr = std::unique_ptr; + using TMathsModelPtr = boost::shared_ptr; using TStrUMap = boost::unordered_set; //! Wrapper for 1 timeseries model, its feature and by Field @@ -66,18 +67,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { //! Everything that defines 1 series of forecasts struct MODEL_EXPORT SForecastResultSeries { - SForecastResultSeries(); + SForecastResultSeries(const SModelParams &modelParams); SForecastResultSeries(SForecastResultSeries&& other); SForecastResultSeries(const SForecastResultSeries& that) = delete; SForecastResultSeries& operator=(const SForecastResultSeries&) = delete; + SModelParams s_ModelParams; int s_DetectorIndex; std::vector s_ToForecast; + std::string s_ToForecastPersisted; std::string s_PartitionFieldName; std::string s_PartitionFieldValue; std::string s_ByFieldName; + double s_MinimumSeasonalVarianceScale; }; //! \brief Data describing prerequisites prior predictions diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h new file mode 100644 index 0000000000..c3ef750c29 --- /dev/null +++ b/include/model/CForecastModelPersist.h @@ -0,0 +1,130 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * + * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved. + * + * Notice: this software, and all information contained + * therein, is the exclusive property of Elasticsearch BV + * and its licensors, if any, and is protected under applicable + * domestic and foreign law, and international treaties. + * + * Reproduction, republication or distribution without the + * express written consent of Elasticsearch BV is + * strictly prohibited. 
+ */ + +#ifndef INCLUDED_ml_model_CForecastModelPersist_h +#define INCLUDED_ml_model_CForecastModelPersist_h + +#include +#include + +#include + +#include +#include +#include + +#include + +#include +#include + +namespace ml +{ +namespace model +{ + +//! \brief Persist/Restore CModel sub-classes to/from text representations for +//! the purpose of forecasting. +//! +//! DESCRIPTION:\n +//! Persists/Restores models to disk for the purpose of restoring and forecasting +//! on them. +//! +//! IMPLEMENTATION DECISIONS:\n +//! Only as complete as required for forecasting. +//! +//! Persist and Restore are only done to avoid heap memory usage using temporary disk space. +//! No need for backwards compatibility and versioning as code will only be used +//! locally, never leaving process/io boundaries. +class MODEL_EXPORT CForecastModelPersist final +{ + public: + using TMathsModelPtr = boost::shared_ptr; + + private: + static const std::string FORECAST_MODEL_PERSIST_TAG; + static const std::string FEATURE_TAG; + static const std::string DATA_TYPE_TAG; + static const std::string MODEL_TAG; + static const std::string BY_FIELD_VALUE_TAG; + + public: + class CPersist final { + public: + explicit CPersist(const std::string &temporaryPath); + + //! add a model to the persistence + void addModel(const maths::CModel *model, + const model_t::EFeature feature, + const std::string &byFieldValue); + + //! close the outputStream + const std::string &finalizePersistAndGetFile(); + + private: + static void persistOneModel(core::CStatePersistInserter &inserter, + const maths::CModel *model, + const model_t::EFeature feature, + const std::string &byFieldValue); + + private: + //! the filename where to persist to + boost::filesystem::path m_FileName; + + //! the actual file where the models are persisted to + std::ofstream m_OutStream; + + //!
the persist inserter + std::unique_ptr m_PersistInserter; + }; + + class CRestore final { + public: + explicit CRestore(const SModelParams &modelParams, + double minimumSeasonalVarianceScale, + const std::string &fileName); + + //! restore the next model from the persisted file + bool nextModel(TMathsModelPtr &model, + model_t::EFeature &feature, + std::string &byFieldValue); + + private: + static bool restoreOneModel(core::CStateRestoreTraverser &traverser, + SModelParams modelParams, + double inimumSeasonalVarianceScale, + TMathsModelPtr &model, + model_t::EFeature &feature, + std::string &byFieldValue); + + private: + //! model parameters required in order to restore the model + SModelParams m_ModelParams; + + //! minimum seasonal variance scale specific to the model + double m_MinimumSeasonalVarianceScale; + + //! the actual file where the models are restored from + std::ifstream m_InStream; + + //! the restore traverser + core::CJsonStateRestoreTraverser m_RestoreTraverser; + }; // class CRestore +}; // class CForecastModelPersist + +} +} + +#endif // INCLUDED_ml_model_CForecastModelPersist_h diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index 41c9049d2b..694600a2dc 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -20,12 +20,14 @@ #include #include +#include #include #include #include -#include +#include +#include #include namespace ml { @@ -45,7 +47,9 @@ const std::string CForecastRunner::ERROR_NO_DATA_PROCESSED("Forecast cannot be executed as job requires data to have been processed and modeled"); const std::string CForecastRunner::ERROR_NO_CREATE_TIME("Forecast create time must be specified and non zero"); const std::string CForecastRunner::ERROR_BAD_MEMORY_STATUS("Forecast cannot be executed as model memory status is not OK"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB"); +const std::string
CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB while disk space is exceeded"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISK("Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE("Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient"); const std::string CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS("Forecast is not supported for population analysis"); const std::string CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS("Forecast is not supported for the used functions"); const std::string CForecastRunner::WARNING_DURATION_LIMIT("Forecast duration exceeds internal limit, setting to 8 weeks"); @@ -66,7 +70,8 @@ CForecastRunner::SForecast::SForecast() s_NumberOfModels(0), s_NumberOfForecastableModels(0), s_MemoryUsage(0), - s_Messages() { + s_Messages(), + s_TemporaryFolder() } CForecastRunner::SForecast::SForecast(SForecast&& other) @@ -81,7 +86,8 @@ CForecastRunner::SForecast::SForecast(SForecast&& other) s_NumberOfModels(other.s_NumberOfModels), s_NumberOfForecastableModels(other.s_NumberOfForecastableModels), s_MemoryUsage(other.s_MemoryUsage), - s_Messages(other.s_Messages) { + s_Messages(other.s_Messages), + s_TemporaryFolder(std::move(other.s_TemporaryFolder)) } CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& other) { @@ -97,6 +103,7 @@ CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& ot s_NumberOfForecastableModels = other.s_NumberOfForecastableModels; s_MemoryUsage = other.s_MemoryUsage; s_Messages = other.s_Messages; + s_TemporaryFolder = std::move(other.s_TemporaryFolder); return *this; } @@ -161,6 +168,29 @@ void CForecastRunner::forecastWorker() { // while loops allow us to free up memory for every model right after each forecast is done while 
(!forecastJob.s_ForecastSeries.empty()) { TForecastResultSeries& series = forecastJob.s_ForecastSeries.back(); + std::unique_ptr modelRestore; + + if (!series.s_ToForecastPersisted.empty()) { + modelRestore.reset(new model::CForecastModelPersist::CRestore (series.s_ModelParams, + series.s_MinimumSeasonalVarianceScale, + series.s_ToForecastPersisted)); + // if in memory models are empty check if we can load persisted ones + if (series.s_ToForecast.empty() && modelRestore) + { + TMathsModelPtr model; + model_t::EFeature feature; + std::string byFieldValue; + + if (modelRestore->nextModel(model, feature, byFieldValue)) + { + series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); + } + else + { + modelRestore.reset(); + } + } + } while (!series.s_ToForecast.empty()) { const TForecastModelWrapper& model = series.s_ToForecast.back(); @@ -201,7 +231,25 @@ void CForecastRunner::forecastWorker() { lastStatsUpdate = elapsedTime; } } + + // if in memory models are empty check if we can load persisted ones + if (series.s_ToForecast.empty() && modelRestore) + { + TMathsModelPtr model; + model_t::EFeature feature; + std::string byFieldValue; + + if (modelRestore->nextModel(model, feature, byFieldValue)) + { + series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); + } + else + { + modelRestore.reset(); + } + } } + modelRestore.reset(); forecastJob.s_ForecastSeries.pop_back(); } // write final message @@ -213,6 +261,13 @@ void CForecastRunner::forecastWorker() { // signal that job is done m_WorkCompleteCondition.notify_all(); + + // cleanup + if (!forecastJob.s_TemporaryFolder.empty()) + { + boost::filesystem::path temporaryFolder (forecastJob.s_TemporaryFolder); + boost::filesystem::remove_all(temporaryFolder); + } } } @@ -279,14 +334,19 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, atLeastOneSupportedFunction = atLeastOneSupportedFunction || prerequisites.s_IsSupportedFunction; totalMemoryUsage += 
prerequisites.s_MemoryUsageForDetector; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty ()) // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); return false; } } - if (atLeastOneNonPopulationModel == false) { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) + { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISK); + return false; + } + this->sendErrorMessage(forecastJob, ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS); return false; } @@ -309,7 +369,30 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, this->sendScheduledMessage(forecastJob); // 2nd loop over the detectors to clone models for forecasting - TForecastResultSeriesVec s; + bool persistOnDisk = false; + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) + { + boost::filesystem::path temporaryFolder (forecastJob.s_TemporaryFolder); + + if (!sufficientAvailableDiskSpace(temporaryFolder)) + { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE); + return false; + } + + LOG_INFO("Forecast of large model requested (requires " + << std::to_string(1 + (totalMemoryUsage >> 20)) + << " MB), using disk."); + + + boost::filesystem::create_directories(temporaryFolder); + LOG_INFO("Persisting to: " << temporaryFolder.string()); + persistOnDisk = true; + } + else + { + forecastJob.s_TemporaryFolder.clear(); + } for (const auto& detector : detectors) { if (detector.get() == nullptr) { @@ -317,7 +400,9 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, continue; } - forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels()); + forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels( + persistOnDisk, + forecastJob.s_TemporaryFolder)); } return this->push(forecastJob); @@ -360,6 +445,8 @@ bool 
CForecastRunner::parseAndValidateForecastRequest(const std::string& control forecastJob.s_Duration = properties.get("duration", 0); forecastJob.s_CreateTime = properties.get("create_time", 0); + // tmp storage if available + forecastJob.s_TemporaryFolder = properties.get("tmp_storage", EMPTY_STRING); // use -1 as default to allow 0 as 'never expires' expiresIn = properties.get("expires_in", -1l); @@ -390,7 +477,6 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control // to be replaced by https://github.com/elastic/machine-learning-cpp/issues/443 // TODO this is a temporary fix to prevent the analysis blowing up // if you change this value, also change the log string - // todo: refactor validation out from here core_t::TTime maxDuration = 8 * core::constants::WEEK; if (forecastJob.s_Duration > maxDuration) { LOG_INFO(WARNING_DURATION_LIMIT); @@ -419,6 +505,20 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control return true; } +bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path &path) +{ + boost::system::error_code errorCode; + auto spaceInfo = boost::filesystem::space (path, errorCode); + + if (errorCode) + { + LOG_ERROR("Failed to retrieve disk information for " << path << " error " << errorCode.value()); + return false; + } + + return spaceInfo.available > MIN_FORECAST_AVAILABLE_DISK_SPACE; +} + void CForecastRunner::sendScheduledMessage(const SForecast& forecastJob) const { LOG_DEBUG("job passed forecast validation, scheduled for forecasting"); model::CForecastDataSink sink(m_JobId, diff --git a/lib/api/dump_state/Makefile b/lib/api/dump_state/Makefile index a854f159f7..ca1d0e187d 100644 --- a/lib/api/dump_state/Makefile +++ b/lib/api/dump_state/Makefile @@ -19,6 +19,7 @@ TARGET=dump_state$(EXE_EXT) ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git 
a/lib/model/CAnomalyDetector.cc b/lib/model/CAnomalyDetector.cc index 0fec8235d6..7e7a4f4207 100644 --- a/lib/model/CAnomalyDetector.cc +++ b/lib/model/CAnomalyDetector.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -495,8 +496,9 @@ CForecastDataSink::SForecastModelPrerequisites CAnomalyDetector::getForecastPrer return prerequisites; } -CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() const { - CForecastDataSink::SForecastResultSeries series; +CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels(bool persistOnDisk, + const std::string &persistenceFolder) const + CForecastDataSink::SForecastResultSeries series(m_ModelFactory->modelParams()); if (m_DataGatherer->isPopulation()) { return series; @@ -514,15 +516,47 @@ CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() c series.s_DetectorIndex = m_DetectorIndex; series.s_PartitionFieldName = key.partitionFieldName(); series.s_PartitionFieldValue = m_DataGatherer->partitionFieldValue(); + series.s_MinimumSeasonalVarianceScale = m_ModelFactory->minimumSeasonalVarianceScale(); + + if (persistOnDisk) + { + CForecastModelPersist::CPersist persister(persistenceFolder); + + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) + { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) + { + for (auto feature : view->features()) + { + const maths::CModel *model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) + { + persister.addModel(model, feature, m_DataGatherer->personName(pid)); + } + } + } + } - for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { - // todo: Add terms filtering here - if (m_DataGatherer->isPersonActive(pid)) { - for (auto feature : view->features()) { - const maths::CModel* model = view->model(feature, pid); - if (model != nullptr && 
model->isForecastPossible()) { - series.s_ToForecast.emplace_back( - feature, CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), m_DataGatherer->personName(pid)); + series.s_ToForecastPersisted = persister.finalizePersistAndGetFile(); + } + else + { + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) + { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) + { + for (auto feature : view->features()) + { + const maths::CModel *model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) + { + series.s_ToForecast.emplace_back( + feature, + CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), + m_DataGatherer->personName(pid)); + } } } } diff --git a/lib/model/CForecastDataSink.cc b/lib/model/CForecastDataSink.cc index cb8805268e..f5bdacf1a1 100644 --- a/lib/model/CForecastDataSink.cc +++ b/lib/model/CForecastDataSink.cc @@ -74,16 +74,23 @@ CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(SForecastModelWr : s_Feature(other.s_Feature), s_ForecastModel(std::move(other.s_ForecastModel)), s_ByFieldValue(std::move(other.s_ByFieldValue)) { } -CForecastDataSink::SForecastResultSeries::SForecastResultSeries() - : s_DetectorIndex(), s_ToForecast(), s_PartitionFieldValue(), s_ByFieldName() { +CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams &modelParams) + :s_ModelParams(modelParams), + s_DetectorIndex(), + s_ToForecastPersisted(), + s_ByFieldName(), + s_MinimumSeasonalVarianceScale(0.0) } CForecastDataSink::SForecastResultSeries::SForecastResultSeries(SForecastResultSeries&& other) - : s_DetectorIndex(other.s_DetectorIndex), + :s_ModelParams(std::move(other.s_ModelParams)), + s_DetectorIndex(other.s_DetectorIndex), s_ToForecast(std::move(other.s_ToForecast)), + s_ToForecastPersisted(std::move(other.s_ToForecastPersisted)), s_PartitionFieldName(std::move(other.s_PartitionFieldName)), 
s_PartitionFieldValue(std::move(other.s_PartitionFieldValue)), - s_ByFieldName(std::move(other.s_ByFieldName)) { + s_ByFieldName(std::move(other.s_ByFieldName)), + s_MinimumSeasonalVarianceScale(other.s_MinimumSeasonalVarianceScale) } CForecastDataSink::CForecastDataSink(const std::string& jobId, diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc new file mode 100644 index 0000000000..f413053ef7 --- /dev/null +++ b/lib/model/CForecastModelPersist.cc @@ -0,0 +1,195 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * + * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved. + * + * Notice: this software, and all information contained + * therein, is the exclusive property of Elasticsearch BV + * and its licensors, if any, and is protected under applicable + * domestic and foreign law, and international treaties. + * + * Reproduction, republication or distribution without the + * express written consent of Elasticsearch BV is + * strictly prohibited. + */ + +#include + +#include +#include +#include + +#include +#include +#include + +#include + +#include + +namespace ml +{ +namespace model +{ + +const std::string CForecastModelPersist::FORECAST_MODEL_PERSIST_TAG("forecast_persist"); +const std::string CForecastModelPersist::FEATURE_TAG("feature"); +const std::string CForecastModelPersist::DATA_TYPE_TAG("datatype"); +const std::string CForecastModelPersist::MODEL_TAG("model"); +const std::string CForecastModelPersist::BY_FIELD_VALUE_TAG("by_field_value"); + +CForecastModelPersist::CPersist::CPersist(const std::string &temporaryPath): + m_FileName(temporaryPath), + m_OutStream(), + m_PersistInserter() +{ + m_FileName /= boost::filesystem::unique_path("forecast-persist-%%%%-%%%%-%%%%-%%%%"); + m_OutStream.open(m_FileName.string()); + m_PersistInserter.reset(new core::CJsonStatePersistInserter(m_OutStream)); +} + +void CForecastModelPersist::CPersist::addModel(const maths::CModel *model, + const model_t::EFeature feature, + const std::string 
&byFieldValue) +{ + if (!m_PersistInserter) + { + LOG_ERROR("Internal error: Can not add model to an already closed stream."); + return; + } + + m_PersistInserter->insertLevel(FORECAST_MODEL_PERSIST_TAG, + boost::bind(CForecastModelPersist::CPersist::persistOneModel, + _1, model, feature, byFieldValue)); +} + +void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter &inserter, + const maths::CModel *model, + const model_t::EFeature feature, + const std::string &byFieldValue) +{ + inserter.insertValue(FEATURE_TAG, feature); + inserter.insertValue(DATA_TYPE_TAG, model->dataType()); + inserter.insertValue(BY_FIELD_VALUE_TAG, byFieldValue); + inserter.insertLevel(MODEL_TAG, boost::bind(maths::CModelStateSerialiser(), boost::cref(*model), _1)); +} + +const std::string &CForecastModelPersist::CPersist::finalizePersistAndGetFile() +{ + // destruct m_PersistInserter to force close it + m_PersistInserter.reset(); + m_OutStream.close(); + return m_FileName.string(); +} + +CForecastModelPersist::CRestore::CRestore(const SModelParams &modelParams, + double minimumSeasonalVarianceScale, + const std::string &fileName): + m_ModelParams(modelParams), + m_MinimumSeasonalVarianceScale(minimumSeasonalVarianceScale), + m_InStream(fileName), + m_RestoreTraverser(m_InStream) +{ +} + +bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr &model, + model_t::EFeature &feature, + std::string &byFieldValue) +{ + if (m_RestoreTraverser.isEof()) + { + return false; + } + + if (m_RestoreTraverser.name() != FORECAST_MODEL_PERSIST_TAG) + { + LOG_ERROR("Failed to restore forecast model, unexpected tag"); + return false; + } + + if (!m_RestoreTraverser.hasSubLevel()) { + LOG_ERROR("Failed to restore forecast model, unexpected format"); + return false; + } + + TMathsModelPtr originalModel; + if(!m_RestoreTraverser.traverseSubLevel(boost::bind(CForecastModelPersist::CRestore::restoreOneModel, + _1, + boost::cref(m_ModelParams), + m_MinimumSeasonalVarianceScale, + 
boost::ref(originalModel), + boost::ref(feature), + boost::ref(byFieldValue)))) + { + LOG_ERROR("Failed to restore forecast model, internal error"); + return false; + } + + model.reset(originalModel->cloneForForecast()); + + m_RestoreTraverser.next(); + return true; +} + +bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTraverser &traverser, + const SModelParams modelParams, + double minimumSeasonalVarianceScale, + TMathsModelPtr &model, + model_t::EFeature &feature, + std::string &byFieldValue) +{ + // reset all + model.reset(); + bool restoredFeature = false; + bool restoredDataType = false; + byFieldValue.clear(); + maths_t::EDataType dataType; + + do + { + const std::string &name = traverser.name(); + RESTORE_ENUM(FEATURE_TAG, feature, model_t::EFeature, restoredFeature) + RESTORE_ENUM(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType) + RESTORE_BUILT_IN(BY_FIELD_VALUE_TAG, byFieldValue) + if (name == MODEL_TAG) + { + if (!restoredDataType) + { + LOG_ERROR("Failed to restore forecast model, datatype missing"); + return false; + } + + maths::SModelRestoreParams params{ + maths::CModelParams(modelParams.s_BucketLength, + modelParams.s_LearnRate, + modelParams.s_DecayRate, + minimumSeasonalVarianceScale + ), + maths::STimeSeriesDecompositionRestoreParams{ + modelParams.s_DecayRate, + modelParams.s_BucketLength, + modelParams.s_ComponentSize}, + modelParams.distributionRestoreParams(dataType)}; + + if(!traverser.traverseSubLevel(boost::bind(maths::CModelStateSerialiser(), + boost::cref(params), boost::ref(model), _1))) + { + LOG_ERROR("Failed to restore forecast model, model missing"); + return false; + } + } + } + while (traverser.next()); + + // only the by_field_value can be empty + if (!model || !restoredFeature || !restoredDataType) + { + LOG_ERROR("Failed to restore forecast model, data missing"); + return false; + } + + return true; +} + +} /* namespace model */ +} /* namespace ml */ diff --git a/lib/model/Makefile 
b/lib/model/Makefile index 3a20287f4a..76c8a5b0d7 100644 --- a/lib/model/Makefile +++ b/lib/model/Makefile @@ -17,6 +17,7 @@ include $(CPP_SRC_HOME)/mk/defines.mk TARGET=$(OBJS_DIR)/libMlModel$(DYNAMIC_LIB_EXT) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 @@ -47,6 +48,7 @@ CEventRatePopulationModel.cc \ CEventRatePopulationModelFactory.cc \ CFeatureData.cc \ CForecastDataSink.cc \ +CForecastModelPersist.cc \ CGathererTools.cc \ CHierarchicalResults.cc \ CHierarchicalResultsAggregator.cc \ diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc new file mode 100644 index 0000000000..c4de906459 --- /dev/null +++ b/lib/model/unittest/CForecastModelPersistTest.cc @@ -0,0 +1,140 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * + * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved. + * + * Notice: this software, and all information contained + * therein, is the exclusive property of Elasticsearch BV + * and its licensors, if any, and is protected under applicable + * domestic and foreign law, and international treaties. + * + * Reproduction, republication or distribution without the + * express written consent of Elasticsearch BV is + * strictly prohibited. 
+ */ + +#include "CForecastModelPersistTest.h" + +#include +#include + +#include +#include +#include + +#include + +#include + +#include + +using namespace ml; +using namespace model; + +void CForecastModelPersistTest::testPersistAndRestore() +{ + LOG_DEBUG("+----------------------------------------------------+"); + LOG_DEBUG("| CForecastModelPersistTest::testPersistAndRestore |"); + LOG_DEBUG("+----------------------------------------------------+"); + + core_t::TTime bucketLength{1800}; + double minimumSeasonalVarianceScale = 0.2; + SModelParams params{bucketLength}; + params.s_DecayRate = 0.001; + params.s_LearnRate = 1.0; + maths::CTimeSeriesDecomposition trend(params.s_DecayRate, bucketLength); + maths::CNormalMeanPrecConjugate prior{ + maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_ContinuousData, params.s_DecayRate)}; + maths::CModelParams timeSeriesModelParams{bucketLength, + params.s_LearnRate, + params.s_DecayRate, + minimumSeasonalVarianceScale}; + maths::CUnivariateTimeSeriesModel timeSeriesModel{timeSeriesModelParams, 1, trend, prior}; + + CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir()); + persister.addModel(&timeSeriesModel, + model_t::EFeature::E_IndividualCountByBucketAndPerson, + "some_by_field"); + + maths::CNormalMeanPrecConjugate otherPrior{ + maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_MixedData, params.s_DecayRate)}; + maths::CUnivariateTimeSeriesModel otherTimeSeriesModel{timeSeriesModelParams, 2, trend, otherPrior}; + + persister.addModel(&otherTimeSeriesModel, + model_t::EFeature::E_IndividualLowMeanByPerson, + "some_other_by_field"); + + maths::CNormalMeanPrecConjugate otherPriorEmptyByField{ + maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_DiscreteData, params.s_DecayRate)}; + maths::CUnivariateTimeSeriesModel otherTimeSeriesModelEmptyByField{timeSeriesModelParams, 3, trend, otherPriorEmptyByField}; + + persister.addModel(&otherTimeSeriesModelEmptyByField, + 
model_t::EFeature::E_IndividualHighMedianByPerson, + ""); + std::string persistedModels = persister.finalizePersistAndGetFile(); + + CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels); + CForecastModelPersist::TMathsModelPtr restoredModel; + std::string restoredByFieldValue; + model_t::EFeature restoredFeature; + + // test timeSeriesModel + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualCountByBucketAndPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType()); + + CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast {timeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); + CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); + + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); + + // test otherTimeSeriesModel + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualLowMeanByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_other_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType()); + 
CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast {otherTimeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); + CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); + + // test otherTimeSeriesModelEmptyByField + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualHighMedianByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string(), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType()); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast + {otherTimeSeriesModelEmptyByField.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), + restoredModel->checksum(42)); + + CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + std::remove(persistedModels.c_str()); +} + +CppUnit::Test* CForecastModelPersistTest::suite(void) +{ + CppUnit::TestSuite *suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest"); + + suiteOfTests->addTest( new CppUnit::TestCaller( + "CForecastModelPersistTest::testPersistAndRestore", + &CForecastModelPersistTest::testPersistAndRestore) ); + + return suiteOfTests; +} diff --git a/lib/model/unittest/CForecastModelPersistTest.h b/lib/model/unittest/CForecastModelPersistTest.h new file mode 100644 index 0000000000..d0313a98b6 --- /dev/null +++ 
b/lib/model/unittest/CForecastModelPersistTest.h @@ -0,0 +1,29 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * + * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved. + * + * Notice: this software, and all information contained + * therein, is the exclusive property of Elasticsearch BV + * and its licensors, if any, and is protected under applicable + * domestic and foreign law, and international treaties. + * + * Reproduction, republication or distribution without the + * express written consent of Elasticsearch BV is + * strictly prohibited. + */ + +#ifndef INCLUDED_CForecastModelPersistTest_h +#define INCLUDED_CForecastModelPersistTest_h + +#include + +class CForecastModelPersistTest : public CppUnit::TestFixture +{ + public: + void testPersistAndRestore(); + + static CppUnit::Test *suite(); +}; + +#endif // INCLUDED_CForecastModelPersistTest_h diff --git a/lib/model/unittest/Main.cc b/lib/model/unittest/Main.cc index e2bae3f34f..ed76b39f8d 100644 --- a/lib/model/unittest/Main.cc +++ b/lib/model/unittest/Main.cc @@ -25,7 +25,7 @@ #include "CEventRateAnomalyDetectorTest.h" #include "CEventRateDataGathererTest.h" #include "CEventRateModelTest.h" -#include "CEventRatePopulationDataGathererTest.h" +#include "CForecastModelPersistTest.h" #include "CEventRatePopulationModelTest.h" #include "CFunctionTypesTest.h" #include "CGathererToolsTest.h" @@ -67,6 +67,7 @@ int main(int argc, const char** argv) { runner.addTest(CEventRatePopulationDataGathererTest::suite()); runner.addTest(CEventRatePopulationModelTest::suite()); runner.addTest(CFunctionTypesTest::suite()); + runner.addTest( CForecastModelPersistTest::suite() ); runner.addTest(CGathererToolsTest::suite()); runner.addTest(CHierarchicalResultsTest::suite()); runner.addTest(CHierarchicalResultsLevelSetTest::suite()); diff --git a/lib/model/unittest/Makefile b/lib/model/unittest/Makefile index 67bd4417f8..a34c137f36 100644 --- a/lib/model/unittest/Makefile +++ b/lib/model/unittest/Makefile @@ -17,6 +17,7 @@ include 
$(CPP_SRC_HOME)/mk/defines.mk TARGET=ml_test$(EXE_EXT) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 @@ -39,6 +40,7 @@ SRCS=\ CEventRateModelTest.cc \ CEventRatePopulationDataGathererTest.cc \ CEventRatePopulationModelTest.cc \ + CForecastModelPersistTest.cc \ CFunctionTypesTest.cc \ CGathererToolsTest.cc \ CHierarchicalResultsTest.cc \ From ab53802945c7345d25f69085d6dc59df9d14992a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Mar 2018 14:07:50 +0200 Subject: [PATCH 03/10] refactor restore macro for enums --- include/core/RestoreMacros.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/include/core/RestoreMacros.h b/include/core/RestoreMacros.h index 9030270b17..9cb2e74945 100644 --- a/include/core/RestoreMacros.h +++ b/include/core/RestoreMacros.h @@ -48,7 +48,7 @@ namespace core { continue; \ } -#define RESTORE_ENUM(tag, target, enumtype, restoreSuccess) \ +#define RESTORE_ENUM(tag, target, enumtype) \ if (name == tag) \ { \ int value; \ @@ -58,10 +58,16 @@ namespace core { return false; \ } \ target = enumtype(value); \ - restoreSuccess = true; \ continue; \ } +#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \ + if (name == tag) \ + { \ + restoreSuccess = true; \ + RESTORE_ENUM(tag, target, enumtype) \ + } + #define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \ if (name == tag) { \ setup; \ From 05127bc9b5e428148c57f9c998e1e42ba6db0681 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Mar 2018 14:08:33 +0200 Subject: [PATCH 04/10] move constants out of header --- include/model/CForecastModelPersist.h | 7 ------- lib/model/CForecastModelPersist.cc | 16 +++++++++------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h index c3ef750c29..573f3f9c4d 100644 --- a/include/model/CForecastModelPersist.h +++ b/include/model/CForecastModelPersist.h @@ -53,13 +53,6 @@ class 
MODEL_EXPORT CForecastModelPersist final public: using TMathsModelPtr = boost::shared_ptr; - private: - static const std::string FORECAST_MODEL_PERSIST_TAG; - static const std::string FEATURE_TAG; - static const std::string DATA_TYPE_TAG; - static const std::string MODEL_TAG; - static const std::string BY_FIELD_VALUE_TAG; - public: class CPersist final { public: diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc index f413053ef7..6b2a74630e 100644 --- a/lib/model/CForecastModelPersist.cc +++ b/lib/model/CForecastModelPersist.cc @@ -32,11 +32,13 @@ namespace ml namespace model { -const std::string CForecastModelPersist::FORECAST_MODEL_PERSIST_TAG("forecast_persist"); -const std::string CForecastModelPersist::FEATURE_TAG("feature"); -const std::string CForecastModelPersist::DATA_TYPE_TAG("datatype"); -const std::string CForecastModelPersist::MODEL_TAG("model"); -const std::string CForecastModelPersist::BY_FIELD_VALUE_TAG("by_field_value"); +namespace { +static const std::string FORECAST_MODEL_PERSIST_TAG("forecast_persist"); +static const std::string FEATURE_TAG("feature"); +static const std::string DATA_TYPE_TAG("datatype"); +static const std::string MODEL_TAG("model"); +static const std::string BY_FIELD_VALUE_TAG("by_field_value"); +} CForecastModelPersist::CPersist::CPersist(const std::string &temporaryPath): m_FileName(temporaryPath), @@ -148,8 +150,8 @@ bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTravers do { const std::string &name = traverser.name(); - RESTORE_ENUM(FEATURE_TAG, feature, model_t::EFeature, restoredFeature) - RESTORE_ENUM(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType) + RESTORE_ENUM_CHECKED(FEATURE_TAG, feature, model_t::EFeature, restoredFeature) + RESTORE_ENUM_CHECKED(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType) RESTORE_BUILT_IN(BY_FIELD_VALUE_TAG, byFieldValue) if (name == MODEL_TAG) { From d6e376a7335bf629e8ead1ae42344c8053e17c4d Mon Sep 17 
00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Mar 2018 14:08:55 +0200 Subject: [PATCH 05/10] address review comments --- include/api/CForecastRunner.h | 14 +++++--- lib/api/CForecastRunner.cc | 68 ++++++++++++++++++----------------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 22801353d4..4c85eb6a3f 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -78,18 +78,22 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY; //! max memory allowed to use for forecast models - static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB + static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB + // Note: This value measures the size in memory, not the size of the persistence, + // which is likely higher and would be hard to calculate upfront //! max memory allowed to use for forecast models persisting to disk - static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000; // 500MB + static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB - // Note: This value us lower than on X-pack side to prevent side-effects, + // Note: This value is lower than on X-pack side to prevent side-effects, // if you change this value also change the limit on X-pack side. + // The purpose of this value is to guard the rest of the system regarding + // an out of disk space //! minimum disk space required for disk persistence - static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296; // 4GB + static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB //! 
minimum time between stat updates to prevent to many updates in a short time - static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s + static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s private: static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE; diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index 694600a2dc..d29d211ce4 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -170,12 +170,17 @@ void CForecastRunner::forecastWorker() { TForecastResultSeries& series = forecastJob.s_ForecastSeries.back(); std::unique_ptr modelRestore; + // initialize persistence restore exactly once if (!series.s_ToForecastPersisted.empty()) { - modelRestore.reset(new model::CForecastModelPersist::CRestore (series.s_ModelParams, + modelRestore.reset(new model::CForecastModelPersist::CRestore(series.s_ModelParams, series.s_MinimumSeasonalVarianceScale, series.s_ToForecastPersisted)); - // if in memory models are empty check if we can load persisted ones - if (series.s_ToForecast.empty() && modelRestore) + } + + while (series.s_ToForecast.empty() == false && modelRestore != nullptr) + { + // check if we should backfill from persistence + if (series.s_ToForecast.empty()) { TMathsModelPtr model; model_t::EFeature feature; @@ -185,14 +190,13 @@ void CForecastRunner::forecastWorker() { { series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); } - else + else // restorer exhausted, no need for further restoring { modelRestore.reset(); + break; } } - } - while (!series.s_ToForecast.empty()) { const TForecastModelWrapper& model = series.s_ToForecast.back(); model_t::TDouble1VecDouble1VecPr support = model_t::support(model.s_Feature); bool success = model.s_ForecastModel->forecast(forecastJob.s_StartTime, @@ -231,25 +235,7 @@ void CForecastRunner::forecastWorker() { lastStatsUpdate = elapsedTime; } } - - // if in memory models are empty check if we can load persisted ones - if 
(series.s_ToForecast.empty() && modelRestore) - { - TMathsModelPtr model; - model_t::EFeature feature; - std::string byFieldValue; - - if (modelRestore->nextModel(model, feature, byFieldValue)) - { - series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); - } - else - { - modelRestore.reset(); - } - } } - modelRestore.reset(); forecastJob.s_ForecastSeries.pop_back(); } // write final message @@ -265,8 +251,16 @@ void CForecastRunner::forecastWorker() { // cleanup if (!forecastJob.s_TemporaryFolder.empty()) { - boost::filesystem::path temporaryFolder (forecastJob.s_TemporaryFolder); - boost::filesystem::remove_all(temporaryFolder); + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); + boost::system::error_code errorCode; + boost::filesystem::remove_all(temporaryFolder, errorCode); + if (errorCode) + { + // not an error: there is also cleanup code on X-pack side + LOG_WARN("Failed to cleanup temporary data from: " << forecastJob.s_TemporaryFolder << " error " << errorCode.message()); + return; + } + } } } @@ -334,7 +328,7 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, atLeastOneSupportedFunction = atLeastOneSupportedFunction || prerequisites.s_IsSupportedFunction; totalMemoryUsage += prerequisites.s_MemoryUsageForDetector; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty ()) + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty()) // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); return false; @@ -372,9 +366,9 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, bool persistOnDisk = false; if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { - boost::filesystem::path temporaryFolder (forecastJob.s_TemporaryFolder); + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); - if 
(!sufficientAvailableDiskSpace(temporaryFolder)) + if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) { this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE); return false; @@ -384,9 +378,17 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, << std::to_string(1 + (totalMemoryUsage >> 20)) << " MB), using disk."); + boost::system::error_code errorCode; + boost::filesystem::create_directories(temporaryFolder, errorCode); + if (errorCode) + { + this->sendErrorMessage(forecastJob, + "Forecast internal error, failed to create temporary folder " + + temporaryFolder.string() + " error: " + errorCode.message()); + return false; + } - boost::filesystem::create_directories(temporaryFolder); - LOG_INFO("Persisting to: " << temporaryFolder.string()); + LOG_DEBUG("Persisting to: " << temporaryFolder.string()); persistOnDisk = true; } else @@ -508,11 +510,11 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path &path) { boost::system::error_code errorCode; - auto spaceInfo = boost::filesystem::space (path, errorCode); + auto spaceInfo = boost::filesystem::space(path, errorCode); if (errorCode) { - LOG_ERROR("Failed to retrieve disk information for " << path << " error " << errorCode.value()); + LOG_ERROR("Failed to retrieve disk information for " << path << " error " << errorCode.message()); return false; } From 0d88f308453fbf33de4f92515140b8f655ee2eee Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Mar 2018 14:15:26 +0200 Subject: [PATCH 06/10] improve unit test --- .../unittest/CForecastModelPersistTest.cc | 108 +++++++++--------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc index c4de906459..8ec2b4d3a5 100644 --- a/lib/model/unittest/CForecastModelPersistTest.cc +++ 
b/lib/model/unittest/CForecastModelPersistTest.cc @@ -37,7 +37,7 @@ void CForecastModelPersistTest::testPersistAndRestore() LOG_DEBUG("| CForecastModelPersistTest::testPersistAndRestore |"); LOG_DEBUG("+----------------------------------------------------+"); - core_t::TTime bucketLength{1800}; + core_t::TTime bucketLength{1800}; double minimumSeasonalVarianceScale = 0.2; SModelParams params{bucketLength}; params.s_DecayRate = 0.001; @@ -73,58 +73,60 @@ void CForecastModelPersistTest::testPersistAndRestore() ""); std::string persistedModels = persister.finalizePersistAndGetFile(); - CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels); - CForecastModelPersist::TMathsModelPtr restoredModel; - std::string restoredByFieldValue; - model_t::EFeature restoredFeature; - - // test timeSeriesModel - CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); - - CPPUNIT_ASSERT(restoredModel); - CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualCountByBucketAndPerson, restoredFeature); - CPPUNIT_ASSERT_EQUAL(std::string("some_by_field"), restoredByFieldValue); - CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); - CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier()); - CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType()); - - CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast {timeSeriesModel.cloneForForecast()}; - CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); - CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); - CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); - - CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); - - // test otherTimeSeriesModel - restoredModel.reset(); - CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, 
restoredByFieldValue)); - CPPUNIT_ASSERT(restoredModel); - CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualLowMeanByPerson, restoredFeature); - CPPUNIT_ASSERT_EQUAL(std::string("some_other_by_field"), restoredByFieldValue); - CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); - CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier()); - CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType()); - CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast {otherTimeSeriesModel.cloneForForecast()}; - CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); - CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); - CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); - CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); - - // test otherTimeSeriesModelEmptyByField - restoredModel.reset(); - CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); - CPPUNIT_ASSERT(restoredModel); - CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualHighMedianByPerson, restoredFeature); - CPPUNIT_ASSERT_EQUAL(std::string(), restoredByFieldValue); - CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); - CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier()); - CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType()); - CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast - {otherTimeSeriesModelEmptyByField.cloneForForecast()}; - CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), - restoredModel->checksum(42)); - - CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + { + CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels); + 
CForecastModelPersist::TMathsModelPtr restoredModel; + std::string restoredByFieldValue; + model_t::EFeature restoredFeature; + + // test timeSeriesModel + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualCountByBucketAndPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType()); + + CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast {timeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); + CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); + + CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); + + // test otherTimeSeriesModel + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualLowMeanByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string("some_other_by_field"), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType()); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast {otherTimeSeriesModel.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); + 
CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); + CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->checksum(42), restoredModel->checksum(42)); + + // test otherTimeSeriesModelEmptyByField + restoredModel.reset(); + CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + CPPUNIT_ASSERT(restoredModel); + CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualHighMedianByPerson, restoredFeature); + CPPUNIT_ASSERT_EQUAL(std::string(), restoredByFieldValue); + CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); + CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier()); + CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType()); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast + {otherTimeSeriesModelEmptyByField.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), + restoredModel->checksum(42)); + + CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + } std::remove(persistedModels.c_str()); } From c1ea3e3f53d6f41a98740f9ffbaac2b73d22157f Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Mar 2018 16:17:19 +0200 Subject: [PATCH 07/10] fix logic --- lib/api/CForecastRunner.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index d29d211ce4..b6cbddb277 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -177,7 +177,7 @@ void CForecastRunner::forecastWorker() { series.s_ToForecastPersisted)); } - while (series.s_ToForecast.empty() == false && modelRestore != nullptr) + while (series.s_ToForecast.empty() == false || modelRestore != nullptr) { // check if we should backfill from persistence if (series.s_ToForecast.empty()) From 
2843468eed2dcc3b3335348a7821c43a4359ba56 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 5 Apr 2018 14:14:39 +0200 Subject: [PATCH 08/10] apply clang-format --- include/api/CForecastRunner.h | 38 +++--- include/core/RestoreMacros.h | 33 ++--- include/model/CAnomalyDetector.h | 2 +- include/model/CCountingModelFactory.h | 4 +- include/model/CEventRateModelFactory.h | 4 +- .../model/CEventRatePopulationModelFactory.h | 4 +- include/model/CForecastDataSink.h | 12 +- include/model/CForecastModelPersist.h | 128 ++++++++---------- include/model/CMetricModelFactory.h | 4 +- include/model/CMetricPopulationModelFactory.h | 4 +- include/model/CModelFactory.h | 4 +- lib/api/CForecastRunner.cc | 82 +++++------ lib/model/CAnomalyDetector.cc | 41 ++---- lib/model/CCountingModelFactory.cc | 4 +- lib/model/CEventRateModelFactory.cc | 21 ++- lib/model/CEventRatePopulationModelFactory.cc | 19 ++- lib/model/CForecastDataSink.cc | 18 +-- lib/model/CForecastModelPersist.cc | 119 ++++++---------- lib/model/CHierarchicalResults.cc | 3 +- lib/model/CMetricModelFactory.cc | 8 +- lib/model/CMetricPopulationModelFactory.cc | 19 ++- .../unittest/CForecastModelPersistTest.cc | 43 ++---- .../unittest/CForecastModelPersistTest.h | 9 +- lib/model/unittest/Main.cc | 4 +- 24 files changed, 261 insertions(+), 366 deletions(-) diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 4c85eb6a3f..5e4396c6b0 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -78,22 +78,22 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY; //! 
max memory allowed to use for forecast models - static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB + static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB - // Note: This value measures the size in memory, not the size of the persistence, - // which is likely higher and would be hard to calculate upfront - //! max memory allowed to use for forecast models persisting to disk - static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB + // Note: This value measures the size in memory, not the size of the persistence, + // which is likely higher and would be hard to calculate upfront + //! max memory allowed to use for forecast models persisting to disk + static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB - // Note: This value is lower than on X-pack side to prevent side-effects, - // if you change this value also change the limit on X-pack side. - // The purpose of this value is to guard the rest of the system regarding - // an out of disk space - //! minimum disk space required for disk persistence - static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB + // Note: This value is lower than on X-pack side to prevent side-effects, + // if you change this value also change the limit on X-pack side. + // The purpose of this value is to guard the rest of the system regarding + // an out of disk space + //! minimum disk space required for disk persistence + static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB //! 
minimum time between stat updates to prevent to many updates in a short time - static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s + static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s private: static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE; @@ -104,8 +104,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const std::string ERROR_NO_CREATE_TIME; static const std::string ERROR_BAD_MEMORY_STATUS; static const std::string ERROR_MEMORY_LIMIT; - static const std::string ERROR_MEMORY_LIMIT_DISK; - static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; + static const std::string ERROR_MEMORY_LIMIT_DISK; + static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS; static const std::string ERROR_NO_SUPPORTED_FUNCTIONS; static const std::string WARNING_DURATION_LIMIT; @@ -124,7 +124,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper; using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries; using TForecastResultSeriesVec = std::vector; - using TMathsModelPtr = boost::shared_ptr; + using TMathsModelPtr = boost::shared_ptr; using TStrUSet = boost::unordered_set; @@ -211,8 +211,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! A collection storing important messages from forecasting TStrUSet s_Messages; - //! A directory to persist models on disk - std::string s_TemporaryFolder; + //! A directory to persist models on disk + std::string s_TemporaryFolder; }; private: @@ -225,8 +225,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! Check for new jobs, blocks while waiting bool tryGetJob(SForecast& forecastJob); - //! check for sufficient disk space - bool sufficientAvailableDiskSpace(const boost::filesystem::path &path); + //! 
check for sufficient disk space + bool sufficientAvailableDiskSpace(const boost::filesystem::path& path); //! pushes new jobs into the internal 'queue' (thread boundary) bool push(SForecast& forecastJob); diff --git a/include/core/RestoreMacros.h b/include/core/RestoreMacros.h index 9cb2e74945..4d8a903c7d 100644 --- a/include/core/RestoreMacros.h +++ b/include/core/RestoreMacros.h @@ -48,25 +48,22 @@ namespace core { continue; \ } -#define RESTORE_ENUM(tag, target, enumtype) \ - if (name == tag) \ - { \ - int value; \ - if (core::CStringUtils::stringToType(traverser.value(), value) == false) \ - { \ - LOG_ERROR("Failed to restore " #tag ", got " << traverser.value()); \ - return false; \ - } \ - target = enumtype(value); \ - continue; \ - } +#define RESTORE_ENUM(tag, target, enumtype) \ + if (name == tag) { \ + int value; \ + if (core::CStringUtils::stringToType(traverser.value(), value) == false) { \ + LOG_ERROR("Failed to restore " #tag ", got " << traverser.value()); \ + return false; \ + } \ + target = enumtype(value); \ + continue; \ + } -#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \ - if (name == tag) \ - { \ - restoreSuccess = true; \ - RESTORE_ENUM(tag, target, enumtype) \ - } +#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \ + if (name == tag) { \ + restoreSuccess = true; \ + RESTORE_ENUM(tag, target, enumtype) \ + } #define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \ if (name == tag) { \ diff --git a/include/model/CAnomalyDetector.h b/include/model/CAnomalyDetector.h index 3188bd7d9d..a39026f843 100644 --- a/include/model/CAnomalyDetector.h +++ b/include/model/CAnomalyDetector.h @@ -238,7 +238,7 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable { //! 
Generate maths models for forecasting CForecastDataSink::SForecastResultSeries getForecastModels(bool persistOnDisk = false, - const std::string &persistenceFolder = EMPTY_STRING) const; + const std::string& persistenceFolder = EMPTY_STRING) const; //! Remove dead models, i.e. those models that have more-or-less //! reverted back to their non-informative state. BE CAREFUL WHEN diff --git a/include/model/CCountingModelFactory.h b/include/model/CCountingModelFactory.h index e7c0b429e0..74080ee9b4 100644 --- a/include/model/CCountingModelFactory.h +++ b/include/model/CCountingModelFactory.h @@ -126,8 +126,8 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} - //! Get the minimum seasonal variance scale - virtual double minimumSeasonalVarianceScale() const; + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; private: //! Get the field values which partition the data for modeling. diff --git a/include/model/CEventRateModelFactory.h b/include/model/CEventRateModelFactory.h index 7f1611d6b8..9ee3cabe32 100644 --- a/include/model/CEventRateModelFactory.h +++ b/include/model/CEventRateModelFactory.h @@ -136,8 +136,8 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} - //! Get the minimum seasonal variance scale - virtual double minimumSeasonalVarianceScale() const; + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; private: //! Get the field values which partition the data for modeling. 
diff --git a/include/model/CEventRatePopulationModelFactory.h b/include/model/CEventRatePopulationModelFactory.h index 36aec0d78e..35cce6c6df 100644 --- a/include/model/CEventRatePopulationModelFactory.h +++ b/include/model/CEventRatePopulationModelFactory.h @@ -138,8 +138,8 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} - //! Get the minimum seasonal variance scale - virtual double minimumSeasonalVarianceScale() const; + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; private: //! Get the field values which partition the data for modeling. diff --git a/include/model/CForecastDataSink.h b/include/model/CForecastDataSink.h index a864f4a272..c89e3094d8 100644 --- a/include/model/CForecastDataSink.h +++ b/include/model/CForecastDataSink.h @@ -23,9 +23,9 @@ #include +#include #include #include -#include #include #include @@ -48,7 +48,7 @@ namespace model { //! to change (e.g. the json writing should not happen in this class). class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { public: - using TMathsModelPtr = boost::shared_ptr; + using TMathsModelPtr = boost::shared_ptr; using TStrUMap = boost::unordered_set; //! Wrapper for 1 timeseries model, its feature and by Field @@ -67,21 +67,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { //! 
Everything that defines 1 series of forecasts struct MODEL_EXPORT SForecastResultSeries { - SForecastResultSeries(const SModelParams &modelParams); + SForecastResultSeries(const SModelParams& modelParams); SForecastResultSeries(SForecastResultSeries&& other); SForecastResultSeries(const SForecastResultSeries& that) = delete; SForecastResultSeries& operator=(const SForecastResultSeries&) = delete; - SModelParams s_ModelParams; + SModelParams s_ModelParams; int s_DetectorIndex; std::vector s_ToForecast; - std::string s_ToForecastPersisted; + std::string s_ToForecastPersisted; std::string s_PartitionFieldName; std::string s_PartitionFieldValue; std::string s_ByFieldName; - double s_MinimumSeasonalVarianceScale; + double s_MinimumSeasonalVarianceScale; }; //! \brief Data describing prerequisites prior predictions diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h index 573f3f9c4d..60d026d697 100644 --- a/include/model/CForecastModelPersist.h +++ b/include/model/CForecastModelPersist.h @@ -30,10 +30,8 @@ #include #include -namespace ml -{ -namespace model -{ +namespace ml { +namespace model { //! \brief Persist/Restore CModel sub-classes to/from text representations for //! the purpose of forecasting. @@ -48,75 +46,67 @@ namespace model //! Persist and Restore are only done to avoid heap memory usage using temporary disk space. //! No need for backwards compatibility and version'ing as code will only be used //! locally never leaving process/io boundaries. -class MODEL_EXPORT CForecastModelPersist final -{ +class MODEL_EXPORT CForecastModelPersist final { +public: + using TMathsModelPtr = boost::shared_ptr; + +public: + class CPersist final { public: - using TMathsModelPtr = boost::shared_ptr; + explicit CPersist(const std::string& temporaryPath); + + //! add a model to the persistence + void addModel(const maths::CModel* model, const model_t::EFeature feature, const std::string& byFieldValue); + + //! 
close the outputStream + const std::string& finalizePersistAndGetFile(); + + private: + static void persistOneModel(core::CStatePersistInserter& inserter, + const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue); + + private: + //! the filename where to persist to + boost::filesystem::path m_FileName; + + //! the actual file where it models are persisted to + std::ofstream m_OutStream; + //! the persist inserter + std::unique_ptr m_PersistInserter; + }; + + class CRestore final { public: - class CPersist final { - public: - explicit CPersist(const std::string &temporaryPath); - - //! add a model to the persistence - void addModel(const maths::CModel *model, - const model_t::EFeature feature, - const std::string &byFieldValue); - - //! close the outputStream - const std::string &finalizePersistAndGetFile(); - - private: - static void persistOneModel(core::CStatePersistInserter &inserter, - const maths::CModel *model, - const model_t::EFeature feature, - const std::string &byFieldValue); - - private: - //! the filename where to persist to - boost::filesystem::path m_FileName; - - //! the actual file where it models are persisted to - std::ofstream m_OutStream; - - //! the persist inserter - std::unique_ptr m_PersistInserter; - }; - - class CRestore final { - public: - explicit CRestore(const SModelParams &modelParams, - double minimumSeasonalVarianceScale, - const std::string &fileName); - - //! add a model to the persistence - bool nextModel(TMathsModelPtr &model, - model_t::EFeature &feature, - std::string &byFieldValue); - - private: - static bool restoreOneModel(core::CStateRestoreTraverser &traverser, - SModelParams modelParams, - double inimumSeasonalVarianceScale, - TMathsModelPtr &model, - model_t::EFeature &feature, - std::string &byFieldValue); - - private: - //! model parameters required in order to restore the model - SModelParams m_ModelParams; - - //! 
minimum seasonal variance scale specific to the model - double m_MinimumSeasonalVarianceScale; - - //! the actual file where it models are persisted to - std::ifstream m_InStream; - - //! the persist inserter - core::CJsonStateRestoreTraverser m_RestoreTraverser; - }; // class CRestore -}; // class CForecastModelPersist + explicit CRestore(const SModelParams& modelParams, double minimumSeasonalVarianceScale, const std::string& fileName); + + //! add a model to the persistence + bool nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue); + + private: + static bool restoreOneModel(core::CStateRestoreTraverser& traverser, + SModelParams modelParams, + double inimumSeasonalVarianceScale, + TMathsModelPtr& model, + model_t::EFeature& feature, + std::string& byFieldValue); + + private: + //! model parameters required in order to restore the model + SModelParams m_ModelParams; + + //! minimum seasonal variance scale specific to the model + double m_MinimumSeasonalVarianceScale; + + //! the actual file where it models are persisted to + std::ifstream m_InStream; + //! the persist inserter + core::CJsonStateRestoreTraverser m_RestoreTraverser; + }; // class CRestore +}; // class CForecastModelPersist } } diff --git a/include/model/CMetricModelFactory.h b/include/model/CMetricModelFactory.h index 2717dd71bb..fe98baf57a 100644 --- a/include/model/CMetricModelFactory.h +++ b/include/model/CMetricModelFactory.h @@ -139,8 +139,8 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} - //! Get the minimum seasonal variance scale - virtual double minimumSeasonalVarianceScale() const; + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; private: //! Get the field values which partition the data for modeling. 
diff --git a/include/model/CMetricPopulationModelFactory.h b/include/model/CMetricPopulationModelFactory.h index 5b3aafff69..dfb1376957 100644 --- a/include/model/CMetricPopulationModelFactory.h +++ b/include/model/CMetricPopulationModelFactory.h @@ -138,8 +138,8 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} - //! Get the minimum seasonal variance scale - virtual double minimumSeasonalVarianceScale() const; + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; private: //! Get the field values which partition the data for modeling. diff --git a/include/model/CModelFactory.h b/include/model/CModelFactory.h index d49067f695..9aa2e70100 100644 --- a/include/model/CModelFactory.h +++ b/include/model/CModelFactory.h @@ -345,8 +345,8 @@ class MODEL_EXPORT CModelFactory { //! component. std::size_t componentSize() const; - // Get the minimum seasonal variance scale, specific to the model - virtual double minimumSeasonalVarianceScale() const = 0; + // Get the minimum seasonal variance scale, specific to the model + virtual double minimumSeasonalVarianceScale() const = 0; protected: using TMultivariatePriorPtrVec = std::vector; diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index b6cbddb277..b09f73005a 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -47,9 +47,12 @@ const std::string CForecastRunner::ERROR_NO_DATA_PROCESSED("Forecast cannot be executed as job requires data to have been processed and modeled"); const std::string CForecastRunner::ERROR_NO_CREATE_TIME("Forecast create time must be specified and non zero"); const std::string CForecastRunner::ERROR_BAD_MEMORY_STATUS("Forecast cannot be executed as model memory status is not OK"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB 
while disk space is exceeded"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISK("Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE("Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT( + "Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB while disk space is exceeded"); +const std::string + CForecastRunner::ERROR_MEMORY_LIMIT_DISK("Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE( + "Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient"); const std::string CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS("Forecast is not supported for population analysis"); const std::string CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS("Forecast is not supported for the used functions"); const std::string CForecastRunner::WARNING_DURATION_LIMIT("Forecast duration exceeds internal limit, setting to 8 weeks"); @@ -70,8 +73,8 @@ CForecastRunner::SForecast::SForecast() s_NumberOfModels(0), s_NumberOfForecastableModels(0), s_MemoryUsage(0), - s_Messages(), - s_TemporaryFolder() + s_Messages(), + s_TemporaryFolder() { } CForecastRunner::SForecast::SForecast(SForecast&& other) @@ -86,8 +89,8 @@ CForecastRunner::SForecast::SForecast(SForecast&& other) s_NumberOfModels(other.s_NumberOfModels), s_NumberOfForecastableModels(other.s_NumberOfForecastableModels), s_MemoryUsage(other.s_MemoryUsage), - s_Messages(other.s_Messages), - s_TemporaryFolder(std::move(other.s_TemporaryFolder)) + s_Messages(other.s_Messages), + s_TemporaryFolder(std::move(other.s_TemporaryFolder)) { } CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& other) { @@ -172,25 +175,20 
@@ void CForecastRunner::forecastWorker() { // initialize persistence restore exactly once if (!series.s_ToForecastPersisted.empty()) { - modelRestore.reset(new model::CForecastModelPersist::CRestore(series.s_ModelParams, - series.s_MinimumSeasonalVarianceScale, - series.s_ToForecastPersisted)); + modelRestore.reset(new model::CForecastModelPersist::CRestore( + series.s_ModelParams, series.s_MinimumSeasonalVarianceScale, series.s_ToForecastPersisted)); } - while (series.s_ToForecast.empty() == false || modelRestore != nullptr) - { + while (series.s_ToForecast.empty() == false || modelRestore != nullptr) { // check if we should backfill from persistence - if (series.s_ToForecast.empty()) - { + if (series.s_ToForecast.empty()) { TMathsModelPtr model; model_t::EFeature feature; std::string byFieldValue; - if (modelRestore->nextModel(model, feature, byFieldValue)) - { + if (modelRestore->nextModel(model, feature, byFieldValue)) { series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); - } - else // restorer exhausted, no need for further restoring + } else // restorer exhausted, no need for further restoring { modelRestore.reset(); break; @@ -249,18 +247,16 @@ void CForecastRunner::forecastWorker() { m_WorkCompleteCondition.notify_all(); // cleanup - if (!forecastJob.s_TemporaryFolder.empty()) - { + if (!forecastJob.s_TemporaryFolder.empty()) { boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); boost::system::error_code errorCode; boost::filesystem::remove_all(temporaryFolder, errorCode); - if (errorCode) - { + if (errorCode) { // not an error: there is also cleanup code on X-pack side - LOG_WARN("Failed to cleanup temporary data from: " << forecastJob.s_TemporaryFolder << " error " << errorCode.message()); + LOG_WARN("Failed to cleanup temporary data from: " << forecastJob.s_TemporaryFolder << " error " + << errorCode.message()); return; } - } } } @@ -331,16 +327,15 @@ bool CForecastRunner::pushForecastJob(const std::string& 
controlMessage, if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty()) // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); - return false; - } + return false; } - if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) - { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) { this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISK); return false; } + if (atLeastOneNonPopulationModel == false) { this->sendErrorMessage(forecastJob, ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS); return false; } @@ -364,35 +359,28 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, // 2nd loop over the detectors to clone models for forecasting bool persistOnDisk = false; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) - { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); - if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) - { + if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) { this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE); return false; } - LOG_INFO("Forecast of large model requested (requires " - << std::to_string(1 + (totalMemoryUsage >> 20)) - << " MB), using disk."); + LOG_INFO("Forecast of large model requested (requires " << std::to_string(1 + (totalMemoryUsage >> 20)) << " MB), using disk."); boost::system::error_code errorCode; boost::filesystem::create_directories(temporaryFolder, errorCode); - if (errorCode) - { + if (errorCode) { this->sendErrorMessage(forecastJob, - "Forecast internal error, failed to create temporary folder " - + temporaryFolder.string() + " error: " + errorCode.message()); + "Forecast internal error, failed to create temporary folder " + temporaryFolder.string() + + " error: " + errorCode.message()); return false; } LOG_DEBUG("Persisting to: " << 
temporaryFolder.string()); persistOnDisk = true; - } - else - { + } else { forecastJob.s_TemporaryFolder.clear(); } @@ -402,9 +390,7 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, continue; } - forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels( - persistOnDisk, - forecastJob.s_TemporaryFolder)); + forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels(persistOnDisk, forecastJob.s_TemporaryFolder)); } return this->push(forecastJob); @@ -507,13 +493,11 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control return true; } -bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path &path) -{ +bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path& path) { boost::system::error_code errorCode; auto spaceInfo = boost::filesystem::space(path, errorCode); - if (errorCode) - { + if (errorCode) { LOG_ERROR("Failed to retrieve disk information for " << path << " error " << errorCode.message()); return false; } diff --git a/lib/model/CAnomalyDetector.cc b/lib/model/CAnomalyDetector.cc index 7e7a4f4207..86a98b3a49 100644 --- a/lib/model/CAnomalyDetector.cc +++ b/lib/model/CAnomalyDetector.cc @@ -497,7 +497,7 @@ CForecastDataSink::SForecastModelPrerequisites CAnomalyDetector::getForecastPrer } CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels(bool persistOnDisk, - const std::string &persistenceFolder) const + const std::string& persistenceFolder) const { CForecastDataSink::SForecastResultSeries series(m_ModelFactory->modelParams()); if (m_DataGatherer->isPopulation()) { @@ -518,20 +518,15 @@ CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels(boo series.s_PartitionFieldValue = m_DataGatherer->partitionFieldValue(); series.s_MinimumSeasonalVarianceScale = m_ModelFactory->minimumSeasonalVarianceScale(); - if (persistOnDisk) - { + if (persistOnDisk) { CForecastModelPersist::CPersist 
persister(persistenceFolder); - for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) - { + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { // todo: Add terms filtering here - if (m_DataGatherer->isPersonActive(pid)) - { - for (auto feature : view->features()) - { - const maths::CModel *model = view->model(feature, pid); - if (model != nullptr && model->isForecastPossible()) - { + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { persister.addModel(model, feature, m_DataGatherer->personName(pid)); } } @@ -539,23 +534,15 @@ CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels(boo } series.s_ToForecastPersisted = persister.finalizePersistAndGetFile(); - } - else - { - for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) - { + } else { + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { // todo: Add terms filtering here - if (m_DataGatherer->isPersonActive(pid)) - { - for (auto feature : view->features()) - { - const maths::CModel *model = view->model(feature, pid); - if (model != nullptr && model->isForecastPossible()) - { + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { series.s_ToForecast.emplace_back( - feature, - CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), - m_DataGatherer->personName(pid)); + feature, CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), m_DataGatherer->personName(pid)); } } } diff --git a/lib/model/CCountingModelFactory.cc b/lib/model/CCountingModelFactory.cc index 83083b18b5..a1cf0e6265 100644 --- a/lib/model/CCountingModelFactory.cc +++ 
b/lib/model/CCountingModelFactory.cc @@ -178,11 +178,9 @@ CCountingModelFactory::TStrCRefVec CCountingModelFactory::partitioningFields() c } return result; } -double CCountingModelFactory::minimumSeasonalVarianceScale() const -{ +double CCountingModelFactory::minimumSeasonalVarianceScale() const { // unused, return something return 0.0; } - } } diff --git a/lib/model/CEventRateModelFactory.cc b/lib/model/CEventRateModelFactory.cc index 84efec3ff6..08be6bb3a4 100644 --- a/lib/model/CEventRateModelFactory.cc +++ b/lib/model/CEventRateModelFactory.cc @@ -66,16 +66,14 @@ CAnomalyDetectorModel* CEventRateModelFactory::makeModel(const SModelInitializat influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features)); } - return new CEventRateModel(this->modelParams(), - dataGatherer, - this->defaultFeatureModels(features, - dataGatherer->bucketLength(), - this->minimumSeasonalVarianceScale(), - true), - this->defaultCorrelatePriors(features), - this->defaultCorrelates(features), - this->defaultCategoricalPrior(), - influenceCalculators); + return new CEventRateModel( + this->modelParams(), + dataGatherer, + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), true), + this->defaultCorrelatePriors(features), + this->defaultCorrelates(features), + this->defaultCategoricalPrior(), + influenceCalculators); } CAnomalyDetectorModel* CEventRateModelFactory::makeModel(const SModelInitializationData& initData, @@ -289,8 +287,7 @@ void CEventRateModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) m_BucketResultsDelay = bucketResultsDelay; } -double CEventRateModelFactory::minimumSeasonalVarianceScale() const -{ +double CEventRateModelFactory::minimumSeasonalVarianceScale() const { return 0.4; } diff --git a/lib/model/CEventRatePopulationModelFactory.cc b/lib/model/CEventRatePopulationModelFactory.cc index b29e0b3808..6a68c40ddb 100644 --- a/lib/model/CEventRatePopulationModelFactory.cc +++ 
b/lib/model/CEventRatePopulationModelFactory.cc @@ -66,15 +66,13 @@ CAnomalyDetectorModel* CEventRatePopulationModelFactory::makeModel(const SModelI influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features)); } - return new CEventRatePopulationModel(this->modelParams(), - dataGatherer, - this->defaultFeatureModels(features, - dataGatherer->bucketLength(), - this->minimumSeasonalVarianceScale(), - false), - this->defaultCorrelatePriors(features), - this->defaultCorrelates(features), - influenceCalculators); + return new CEventRatePopulationModel( + this->modelParams(), + dataGatherer, + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), false), + this->defaultCorrelatePriors(features), + this->defaultCorrelates(features), + influenceCalculators); } CAnomalyDetectorModel* CEventRatePopulationModelFactory::makeModel(const SModelInitializationData& initData, @@ -304,8 +302,7 @@ CEventRatePopulationModelFactory::TStrCRefVec CEventRatePopulationModelFactory:: return result; } -double CEventRatePopulationModelFactory::minimumSeasonalVarianceScale() const -{ +double CEventRatePopulationModelFactory::minimumSeasonalVarianceScale() const { return 1.0; } } diff --git a/lib/model/CForecastDataSink.cc b/lib/model/CForecastDataSink.cc index f5bdacf1a1..2d5f3f0f83 100644 --- a/lib/model/CForecastDataSink.cc +++ b/lib/model/CForecastDataSink.cc @@ -74,23 +74,19 @@ CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(SForecastModelWr : s_Feature(other.s_Feature), s_ForecastModel(std::move(other.s_ForecastModel)), s_ByFieldValue(std::move(other.s_ByFieldValue)) { } -CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams &modelParams) - :s_ModelParams(modelParams), - s_DetectorIndex(), - s_ToForecastPersisted(), - s_ByFieldName(), - s_MinimumSeasonalVarianceScale(0.0) +CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams& modelParams) + : 
s_ModelParams(modelParams), s_DetectorIndex(), s_ToForecastPersisted(), s_ByFieldName(), s_MinimumSeasonalVarianceScale(0.0) { } CForecastDataSink::SForecastResultSeries::SForecastResultSeries(SForecastResultSeries&& other) - :s_ModelParams(std::move(other.s_ModelParams)), - s_DetectorIndex(other.s_DetectorIndex), + : s_ModelParams(std::move(other.s_ModelParams)), + s_DetectorIndex(other.s_DetectorIndex), s_ToForecast(std::move(other.s_ToForecast)), - s_ToForecastPersisted(std::move(other.s_ToForecastPersisted)), + s_ToForecastPersisted(std::move(other.s_ToForecastPersisted)), s_PartitionFieldName(std::move(other.s_PartitionFieldName)), s_PartitionFieldValue(std::move(other.s_PartitionFieldValue)), - s_ByFieldName(std::move(other.s_ByFieldName)), - s_MinimumSeasonalVarianceScale(other.s_MinimumSeasonalVarianceScale) + s_ByFieldName(std::move(other.s_ByFieldName)), + s_MinimumSeasonalVarianceScale(other.s_MinimumSeasonalVarianceScale) { } CForecastDataSink::CForecastDataSink(const std::string& jobId, diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc index 6b2a74630e..7387753f5d 100644 --- a/lib/model/CForecastModelPersist.cc +++ b/lib/model/CForecastModelPersist.cc @@ -27,10 +27,8 @@ #include -namespace ml -{ -namespace model -{ +namespace ml { +namespace model { namespace { static const std::string FORECAST_MODEL_PERSIST_TAG("forecast_persist"); @@ -40,71 +38,55 @@ static const std::string MODEL_TAG("model"); static const std::string BY_FIELD_VALUE_TAG("by_field_value"); } -CForecastModelPersist::CPersist::CPersist(const std::string &temporaryPath): - m_FileName(temporaryPath), - m_OutStream(), - m_PersistInserter() -{ +CForecastModelPersist::CPersist::CPersist(const std::string& temporaryPath) + : m_FileName(temporaryPath), m_OutStream(), m_PersistInserter() { m_FileName /= boost::filesystem::unique_path("forecast-persist-%%%%-%%%%-%%%%-%%%%"); m_OutStream.open(m_FileName.string()); m_PersistInserter.reset(new 
core::CJsonStatePersistInserter(m_OutStream)); } -void CForecastModelPersist::CPersist::addModel(const maths::CModel *model, +void CForecastModelPersist::CPersist::addModel(const maths::CModel* model, const model_t::EFeature feature, - const std::string &byFieldValue) -{ - if (!m_PersistInserter) - { + const std::string& byFieldValue) { + if (!m_PersistInserter) { LOG_ERROR("Internal error: Can not add model to an already closed stream."); return; } m_PersistInserter->insertLevel(FORECAST_MODEL_PERSIST_TAG, - boost::bind(CForecastModelPersist::CPersist::persistOneModel, - _1, model, feature, byFieldValue)); + boost::bind(CForecastModelPersist::CPersist::persistOneModel, _1, model, feature, byFieldValue)); } -void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter &inserter, - const maths::CModel *model, +void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter& inserter, + const maths::CModel* model, const model_t::EFeature feature, - const std::string &byFieldValue) -{ + const std::string& byFieldValue) { inserter.insertValue(FEATURE_TAG, feature); inserter.insertValue(DATA_TYPE_TAG, model->dataType()); inserter.insertValue(BY_FIELD_VALUE_TAG, byFieldValue); inserter.insertLevel(MODEL_TAG, boost::bind(maths::CModelStateSerialiser(), boost::cref(*model), _1)); } -const std::string &CForecastModelPersist::CPersist::finalizePersistAndGetFile() -{ +const std::string& CForecastModelPersist::CPersist::finalizePersistAndGetFile() { // destruct m_PersistInserter to force close it m_PersistInserter.reset(); m_OutStream.close(); return m_FileName.string(); } -CForecastModelPersist::CRestore::CRestore(const SModelParams &modelParams, - double minimumSeasonalVarianceScale, - const std::string &fileName): - m_ModelParams(modelParams), - m_MinimumSeasonalVarianceScale(minimumSeasonalVarianceScale), - m_InStream(fileName), - m_RestoreTraverser(m_InStream) -{ +CForecastModelPersist::CRestore::CRestore(const SModelParams& 
modelParams, double minimumSeasonalVarianceScale, const std::string& fileName) + : m_ModelParams(modelParams), + m_MinimumSeasonalVarianceScale(minimumSeasonalVarianceScale), + m_InStream(fileName), + m_RestoreTraverser(m_InStream) { } -bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr &model, - model_t::EFeature &feature, - std::string &byFieldValue) -{ - if (m_RestoreTraverser.isEof()) - { +bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue) { + if (m_RestoreTraverser.isEof()) { return false; } - if (m_RestoreTraverser.name() != FORECAST_MODEL_PERSIST_TAG) - { + if (m_RestoreTraverser.name() != FORECAST_MODEL_PERSIST_TAG) { LOG_ERROR("Failed to restore forecast model, unexpected tag"); return false; } @@ -115,14 +97,13 @@ bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr &model, } TMathsModelPtr originalModel; - if(!m_RestoreTraverser.traverseSubLevel(boost::bind(CForecastModelPersist::CRestore::restoreOneModel, - _1, - boost::cref(m_ModelParams), - m_MinimumSeasonalVarianceScale, - boost::ref(originalModel), - boost::ref(feature), - boost::ref(byFieldValue)))) - { + if (!m_RestoreTraverser.traverseSubLevel(boost::bind(CForecastModelPersist::CRestore::restoreOneModel, + _1, + boost::cref(m_ModelParams), + m_MinimumSeasonalVarianceScale, + boost::ref(originalModel), + boost::ref(feature), + boost::ref(byFieldValue)))) { LOG_ERROR("Failed to restore forecast model, internal error"); return false; } @@ -133,13 +114,12 @@ bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr &model, return true; } -bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTraverser &traverser, +bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTraverser& traverser, const SModelParams modelParams, double minimumSeasonalVarianceScale, - TMathsModelPtr &model, - model_t::EFeature &feature, - std::string &byFieldValue) -{ + TMathsModelPtr& 
model, + model_t::EFeature& feature, + std::string& byFieldValue) { // reset all model.reset(); bool restoredFeature = false; @@ -147,45 +127,34 @@ bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTravers byFieldValue.clear(); maths_t::EDataType dataType; - do - { - const std::string &name = traverser.name(); + do { + const std::string& name = traverser.name(); RESTORE_ENUM_CHECKED(FEATURE_TAG, feature, model_t::EFeature, restoredFeature) RESTORE_ENUM_CHECKED(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType) RESTORE_BUILT_IN(BY_FIELD_VALUE_TAG, byFieldValue) - if (name == MODEL_TAG) - { - if (!restoredDataType) - { + if (name == MODEL_TAG) { + if (!restoredDataType) { LOG_ERROR("Failed to restore forecast model, datatype missing"); return false; } maths::SModelRestoreParams params{ - maths::CModelParams(modelParams.s_BucketLength, - modelParams.s_LearnRate, - modelParams.s_DecayRate, - minimumSeasonalVarianceScale - ), + maths::CModelParams( + modelParams.s_BucketLength, modelParams.s_LearnRate, modelParams.s_DecayRate, minimumSeasonalVarianceScale), maths::STimeSeriesDecompositionRestoreParams{ - modelParams.s_DecayRate, - modelParams.s_BucketLength, - modelParams.s_ComponentSize}, - modelParams.distributionRestoreParams(dataType)}; - - if(!traverser.traverseSubLevel(boost::bind(maths::CModelStateSerialiser(), - boost::cref(params), boost::ref(model), _1))) - { + modelParams.s_DecayRate, modelParams.s_BucketLength, modelParams.s_ComponentSize}, + modelParams.distributionRestoreParams(dataType)}; + + if (!traverser.traverseSubLevel( + boost::bind(maths::CModelStateSerialiser(), boost::cref(params), boost::ref(model), _1))) { LOG_ERROR("Failed to restore forecast model, model missing"); return false; } } - } - while (traverser.next()); + } while (traverser.next()); // only the by_field_value can be empty - if (!model || !restoredFeature || !restoredDataType) - { + if (!model || !restoredFeature || !restoredDataType) { 
LOG_ERROR("Failed to restore forecast model, data missing"); return false; } diff --git a/lib/model/CHierarchicalResults.cc b/lib/model/CHierarchicalResults.cc index a44e3b0faf..3585586ea8 100644 --- a/lib/model/CHierarchicalResults.cc +++ b/lib/model/CHierarchicalResults.cc @@ -959,7 +959,8 @@ bool CHierarchicalResultsVisitor::isPopulation(const TNode& node) { const CHierarchicalResultsVisitor::TNode* CHierarchicalResultsVisitor::nearestAncestorForWhichWeWriteResults(const TNode& node) { const TNode* result = &node; - for (result = result->s_Parent; result && !isTypeForWhichWeWriteResults(*result, false); result = result->s_Parent) {} + for (result = result->s_Parent; result && !isTypeForWhichWeWriteResults(*result, false); result = result->s_Parent) { + } return result; } diff --git a/lib/model/CMetricModelFactory.cc b/lib/model/CMetricModelFactory.cc index 22334ee25f..8c461a0635 100644 --- a/lib/model/CMetricModelFactory.cc +++ b/lib/model/CMetricModelFactory.cc @@ -67,10 +67,7 @@ CAnomalyDetectorModel* CMetricModelFactory::makeModel(const SModelInitialization return new CMetricModel(this->modelParams(), dataGatherer, - this->defaultFeatureModels(features, - dataGatherer->bucketLength(), - this->minimumSeasonalVarianceScale(), - true), + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), true), this->defaultCorrelatePriors(features), this->defaultCorrelates(features), influenceCalculators); @@ -290,8 +287,7 @@ void CMetricModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) { m_BucketResultsDelay = bucketResultsDelay; } -double CMetricModelFactory::minimumSeasonalVarianceScale() const -{ +double CMetricModelFactory::minimumSeasonalVarianceScale() const { return 0.4; } diff --git a/lib/model/CMetricPopulationModelFactory.cc b/lib/model/CMetricPopulationModelFactory.cc index 5dd88d8035..31f3f87ef0 100644 --- a/lib/model/CMetricPopulationModelFactory.cc +++ 
b/lib/model/CMetricPopulationModelFactory.cc @@ -64,15 +64,13 @@ CAnomalyDetectorModel* CMetricPopulationModelFactory::makeModel(const SModelInit influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features)); } - return new CMetricPopulationModel(this->modelParams(), - dataGatherer, - this->defaultFeatureModels(features, - dataGatherer->bucketLength(), - this->minimumSeasonalVarianceScale(), - false), - this->defaultCorrelatePriors(features), - this->defaultCorrelates(features), - influenceCalculators); + return new CMetricPopulationModel( + this->modelParams(), + dataGatherer, + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), false), + this->defaultCorrelatePriors(features), + this->defaultCorrelates(features), + influenceCalculators); } CAnomalyDetectorModel* CMetricPopulationModelFactory::makeModel(const SModelInitializationData& initData, @@ -287,8 +285,7 @@ void CMetricPopulationModelFactory::bucketResultsDelay(std::size_t bucketResults m_BucketResultsDelay = bucketResultsDelay; } -double CMetricPopulationModelFactory::minimumSeasonalVarianceScale() const -{ +double CMetricPopulationModelFactory::minimumSeasonalVarianceScale() const { return 1.0; } diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc index 8ec2b4d3a5..d81d356d89 100644 --- a/lib/model/unittest/CForecastModelPersistTest.cc +++ b/lib/model/unittest/CForecastModelPersistTest.cc @@ -15,8 +15,8 @@ #include "CForecastModelPersistTest.h" -#include #include +#include #include #include @@ -31,8 +31,7 @@ using namespace ml; using namespace model; -void CForecastModelPersistTest::testPersistAndRestore() -{ +void CForecastModelPersistTest::testPersistAndRestore() { LOG_DEBUG("+----------------------------------------------------+"); LOG_DEBUG("| CForecastModelPersistTest::testPersistAndRestore |"); LOG_DEBUG("+----------------------------------------------------+"); @@ 
-45,32 +44,23 @@ void CForecastModelPersistTest::testPersistAndRestore() maths::CTimeSeriesDecomposition trend(params.s_DecayRate, bucketLength); maths::CNormalMeanPrecConjugate prior{ maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_ContinuousData, params.s_DecayRate)}; - maths::CModelParams timeSeriesModelParams{bucketLength, - params.s_LearnRate, - params.s_DecayRate, - minimumSeasonalVarianceScale}; + maths::CModelParams timeSeriesModelParams{bucketLength, params.s_LearnRate, params.s_DecayRate, minimumSeasonalVarianceScale}; maths::CUnivariateTimeSeriesModel timeSeriesModel{timeSeriesModelParams, 1, trend, prior}; CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir()); - persister.addModel(&timeSeriesModel, - model_t::EFeature::E_IndividualCountByBucketAndPerson, - "some_by_field"); + persister.addModel(&timeSeriesModel, model_t::EFeature::E_IndividualCountByBucketAndPerson, "some_by_field"); maths::CNormalMeanPrecConjugate otherPrior{ maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_MixedData, params.s_DecayRate)}; maths::CUnivariateTimeSeriesModel otherTimeSeriesModel{timeSeriesModelParams, 2, trend, otherPrior}; - persister.addModel(&otherTimeSeriesModel, - model_t::EFeature::E_IndividualLowMeanByPerson, - "some_other_by_field"); + persister.addModel(&otherTimeSeriesModel, model_t::EFeature::E_IndividualLowMeanByPerson, "some_other_by_field"); maths::CNormalMeanPrecConjugate otherPriorEmptyByField{ maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_DiscreteData, params.s_DecayRate)}; maths::CUnivariateTimeSeriesModel otherTimeSeriesModelEmptyByField{timeSeriesModelParams, 3, trend, otherPriorEmptyByField}; - persister.addModel(&otherTimeSeriesModelEmptyByField, - model_t::EFeature::E_IndividualHighMedianByPerson, - ""); + persister.addModel(&otherTimeSeriesModelEmptyByField, model_t::EFeature::E_IndividualHighMedianByPerson, ""); std::string persistedModels = 
persister.finalizePersistAndGetFile(); { @@ -89,7 +79,7 @@ void CForecastModelPersistTest::testPersistAndRestore() CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier()); CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType()); - CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast {timeSeriesModel.cloneForForecast()}; + CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast{timeSeriesModel.cloneForForecast()}; CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); @@ -105,7 +95,7 @@ void CForecastModelPersistTest::testPersistAndRestore() CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier()); CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType()); - CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast {otherTimeSeriesModel.cloneForForecast()}; + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast{otherTimeSeriesModel.cloneForForecast()}; CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate()); CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate()); CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale()); @@ -120,23 +110,20 @@ void CForecastModelPersistTest::testPersistAndRestore() CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength()); CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier()); CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType()); - CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast - 
{otherTimeSeriesModelEmptyByField.cloneForForecast()}; - CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), - restoredModel->checksum(42)); + CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast{ + otherTimeSeriesModelEmptyByField.cloneForForecast()}; + CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), restoredModel->checksum(42)); CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); } std::remove(persistedModels.c_str()); } -CppUnit::Test* CForecastModelPersistTest::suite(void) -{ - CppUnit::TestSuite *suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest"); +CppUnit::Test* CForecastModelPersistTest::suite(void) { + CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest"); - suiteOfTests->addTest( new CppUnit::TestCaller( - "CForecastModelPersistTest::testPersistAndRestore", - &CForecastModelPersistTest::testPersistAndRestore) ); + suiteOfTests->addTest(new CppUnit::TestCaller("CForecastModelPersistTest::testPersistAndRestore", + &CForecastModelPersistTest::testPersistAndRestore)); return suiteOfTests; } diff --git a/lib/model/unittest/CForecastModelPersistTest.h b/lib/model/unittest/CForecastModelPersistTest.h index d0313a98b6..2daa6233dd 100644 --- a/lib/model/unittest/CForecastModelPersistTest.h +++ b/lib/model/unittest/CForecastModelPersistTest.h @@ -18,12 +18,11 @@ #include -class CForecastModelPersistTest : public CppUnit::TestFixture -{ - public: - void testPersistAndRestore(); +class CForecastModelPersistTest : public CppUnit::TestFixture { +public: + void testPersistAndRestore(); - static CppUnit::Test *suite(); + static CppUnit::Test* suite(); }; #endif // INCLUDED_CForecastModelPersistTest_h diff --git a/lib/model/unittest/Main.cc b/lib/model/unittest/Main.cc index ed76b39f8d..d10a6e2ec3 100644 --- a/lib/model/unittest/Main.cc +++ b/lib/model/unittest/Main.cc @@ -25,8 +25,8 @@ #include 
"CEventRateAnomalyDetectorTest.h" #include "CEventRateDataGathererTest.h" #include "CEventRateModelTest.h" -#include "CForecastModelPersistTest.h" #include "CEventRatePopulationModelTest.h" +#include "CForecastModelPersistTest.h" #include "CFunctionTypesTest.h" #include "CGathererToolsTest.h" #include "CHierarchicalResultsLevelSetTest.h" @@ -67,7 +67,7 @@ int main(int argc, const char** argv) { runner.addTest(CEventRatePopulationDataGathererTest::suite()); runner.addTest(CEventRatePopulationModelTest::suite()); runner.addTest(CFunctionTypesTest::suite()); - runner.addTest( CForecastModelPersistTest::suite() ); + runner.addTest(CForecastModelPersistTest::suite()); runner.addTest(CGathererToolsTest::suite()); runner.addTest(CHierarchicalResultsTest::suite()); runner.addTest(CHierarchicalResultsLevelSetTest::suite()); From beb2a406d3f2a910e4aad3245758de0399f87137 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 5 Apr 2018 15:36:19 +0200 Subject: [PATCH 09/10] fix merge problem --- lib/api/CForecastRunner.cc | 5 +++-- lib/model/unittest/Main.cc | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index b09f73005a..082645963e 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -324,10 +324,11 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, atLeastOneSupportedFunction = atLeastOneSupportedFunction || prerequisites.s_IsSupportedFunction; totalMemoryUsage += prerequisites.s_MemoryUsageForDetector; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty()) + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty()) { // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); - return false; + return false; + } } if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) { diff --git 
a/lib/model/unittest/Main.cc b/lib/model/unittest/Main.cc index d10a6e2ec3..2d6d64fdf4 100644 --- a/lib/model/unittest/Main.cc +++ b/lib/model/unittest/Main.cc @@ -25,6 +25,7 @@ #include "CEventRateAnomalyDetectorTest.h" #include "CEventRateDataGathererTest.h" #include "CEventRateModelTest.h" +#include "CEventRatePopulationDataGathererTest.h" #include "CEventRatePopulationModelTest.h" #include "CForecastModelPersistTest.h" #include "CFunctionTypesTest.h" From 5a8999313bd1d44672b55045433663b2ed19bfce Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 11 Apr 2018 11:39:11 +0200 Subject: [PATCH 10/10] fix edge case of empty model persistence and produce valid json --- include/model/CForecastModelPersist.h | 4 ++-- lib/model/CForecastModelPersist.cc | 22 +++++++++---------- .../unittest/CForecastModelPersistTest.cc | 20 +++++++++++++++++ .../unittest/CForecastModelPersistTest.h | 1 + 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h index 60d026d697..a8b20409a0 100644 --- a/include/model/CForecastModelPersist.h +++ b/include/model/CForecastModelPersist.h @@ -74,8 +74,8 @@ class MODEL_EXPORT CForecastModelPersist final { //! the actual file where it models are persisted to std::ofstream m_OutStream; - //! the persist inserter - std::unique_ptr m_PersistInserter; + //! 
number of models persisted + size_t m_ModelCount; }; class CRestore final { diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc index 7387753f5d..8f7c67e590 100644 --- a/lib/model/CForecastModelPersist.cc +++ b/lib/model/CForecastModelPersist.cc @@ -38,23 +38,22 @@ static const std::string MODEL_TAG("model"); static const std::string BY_FIELD_VALUE_TAG("by_field_value"); } -CForecastModelPersist::CPersist::CPersist(const std::string& temporaryPath) - : m_FileName(temporaryPath), m_OutStream(), m_PersistInserter() { +CForecastModelPersist::CPersist::CPersist(const std::string& temporaryPath) : m_FileName(temporaryPath), m_OutStream(), m_ModelCount(0) { m_FileName /= boost::filesystem::unique_path("forecast-persist-%%%%-%%%%-%%%%-%%%%"); m_OutStream.open(m_FileName.string()); - m_PersistInserter.reset(new core::CJsonStatePersistInserter(m_OutStream)); + m_OutStream << "["; } void CForecastModelPersist::CPersist::addModel(const maths::CModel* model, const model_t::EFeature feature, const std::string& byFieldValue) { - if (!m_PersistInserter) { - LOG_ERROR("Internal error: Can not add model to an already closed stream."); - return; + if (m_ModelCount++ > 0) { + m_OutStream << ","; } - m_PersistInserter->insertLevel(FORECAST_MODEL_PERSIST_TAG, - boost::bind(CForecastModelPersist::CPersist::persistOneModel, _1, model, feature, byFieldValue)); + core::CJsonStatePersistInserter inserter(m_OutStream); + inserter.insertLevel(FORECAST_MODEL_PERSIST_TAG, + boost::bind(CForecastModelPersist::CPersist::persistOneModel, _1, model, feature, byFieldValue)); } void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter& inserter, @@ -68,8 +67,7 @@ void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserte } const std::string& CForecastModelPersist::CPersist::finalizePersistAndGetFile() { - // destruct m_PersistInserter to force close it - m_PersistInserter.reset(); + m_OutStream << "]"; m_OutStream.close(); 
return m_FileName.string(); } @@ -82,7 +80,7 @@ CForecastModelPersist::CRestore::CRestore(const SModelParams& modelParams, doubl } bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue) { - if (m_RestoreTraverser.isEof()) { + if (m_RestoreTraverser.isEof() || m_RestoreTraverser.name().empty()) { return false; } @@ -109,8 +107,8 @@ bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr& model, model_t:: } model.reset(originalModel->cloneForForecast()); + m_RestoreTraverser.nextObject(); - m_RestoreTraverser.next(); return true; } diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc index d81d356d89..a72875fc08 100644 --- a/lib/model/unittest/CForecastModelPersistTest.cc +++ b/lib/model/unittest/CForecastModelPersistTest.cc @@ -119,11 +119,31 @@ void CForecastModelPersistTest::testPersistAndRestore() { std::remove(persistedModels.c_str()); } +void CForecastModelPersistTest::testPersistAndRestoreEmpty() { + core_t::TTime bucketLength{1800}; + double minimumSeasonalVarianceScale = 0.2; + SModelParams params{bucketLength}; + + CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir()); + std::string persistedModels = persister.finalizePersistAndGetFile(); + { + CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels); + CForecastModelPersist::TMathsModelPtr restoredModel; + std::string restoredByFieldValue; + model_t::EFeature restoredFeature; + + CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue)); + } + std::remove(persistedModels.c_str()); +} + CppUnit::Test* CForecastModelPersistTest::suite(void) { CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest"); suiteOfTests->addTest(new CppUnit::TestCaller("CForecastModelPersistTest::testPersistAndRestore", &CForecastModelPersistTest::testPersistAndRestore)); + 
suiteOfTests->addTest(new CppUnit::TestCaller("CForecastModelPersistTest::testPersistAndRestoreEmpty", + &CForecastModelPersistTest::testPersistAndRestoreEmpty)); return suiteOfTests; } diff --git a/lib/model/unittest/CForecastModelPersistTest.h b/lib/model/unittest/CForecastModelPersistTest.h index 2daa6233dd..bfc6edf7da 100644 --- a/lib/model/unittest/CForecastModelPersistTest.h +++ b/lib/model/unittest/CForecastModelPersistTest.h @@ -21,6 +21,7 @@ class CForecastModelPersistTest : public CppUnit::TestFixture { public: void testPersistAndRestore(); + void testPersistAndRestoreEmpty(); static CppUnit::Test* suite(); };