diff --git a/bin/autodetect/Makefile b/bin/autodetect/Makefile index a0fa8da061..cf5a227f4b 100644 --- a/bin/autodetect/Makefile +++ b/bin/autodetect/Makefile @@ -21,6 +21,7 @@ INSTALL_DIR=$(CPP_PLATFORM_HOME)/bin ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_BOOST_PROGRAMOPTIONS_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git a/include/api/CForecastRunner.h b/include/api/CForecastRunner.h index 1e0c4ae923..5e4396c6b0 100644 --- a/include/api/CForecastRunner.h +++ b/include/api/CForecastRunner.h @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -77,10 +78,22 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY; //! max memory allowed to use for forecast models - static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB + static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB + + // Note: This value measures the size in memory, not the size of the persistence, + // which is likely higher and would be hard to calculate upfront + //! max memory allowed to use for forecast models persisting to disk + static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB + + // Note: This value is lower than on X-pack side to prevent side-effects, + // if you change this value also change the limit on X-pack side. + // The purpose of this value is to guard the rest of the system regarding + // an out of disk space + //! minimum disk space required for disk persistence + static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB //! 
minimum time between stat updates to prevent to many updates in a short time - static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s + static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s private: static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE; @@ -91,6 +104,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { static const std::string ERROR_NO_CREATE_TIME; static const std::string ERROR_BAD_MEMORY_STATUS; static const std::string ERROR_MEMORY_LIMIT; + static const std::string ERROR_MEMORY_LIMIT_DISK; + static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS; static const std::string ERROR_NO_SUPPORTED_FUNCTIONS; static const std::string WARNING_DURATION_LIMIT; @@ -109,6 +124,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper; using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries; using TForecastResultSeriesVec = std::vector; + using TMathsModelPtr = boost::shared_ptr; using TStrUSet = boost::unordered_set; @@ -194,6 +210,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! A collection storing important messages from forecasting TStrUSet s_Messages; + + //! A directory to persist models on disk + std::string s_TemporaryFolder; }; private: @@ -206,6 +225,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable { //! Check for new jobs, blocks while waiting bool tryGetJob(SForecast& forecastJob); + //! check for sufficient disk space + bool sufficientAvailableDiskSpace(const boost::filesystem::path& path); + //! 
pushes new jobs into the internal 'queue' (thread boundary) bool push(SForecast& forecastJob); diff --git a/include/core/RestoreMacros.h b/include/core/RestoreMacros.h index 20e344c1de..4d8a903c7d 100644 --- a/include/core/RestoreMacros.h +++ b/include/core/RestoreMacros.h @@ -48,6 +48,23 @@ namespace core { continue; \ } +#define RESTORE_ENUM(tag, target, enumtype) \ + if (name == tag) { \ + int value; \ + if (core::CStringUtils::stringToType(traverser.value(), value) == false) { \ + LOG_ERROR("Failed to restore " #tag ", got " << traverser.value()); \ + return false; \ + } \ + target = enumtype(value); \ + continue; \ + } + +#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \ + if (name == tag) { \ + restoreSuccess = true; \ + RESTORE_ENUM(tag, target, enumtype) \ + } + #define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \ if (name == tag) { \ setup; \ diff --git a/include/model/CAnomalyDetector.h b/include/model/CAnomalyDetector.h index e6819faa3f..a39026f843 100644 --- a/include/model/CAnomalyDetector.h +++ b/include/model/CAnomalyDetector.h @@ -237,7 +237,8 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable { CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const; //! Generate maths models for forecasting - CForecastDataSink::SForecastResultSeries getForecastModels() const; + CForecastDataSink::SForecastResultSeries getForecastModels(bool persistOnDisk = false, + const std::string& persistenceFolder = EMPTY_STRING) const; //! Remove dead models, i.e. those models that have more-or-less //! reverted back to their non-informative state. 
BE CAREFUL WHEN diff --git a/include/model/CCountingModelFactory.h b/include/model/CCountingModelFactory.h index 736e5406e9..74080ee9b4 100644 --- a/include/model/CCountingModelFactory.h +++ b/include/model/CCountingModelFactory.h @@ -126,6 +126,9 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRateModelFactory.h b/include/model/CEventRateModelFactory.h index e20b8f25e3..9ee3cabe32 100644 --- a/include/model/CEventRateModelFactory.h +++ b/include/model/CEventRateModelFactory.h @@ -136,6 +136,9 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CEventRatePopulationModelFactory.h b/include/model/CEventRatePopulationModelFactory.h index 9f3225e40d..35cce6c6df 100644 --- a/include/model/CEventRatePopulationModelFactory.h +++ b/include/model/CEventRatePopulationModelFactory.h @@ -138,6 +138,9 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. 
virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CForecastDataSink.h b/include/model/CForecastDataSink.h index 73335fcb17..c89e3094d8 100644 --- a/include/model/CForecastDataSink.h +++ b/include/model/CForecastDataSink.h @@ -23,6 +23,7 @@ #include +#include #include #include @@ -47,7 +48,7 @@ namespace model { //! to change (e.g. the json writing should not happen in this class). class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { public: - using TMathsModelPtr = std::unique_ptr; + using TMathsModelPtr = boost::shared_ptr; using TStrUMap = boost::unordered_set; //! Wrapper for 1 timeseries model, its feature and by Field @@ -66,18 +67,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable { //! Everything that defines 1 series of forecasts struct MODEL_EXPORT SForecastResultSeries { - SForecastResultSeries(); + SForecastResultSeries(const SModelParams& modelParams); SForecastResultSeries(SForecastResultSeries&& other); SForecastResultSeries(const SForecastResultSeries& that) = delete; SForecastResultSeries& operator=(const SForecastResultSeries&) = delete; + SModelParams s_ModelParams; int s_DetectorIndex; std::vector s_ToForecast; + std::string s_ToForecastPersisted; std::string s_PartitionFieldName; std::string s_PartitionFieldValue; std::string s_ByFieldName; + double s_MinimumSeasonalVarianceScale; }; //! \brief Data describing prerequisites prior predictions diff --git a/include/model/CForecastModelPersist.h b/include/model/CForecastModelPersist.h new file mode 100644 index 0000000000..a8b20409a0 --- /dev/null +++ b/include/model/CForecastModelPersist.h @@ -0,0 +1,113 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * + * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved. 
+ * + * Notice: this software, and all information contained + * therein, is the exclusive property of Elasticsearch BV + * and its licensors, if any, and is protected under applicable + * domestic and foreign law, and international treaties. + * + * Reproduction, republication or distribution without the + * express written consent of Elasticsearch BV is + * strictly prohibited. + */ + +#ifndef INCLUDED_ml_model_CForecastModelPersist_h +#define INCLUDED_ml_model_CForecastModelPersist_h + +#include +#include + +#include + +#include +#include +#include + +#include + +#include +#include + +namespace ml { +namespace model { + +//! \brief Persist/Restore CModel sub-classes to/from text representations for +//! the purpose of forecasting. +//! +//! DESCRIPTION:\n +//! Persists/Restores models to disk for the purpose of restoring and forecasting +//! on them. +//! +//! IMPLEMENTATION DECISIONS:\n +//! Only as complete as required for forecasting. +//! +//! Persist and Restore are only done to avoid heap memory usage by using temporary disk space. +//! No need for backwards compatibility and versioning as code will only be used +//! locally, never leaving process/io boundaries. +class MODEL_EXPORT CForecastModelPersist final { +public: + using TMathsModelPtr = boost::shared_ptr; + +public: + class CPersist final { + public: + explicit CPersist(const std::string& temporaryPath); + + //! add a model to the persistence + void addModel(const maths::CModel* model, const model_t::EFeature feature, const std::string& byFieldValue); + + //! close the output stream and return the name of the persisted file + const std::string& finalizePersistAndGetFile(); + + private: + static void persistOneModel(core::CStatePersistInserter& inserter, + const maths::CModel* model, + const model_t::EFeature feature, + const std::string& byFieldValue); + + private: + //! the filename where to persist to + boost::filesystem::path m_FileName; + + //! the actual file the models are persisted to + std::ofstream m_OutStream; + + //! 
number of models persisted + size_t m_ModelCount; + }; + + class CRestore final { + public: + explicit CRestore(const SModelParams& modelParams, double minimumSeasonalVarianceScale, const std::string& fileName); + + //! restore the next model from the persistence + bool nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue); + + private: + static bool restoreOneModel(core::CStateRestoreTraverser& traverser, + SModelParams modelParams, + double inimumSeasonalVarianceScale, + TMathsModelPtr& model, + model_t::EFeature& feature, + std::string& byFieldValue); + + private: + //! model parameters required in order to restore the model + SModelParams m_ModelParams; + + //! minimum seasonal variance scale specific to the model + double m_MinimumSeasonalVarianceScale; + + //! the actual file the models are restored from + std::ifstream m_InStream; + + //! the restore traverser + core::CJsonStateRestoreTraverser m_RestoreTraverser; + }; // class CRestore +}; // class CForecastModelPersist +} +} + +#endif // INCLUDED_ml_model_CForecastModelPersist_h diff --git a/include/model/CMetricModelFactory.h b/include/model/CMetricModelFactory.h index 23b2f15f83..fe98baf57a 100644 --- a/include/model/CMetricModelFactory.h +++ b/include/model/CMetricModelFactory.h @@ -139,6 +139,9 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. 
virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CMetricPopulationModelFactory.h b/include/model/CMetricPopulationModelFactory.h index e4b1984451..dfb1376957 100644 --- a/include/model/CMetricPopulationModelFactory.h +++ b/include/model/CMetricPopulationModelFactory.h @@ -138,6 +138,9 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory { virtual void bucketResultsDelay(std::size_t bucketResultsDelay); //@} + //! Get the minimum seasonal variance scale + virtual double minimumSeasonalVarianceScale() const; + private: //! Get the field values which partition the data for modeling. virtual TStrCRefVec partitioningFields() const; diff --git a/include/model/CModelFactory.h b/include/model/CModelFactory.h index 5cc96d86d6..9aa2e70100 100644 --- a/include/model/CModelFactory.h +++ b/include/model/CModelFactory.h @@ -345,6 +345,9 @@ class MODEL_EXPORT CModelFactory { //! component. std::size_t componentSize() const; + // Get the minimum seasonal variance scale, specific to the model + virtual double minimumSeasonalVarianceScale() const = 0; + protected: using TMultivariatePriorPtrVec = std::vector; using TOptionalSearchKey = boost::optional; diff --git a/lib/api/CForecastRunner.cc b/lib/api/CForecastRunner.cc index 41c9049d2b..082645963e 100644 --- a/lib/api/CForecastRunner.cc +++ b/lib/api/CForecastRunner.cc @@ -20,12 +20,14 @@ #include #include +#include #include #include #include -#include +#include +#include #include namespace ml { @@ -45,7 +47,12 @@ const std::string CForecastRunner::ERROR_NO_DATA_PROCESSED("Forecast cannot be executed as job requires data to have been processed and modeled"); const std::string CForecastRunner::ERROR_NO_CREATE_TIME("Forecast create time must be specified and non zero"); const std::string CForecastRunner::ERROR_BAD_MEMORY_STATUS("Forecast cannot be executed as model memory status is not OK"); -const std::string CForecastRunner::ERROR_MEMORY_LIMIT("Forecast cannot be executed as forecast 
memory usage is predicted to exceed 20MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT( + "Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB while disk space is exceeded"); +const std::string + CForecastRunner::ERROR_MEMORY_LIMIT_DISK("Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB"); +const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE( + "Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient"); const std::string CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS("Forecast is not supported for population analysis"); const std::string CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS("Forecast is not supported for the used functions"); const std::string CForecastRunner::WARNING_DURATION_LIMIT("Forecast duration exceeds internal limit, setting to 8 weeks"); @@ -66,7 +73,8 @@ CForecastRunner::SForecast::SForecast() s_NumberOfModels(0), s_NumberOfForecastableModels(0), s_MemoryUsage(0), - s_Messages() { + s_Messages(), + s_TemporaryFolder() { } CForecastRunner::SForecast::SForecast(SForecast&& other) @@ -81,7 +89,8 @@ CForecastRunner::SForecast::SForecast(SForecast&& other) s_NumberOfModels(other.s_NumberOfModels), s_NumberOfForecastableModels(other.s_NumberOfForecastableModels), s_MemoryUsage(other.s_MemoryUsage), - s_Messages(other.s_Messages) { + s_Messages(other.s_Messages), + s_TemporaryFolder(std::move(other.s_TemporaryFolder)) { } CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& other) { @@ -97,6 +106,7 @@ CForecastRunner::SForecast& CForecastRunner::SForecast::operator=(SForecast&& ot s_NumberOfForecastableModels = other.s_NumberOfForecastableModels; s_MemoryUsage = other.s_MemoryUsage; s_Messages = other.s_Messages; + s_TemporaryFolder = std::move(other.s_TemporaryFolder); return *this; } @@ -161,8 +171,30 @@ void CForecastRunner::forecastWorker() { // while loops allow us to free up 
memory for every model right after each forecast is done while (!forecastJob.s_ForecastSeries.empty()) { TForecastResultSeries& series = forecastJob.s_ForecastSeries.back(); + std::unique_ptr modelRestore; + + // initialize persistence restore exactly once + if (!series.s_ToForecastPersisted.empty()) { + modelRestore.reset(new model::CForecastModelPersist::CRestore( + series.s_ModelParams, series.s_MinimumSeasonalVarianceScale, series.s_ToForecastPersisted)); + } + + while (series.s_ToForecast.empty() == false || modelRestore != nullptr) { + // check if we should backfill from persistence + if (series.s_ToForecast.empty()) { + TMathsModelPtr model; + model_t::EFeature feature; + std::string byFieldValue; + + if (modelRestore->nextModel(model, feature, byFieldValue)) { + series.s_ToForecast.emplace_back(feature, std::move(model), byFieldValue); + } else // restorer exhausted, no need for further restoring + { + modelRestore.reset(); + break; + } + } - while (!series.s_ToForecast.empty()) { const TForecastModelWrapper& model = series.s_ToForecast.back(); model_t::TDouble1VecDouble1VecPr support = model_t::support(model.s_Feature); bool success = model.s_ForecastModel->forecast(forecastJob.s_StartTime, @@ -213,6 +245,19 @@ void CForecastRunner::forecastWorker() { // signal that job is done m_WorkCompleteCondition.notify_all(); + + // cleanup + if (!forecastJob.s_TemporaryFolder.empty()) { + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); + boost::system::error_code errorCode; + boost::filesystem::remove_all(temporaryFolder, errorCode); + if (errorCode) { + // not an error: there is also cleanup code on X-pack side + LOG_WARN("Failed to cleanup temporary data from: " << forecastJob.s_TemporaryFolder << " error " + << errorCode.message()); + return; + } + } } } @@ -279,13 +324,18 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, atLeastOneSupportedFunction = atLeastOneSupportedFunction || 
prerequisites.s_IsSupportedFunction; totalMemoryUsage += prerequisites.s_MemoryUsageForDetector; - if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY && forecastJob.s_TemporaryFolder.empty()) { // note: for now MAX_FORECAST_MODEL_MEMORY is a static limit, a user can not change it this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT); return false; } } + if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISK); + return false; + } + if (atLeastOneNonPopulationModel == false) { this->sendErrorMessage(forecastJob, ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS); return false; @@ -309,7 +359,31 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage, this->sendScheduledMessage(forecastJob); // 2nd loop over the detectors to clone models for forecasting - TForecastResultSeriesVec s; + bool persistOnDisk = false; + if (totalMemoryUsage >= MAX_FORECAST_MODEL_MEMORY) { + boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder); + + if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) { + this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE); + return false; + } + + LOG_INFO("Forecast of large model requested (requires " << std::to_string(1 + (totalMemoryUsage >> 20)) << " MB), using disk."); + + boost::system::error_code errorCode; + boost::filesystem::create_directories(temporaryFolder, errorCode); + if (errorCode) { + this->sendErrorMessage(forecastJob, + "Forecast internal error, failed to create temporary folder " + temporaryFolder.string() + + " error: " + errorCode.message()); + return false; + } + + LOG_DEBUG("Persisting to: " << temporaryFolder.string()); + persistOnDisk = true; + } else { + forecastJob.s_TemporaryFolder.clear(); + } for (const auto& detector : detectors) { if (detector.get() == nullptr) { @@ -317,7 +391,7 @@ bool CForecastRunner::pushForecastJob(const std::string& 
controlMessage, continue; } - forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels()); + forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels(persistOnDisk, forecastJob.s_TemporaryFolder)); } return this->push(forecastJob); @@ -360,6 +434,8 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control forecastJob.s_Duration = properties.get("duration", 0); forecastJob.s_CreateTime = properties.get("create_time", 0); + // tmp storage if available + forecastJob.s_TemporaryFolder = properties.get("tmp_storage", EMPTY_STRING); // use -1 as default to allow 0 as 'never expires' expiresIn = properties.get("expires_in", -1l); @@ -390,7 +466,6 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control // to be replaced by https://github.com/elastic/machine-learning-cpp/issues/443 // TODO this is a temporary fix to prevent the analysis blowing up // if you change this value, also change the log string - // todo: refactor validation out from here core_t::TTime maxDuration = 8 * core::constants::WEEK; if (forecastJob.s_Duration > maxDuration) { LOG_INFO(WARNING_DURATION_LIMIT); @@ -419,6 +494,18 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control return true; } +bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path& path) { + boost::system::error_code errorCode; + auto spaceInfo = boost::filesystem::space(path, errorCode); + + if (errorCode) { + LOG_ERROR("Failed to retrieve disk information for " << path << " error " << errorCode.message()); + return false; + } + + return spaceInfo.available > MIN_FORECAST_AVAILABLE_DISK_SPACE; +} + void CForecastRunner::sendScheduledMessage(const SForecast& forecastJob) const { LOG_DEBUG("job passed forecast validation, scheduled for forecasting"); model::CForecastDataSink sink(m_JobId, diff --git a/lib/api/dump_state/Makefile b/lib/api/dump_state/Makefile index a854f159f7..ca1d0e187d 100644 --- 
a/lib/api/dump_state/Makefile +++ b/lib/api/dump_state/Makefile @@ -19,6 +19,7 @@ TARGET=dump_state$(EXE_EXT) ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API) USE_BOOST=1 +USE_BOOST_FILESYSTEM_LIBS=1 USE_RAPIDJSON=1 USE_EIGEN=1 diff --git a/lib/model/CAnomalyDetector.cc b/lib/model/CAnomalyDetector.cc index 0fec8235d6..86a98b3a49 100644 --- a/lib/model/CAnomalyDetector.cc +++ b/lib/model/CAnomalyDetector.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -495,8 +496,9 @@ CForecastDataSink::SForecastModelPrerequisites CAnomalyDetector::getForecastPrer return prerequisites; } -CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() const { - CForecastDataSink::SForecastResultSeries series; +CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels(bool persistOnDisk, + const std::string& persistenceFolder) const { + CForecastDataSink::SForecastResultSeries series(m_ModelFactory->modelParams()); if (m_DataGatherer->isPopulation()) { return series; @@ -514,15 +516,34 @@ CForecastDataSink::SForecastResultSeries CAnomalyDetector::getForecastModels() c series.s_DetectorIndex = m_DetectorIndex; series.s_PartitionFieldName = key.partitionFieldName(); series.s_PartitionFieldValue = m_DataGatherer->partitionFieldValue(); + series.s_MinimumSeasonalVarianceScale = m_ModelFactory->minimumSeasonalVarianceScale(); + + if (persistOnDisk) { + CForecastModelPersist::CPersist persister(persistenceFolder); + + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { + persister.addModel(model, feature, m_DataGatherer->personName(pid)); + } + } + } + } - for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); 
pid < maxPid; ++pid) { - // todo: Add terms filtering here - if (m_DataGatherer->isPersonActive(pid)) { - for (auto feature : view->features()) { - const maths::CModel* model = view->model(feature, pid); - if (model != nullptr && model->isForecastPossible()) { - series.s_ToForecast.emplace_back( - feature, CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), m_DataGatherer->personName(pid)); + series.s_ToForecastPersisted = persister.finalizePersistAndGetFile(); + } else { + for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) { + // todo: Add terms filtering here + if (m_DataGatherer->isPersonActive(pid)) { + for (auto feature : view->features()) { + const maths::CModel* model = view->model(feature, pid); + if (model != nullptr && model->isForecastPossible()) { + series.s_ToForecast.emplace_back( + feature, CForecastDataSink::TMathsModelPtr(model->cloneForForecast()), m_DataGatherer->personName(pid)); + } } } } diff --git a/lib/model/CCountingModelFactory.cc b/lib/model/CCountingModelFactory.cc index 3c83fee99c..a1cf0e6265 100644 --- a/lib/model/CCountingModelFactory.cc +++ b/lib/model/CCountingModelFactory.cc @@ -178,5 +178,9 @@ CCountingModelFactory::TStrCRefVec CCountingModelFactory::partitioningFields() c } return result; } +double CCountingModelFactory::minimumSeasonalVarianceScale() const { + // unused, return something + return 0.0; +} } } diff --git a/lib/model/CEventRateModelFactory.cc b/lib/model/CEventRateModelFactory.cc index 52b0bab56f..08be6bb3a4 100644 --- a/lib/model/CEventRateModelFactory.cc +++ b/lib/model/CEventRateModelFactory.cc @@ -66,13 +66,14 @@ CAnomalyDetectorModel* CEventRateModelFactory::makeModel(const SModelInitializat influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features)); } - return new CEventRateModel(this->modelParams(), - dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true), - this->defaultCorrelatePriors(features), - 
this->defaultCorrelates(features), - this->defaultCategoricalPrior(), - influenceCalculators); + return new CEventRateModel( + this->modelParams(), + dataGatherer, + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), true), + this->defaultCorrelatePriors(features), + this->defaultCorrelates(features), + this->defaultCategoricalPrior(), + influenceCalculators); } CAnomalyDetectorModel* CEventRateModelFactory::makeModel(const SModelInitializationData& initData, @@ -286,6 +287,10 @@ void CEventRateModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) m_BucketResultsDelay = bucketResultsDelay; } +double CEventRateModelFactory::minimumSeasonalVarianceScale() const { + return 0.4; +} + CEventRateModelFactory::TStrCRefVec CEventRateModelFactory::partitioningFields() const { TStrCRefVec result; result.reserve(2); diff --git a/lib/model/CEventRatePopulationModelFactory.cc b/lib/model/CEventRatePopulationModelFactory.cc index 4489483833..6a68c40ddb 100644 --- a/lib/model/CEventRatePopulationModelFactory.cc +++ b/lib/model/CEventRatePopulationModelFactory.cc @@ -66,12 +66,13 @@ CAnomalyDetectorModel* CEventRatePopulationModelFactory::makeModel(const SModelI influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features)); } - return new CEventRatePopulationModel(this->modelParams(), - dataGatherer, - this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false), - this->defaultCorrelatePriors(features), - this->defaultCorrelates(features), - influenceCalculators); + return new CEventRatePopulationModel( + this->modelParams(), + dataGatherer, + this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), false), + this->defaultCorrelatePriors(features), + this->defaultCorrelates(features), + influenceCalculators); } CAnomalyDetectorModel* CEventRatePopulationModelFactory::makeModel(const SModelInitializationData& initData, @@ -300,5 
+301,9 @@ CEventRatePopulationModelFactory::TStrCRefVec CEventRatePopulationModelFactory::
     }
     return result;
 }
+
+double CEventRatePopulationModelFactory::minimumSeasonalVarianceScale() const {
+    return 1.0;
+}
 }
 }
diff --git a/lib/model/CForecastDataSink.cc b/lib/model/CForecastDataSink.cc
index cb8805268e..2d5f3f0f83 100644
--- a/lib/model/CForecastDataSink.cc
+++ b/lib/model/CForecastDataSink.cc
@@ -74,16 +74,19 @@ CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(SForecastModelWr
     : s_Feature(other.s_Feature), s_ForecastModel(std::move(other.s_ForecastModel)), s_ByFieldValue(std::move(other.s_ByFieldValue)) {
 }
 
-CForecastDataSink::SForecastResultSeries::SForecastResultSeries()
-    : s_DetectorIndex(), s_ToForecast(), s_PartitionFieldValue(), s_ByFieldName() {
+CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams& modelParams)
+    : s_ModelParams(modelParams), s_DetectorIndex(), s_ToForecastPersisted(), s_ByFieldName(), s_MinimumSeasonalVarianceScale(0.0) {
 }
 
 CForecastDataSink::SForecastResultSeries::SForecastResultSeries(SForecastResultSeries&& other)
-    : s_DetectorIndex(other.s_DetectorIndex),
+    : s_ModelParams(std::move(other.s_ModelParams)),
+      s_DetectorIndex(other.s_DetectorIndex),
       s_ToForecast(std::move(other.s_ToForecast)),
+      s_ToForecastPersisted(std::move(other.s_ToForecastPersisted)),
       s_PartitionFieldName(std::move(other.s_PartitionFieldName)),
       s_PartitionFieldValue(std::move(other.s_PartitionFieldValue)),
-      s_ByFieldName(std::move(other.s_ByFieldName)) {
+      s_ByFieldName(std::move(other.s_ByFieldName)),
+      s_MinimumSeasonalVarianceScale(other.s_MinimumSeasonalVarianceScale) {
 }
 
 CForecastDataSink::CForecastDataSink(const std::string& jobId,
diff --git a/lib/model/CForecastModelPersist.cc b/lib/model/CForecastModelPersist.cc
new file mode 100644
index 0000000000..8f7c67e590
--- /dev/null
+++ b/lib/model/CForecastModelPersist.cc
@@ -0,0 +1,185 @@
+/*
+ * ELASTICSEARCH CONFIDENTIAL
+ *
+ * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
+ *
+ * Notice: this software, and all information contained
+ * therein, is the exclusive property of Elasticsearch BV
+ * and its licensors, if any, and is protected under applicable
+ * domestic and foreign law, and international treaties.
+ *
+ * Reproduction, republication or distribution without the
+ * express written consent of Elasticsearch BV is
+ * strictly prohibited.
+ */
+
+#include <model/CForecastModelPersist.h>
+
+#include <core/CJsonStatePersistInserter.h>
+#include <core/CLogger.h>
+#include <core/RestoreMacros.h>
+
+#include <maths/CModel.h>
+#include <maths/CModelStateSerialiser.h>
+#include <maths/CRestoreParams.h>
+
+#include <model/ModelTypes.h>
+
+#include <boost/bind.hpp>
+
+namespace ml {
+namespace model {
+
+namespace {
+// Tags of the JSON object written for every persisted forecast model
+static const std::string FORECAST_MODEL_PERSIST_TAG("forecast_persist");
+static const std::string FEATURE_TAG("feature");
+static const std::string DATA_TYPE_TAG("datatype");
+static const std::string MODEL_TAG("model");
+static const std::string BY_FIELD_VALUE_TAG("by_field_value");
+}
+
+// Creates a uniquely named file below temporaryPath and starts the JSON
+// array which takes the persisted models
+CForecastModelPersist::CPersist::CPersist(const std::string& temporaryPath) : m_FileName(temporaryPath), m_OutStream(), m_ModelCount(0) {
+    m_FileName /= boost::filesystem::unique_path("forecast-persist-%%%%-%%%%-%%%%-%%%%");
+    m_OutStream.open(m_FileName.string());
+    if (!m_OutStream.is_open()) {
+        LOG_ERROR("Failed to open file for persisting forecast models");
+    }
+    m_OutStream << "[";
+}
+
+// Appends one model to the array, a comma separates it from its predecessor
+void CForecastModelPersist::CPersist::addModel(const maths::CModel* model,
+                                               const model_t::EFeature feature,
+                                               const std::string& byFieldValue) {
+    if (m_ModelCount++ > 0) {
+        m_OutStream << ",";
+    }
+
+    core::CJsonStatePersistInserter inserter(m_OutStream);
+    inserter.insertLevel(FORECAST_MODEL_PERSIST_TAG,
+                         boost::bind(CForecastModelPersist::CPersist::persistOneModel, _1, model, feature, byFieldValue));
+}
+
+// Writes the feature, the data type, the by field value and the model state
+void CForecastModelPersist::CPersist::persistOneModel(core::CStatePersistInserter& inserter,
+                                                      const maths::CModel* model,
+                                                      const model_t::EFeature feature,
+                                                      const std::string& byFieldValue) {
+    inserter.insertValue(FEATURE_TAG, feature);
+    inserter.insertValue(DATA_TYPE_TAG, model->dataType());
+    inserter.insertValue(BY_FIELD_VALUE_TAG, byFieldValue);
+    inserter.insertLevel(MODEL_TAG, boost::bind(maths::CModelStateSerialiser(), boost::cref(*model), _1));
+}
+
+// Closes the JSON array and the file and returns the name of the file
+// NOTE(review): the reference returned is bound to the result of
+// path::string(), it dangles wherever string() returns a temporary (believed
+// to be the case for Boost on Windows) - confirm, returning by value requires
+// the declaration in the header to change as well
+const std::string& CForecastModelPersist::CPersist::finalizePersistAndGetFile() {
+    m_OutStream << "]";
+    m_OutStream.close();
+    return m_FileName.string();
+}
+
+// Opens fileName and attaches the restore traverser to the stream
+CForecastModelPersist::CRestore::CRestore(const SModelParams& modelParams, double minimumSeasonalVarianceScale, const std::string& fileName)
+    : m_ModelParams(modelParams),
+      m_MinimumSeasonalVarianceScale(minimumSeasonalVarianceScale),
+      m_InStream(fileName),
+      m_RestoreTraverser(m_InStream) {
+}
+
+// Restores the next persisted model, returns false if no model is left or
+// if the model could not be restored
+bool CForecastModelPersist::CRestore::nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue) {
+    if (m_RestoreTraverser.isEof() || m_RestoreTraverser.name().empty()) {
+        return false;
+    }
+
+    if (m_RestoreTraverser.name() != FORECAST_MODEL_PERSIST_TAG) {
+        LOG_ERROR("Failed to restore forecast model, unexpected tag");
+        return false;
+    }
+
+    if (!m_RestoreTraverser.hasSubLevel()) {
+        LOG_ERROR("Failed to restore forecast model, unexpected format");
+        return false;
+    }
+
+    TMathsModelPtr originalModel;
+    if (!m_RestoreTraverser.traverseSubLevel(boost::bind(CForecastModelPersist::CRestore::restoreOneModel,
+                                                         _1,
+                                                         boost::cref(m_ModelParams),
+                                                         m_MinimumSeasonalVarianceScale,
+                                                         boost::ref(originalModel),
+                                                         boost::ref(feature),
+                                                         boost::ref(byFieldValue)))) {
+        LOG_ERROR("Failed to restore forecast model, internal error");
+        return false;
+    }
+
+    // restoreOneModel only succeeds with a model, hence the pointer is set
+    model.reset(originalModel->cloneForForecast());
+    m_RestoreTraverser.nextObject();
+
+    return true;
+}
+
+// Restores the feature, the data type, the by field value and the model
+// NOTE(review): modelParams is taken by value and hence copied on every call,
+// a const reference avoids that but requires a change of the header as well
+bool CForecastModelPersist::CRestore::restoreOneModel(core::CStateRestoreTraverser& traverser,
+                                                      const SModelParams modelParams,
+                                                      double minimumSeasonalVarianceScale,
+                                                      TMathsModelPtr& model,
+                                                      model_t::EFeature& feature,
+                                                      std::string& byFieldValue) {
+    // reset all
+    model.reset();
+    bool restoredFeature = false;
+    bool restoredDataType = false;
+    byFieldValue.clear();
+    // initialised to be safe, the value is only used once restoredDataType is set
+    maths_t::EDataType dataType{maths_t::E_MixedData};
+
+    do {
+        const std::string& name = traverser.name();
+        RESTORE_ENUM_CHECKED(FEATURE_TAG, feature, model_t::EFeature, restoredFeature)
+        RESTORE_ENUM_CHECKED(DATA_TYPE_TAG, dataType, maths_t::EDataType, restoredDataType)
+        RESTORE_BUILT_IN(BY_FIELD_VALUE_TAG, byFieldValue)
+        if (name == MODEL_TAG) {
+            if (!restoredDataType) {
+                LOG_ERROR("Failed to restore forecast model, datatype missing");
+                return false;
+            }
+
+            maths::SModelRestoreParams params{
+                maths::CModelParams(
+                    modelParams.s_BucketLength, modelParams.s_LearnRate, modelParams.s_DecayRate, minimumSeasonalVarianceScale),
+                maths::STimeSeriesDecompositionRestoreParams{
+                    modelParams.s_DecayRate, modelParams.s_BucketLength, modelParams.s_ComponentSize},
+                modelParams.distributionRestoreParams(dataType)};
+
+            if (!traverser.traverseSubLevel(
+                    boost::bind(maths::CModelStateSerialiser(), boost::cref(params), boost::ref(model), _1))) {
+                LOG_ERROR("Failed to restore forecast model, model missing");
+                return false;
+            }
+        }
+    } while (traverser.next());
+
+    // only the by_field_value can be empty
+    if (!model || !restoredFeature || !restoredDataType) {
+        LOG_ERROR("Failed to restore forecast model, data missing");
+        return false;
+    }
+
+    return true;
+}
+
+} /* namespace model */
+} /* namespace ml */
diff --git a/lib/model/CHierarchicalResults.cc b/lib/model/CHierarchicalResults.cc
index a44e3b0faf..3585586ea8 100644
--- a/lib/model/CHierarchicalResults.cc
+++ b/lib/model/CHierarchicalResults.cc
@@ -959,7 +959,8 @@ bool CHierarchicalResultsVisitor::isPopulation(const TNode& node) {
 
 const CHierarchicalResultsVisitor::TNode* CHierarchicalResultsVisitor::nearestAncestorForWhichWeWriteResults(const TNode& node) {
     const TNode* result = &node;
-    for (result = result->s_Parent; result && !isTypeForWhichWeWriteResults(*result, false); result = result->s_Parent) {}
+    for (result = result->s_Parent; result && !isTypeForWhichWeWriteResults(*result, false); result = result->s_Parent) {
+    }
     return result;
 }
 
diff --git a/lib/model/CMetricModelFactory.cc
b/lib/model/CMetricModelFactory.cc
index 2a3c9c799b..8c461a0635 100644
--- a/lib/model/CMetricModelFactory.cc
+++ b/lib/model/CMetricModelFactory.cc
@@ -67,7 +67,7 @@ CAnomalyDetectorModel* CMetricModelFactory::makeModel(const SModelInitialization
 
     return new CMetricModel(this->modelParams(),
                             dataGatherer,
-                            this->defaultFeatureModels(features, dataGatherer->bucketLength(), 0.4, true),
+                            this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), true),
                             this->defaultCorrelatePriors(features),
                             this->defaultCorrelates(features),
                             influenceCalculators);
@@ -287,6 +287,10 @@ void CMetricModelFactory::bucketResultsDelay(std::size_t bucketResultsDelay) {
     m_BucketResultsDelay = bucketResultsDelay;
 }
 
+double CMetricModelFactory::minimumSeasonalVarianceScale() const {
+    return 0.4;
+}
+
 CMetricModelFactory::TStrCRefVec CMetricModelFactory::partitioningFields() const {
     TStrCRefVec result;
     result.reserve(2);
diff --git a/lib/model/CMetricPopulationModelFactory.cc b/lib/model/CMetricPopulationModelFactory.cc
index 74b029a160..31f3f87ef0 100644
--- a/lib/model/CMetricPopulationModelFactory.cc
+++ b/lib/model/CMetricPopulationModelFactory.cc
@@ -64,12 +64,13 @@ CAnomalyDetectorModel* CMetricPopulationModelFactory::makeModel(const SModelInit
         influenceCalculators.push_back(this->defaultInfluenceCalculators(name, features));
     }
 
-    return new CMetricPopulationModel(this->modelParams(),
-                                      dataGatherer,
-                                      this->defaultFeatureModels(features, dataGatherer->bucketLength(), 1.0, false),
-                                      this->defaultCorrelatePriors(features),
-                                      this->defaultCorrelates(features),
-                                      influenceCalculators);
+    return new CMetricPopulationModel(
+        this->modelParams(),
+        dataGatherer,
+        this->defaultFeatureModels(features, dataGatherer->bucketLength(), this->minimumSeasonalVarianceScale(), false),
+        this->defaultCorrelatePriors(features),
+        this->defaultCorrelates(features),
+        influenceCalculators);
 }
 
 CAnomalyDetectorModel* CMetricPopulationModelFactory::makeModel(const SModelInitializationData& initData,
@@ -284,6 +285,10 @@ void CMetricPopulationModelFactory::bucketResultsDelay(std::size_t bucketResults
     m_BucketResultsDelay = bucketResultsDelay;
 }
 
+double CMetricPopulationModelFactory::minimumSeasonalVarianceScale() const {
+    return 1.0;
+}
+
 CMetricPopulationModelFactory::TStrCRefVec CMetricPopulationModelFactory::partitioningFields() const {
     TStrCRefVec result;
     result.reserve(3);
diff --git a/lib/model/Makefile b/lib/model/Makefile
index 3a20287f4a..76c8a5b0d7 100644
--- a/lib/model/Makefile
+++ b/lib/model/Makefile
@@ -17,6 +17,7 @@ include $(CPP_SRC_HOME)/mk/defines.mk
 TARGET=$(OBJS_DIR)/libMlModel$(DYNAMIC_LIB_EXT)
 
 USE_BOOST=1
+USE_BOOST_FILESYSTEM_LIBS=1
 USE_RAPIDJSON=1
 USE_EIGEN=1
 
@@ -47,6 +48,7 @@ CEventRatePopulationModel.cc \
 CEventRatePopulationModelFactory.cc \
 CFeatureData.cc \
 CForecastDataSink.cc \
+CForecastModelPersist.cc \
 CGathererTools.cc \
 CHierarchicalResults.cc \
 CHierarchicalResultsAggregator.cc \
diff --git a/lib/model/unittest/CForecastModelPersistTest.cc b/lib/model/unittest/CForecastModelPersistTest.cc
new file mode 100644
index 0000000000..a72875fc08
--- /dev/null
+++ b/lib/model/unittest/CForecastModelPersistTest.cc
@@ -0,0 +1,152 @@
+/*
+ * ELASTICSEARCH CONFIDENTIAL
+ *
+ * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
+ *
+ * Notice: this software, and all information contained
+ * therein, is the exclusive property of Elasticsearch BV
+ * and its licensors, if any, and is protected under applicable
+ * domestic and foreign law, and international treaties.
+ *
+ * Reproduction, republication or distribution without the
+ * express written consent of Elasticsearch BV is
+ * strictly prohibited.
+ */
+
+#include "CForecastModelPersistTest.h"
+
+#include <core/CLogger.h>
+#include <core/CoreTypes.h>
+
+#include <maths/CNormalMeanPrecConjugate.h>
+#include <maths/CTimeSeriesDecomposition.h>
+#include <maths/CTimeSeriesModel.h>
+
+#include <model/CForecastModelPersist.h>
+
+#include <test/CTestTmpDir.h>
+
+#include <cstdio>
+
+using namespace ml;
+using namespace model;
+
+// Persists three models and compares each restored model with the forecast clone of its original
+void CForecastModelPersistTest::testPersistAndRestore() {
+    LOG_DEBUG("+----------------------------------------------------+");
+    LOG_DEBUG("| CForecastModelPersistTest::testPersistAndRestore |");
+    LOG_DEBUG("+----------------------------------------------------+");
+
+    core_t::TTime bucketLength{1800};
+    double minimumSeasonalVarianceScale = 0.2;
+    SModelParams params{bucketLength};
+    params.s_DecayRate = 0.001;
+    params.s_LearnRate = 1.0;
+    maths::CTimeSeriesDecomposition trend(params.s_DecayRate, bucketLength);
+    maths::CNormalMeanPrecConjugate prior{
+        maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_ContinuousData, params.s_DecayRate)};
+    maths::CModelParams timeSeriesModelParams{bucketLength, params.s_LearnRate, params.s_DecayRate, minimumSeasonalVarianceScale};
+    maths::CUnivariateTimeSeriesModel timeSeriesModel{timeSeriesModelParams, 1, trend, prior};
+
+    CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir());
+    persister.addModel(&timeSeriesModel, model_t::EFeature::E_IndividualCountByBucketAndPerson, "some_by_field");
+
+    maths::CNormalMeanPrecConjugate otherPrior{
+        maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_MixedData, params.s_DecayRate)};
+    maths::CUnivariateTimeSeriesModel otherTimeSeriesModel{timeSeriesModelParams, 2, trend, otherPrior};
+
+    persister.addModel(&otherTimeSeriesModel, model_t::EFeature::E_IndividualLowMeanByPerson, "some_other_by_field");
+
+    maths::CNormalMeanPrecConjugate otherPriorEmptyByField{
+        maths::CNormalMeanPrecConjugate::nonInformativePrior(maths_t::E_DiscreteData, params.s_DecayRate)};
+    maths::CUnivariateTimeSeriesModel otherTimeSeriesModelEmptyByField{timeSeriesModelParams, 3, trend, otherPriorEmptyByField};
+
+    persister.addModel(&otherTimeSeriesModelEmptyByField, model_t::EFeature::E_IndividualHighMedianByPerson, "");
+    std::string persistedModels = persister.finalizePersistAndGetFile();
+
+    {
+        CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels);
+        CForecastModelPersist::TMathsModelPtr restoredModel;
+        std::string restoredByFieldValue;
+        model_t::EFeature restoredFeature;
+
+        // test timeSeriesModel
+        CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue));
+
+        CPPUNIT_ASSERT(restoredModel);
+        CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualCountByBucketAndPerson, restoredFeature);
+        CPPUNIT_ASSERT_EQUAL(std::string("some_by_field"), restoredByFieldValue);
+        CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength());
+        CPPUNIT_ASSERT_EQUAL(size_t(1), restoredModel->identifier());
+        CPPUNIT_ASSERT_EQUAL(maths_t::E_ContinuousData, restoredModel->dataType());
+
+        CForecastModelPersist::TMathsModelPtr timeSeriesModelForForecast{timeSeriesModel.cloneForForecast()};
+        CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate());
+        CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate());
+        CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale());
+
+        CPPUNIT_ASSERT_EQUAL(timeSeriesModelForForecast->checksum(42), restoredModel->checksum(42));
+
+        // test otherTimeSeriesModel
+        restoredModel.reset();
+        CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue));
+        CPPUNIT_ASSERT(restoredModel);
+        CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualLowMeanByPerson, restoredFeature);
+        CPPUNIT_ASSERT_EQUAL(std::string("some_other_by_field"), restoredByFieldValue);
+        CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength());
+        CPPUNIT_ASSERT_EQUAL(size_t(2), restoredModel->identifier());
+        CPPUNIT_ASSERT_EQUAL(maths_t::E_MixedData, restoredModel->dataType());
+        CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelForForecast{otherTimeSeriesModel.cloneForForecast()};
+        CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->params().learnRate(), restoredModel->params().learnRate());
+        CPPUNIT_ASSERT_EQUAL(params.s_DecayRate, restoredModel->params().decayRate());
+        CPPUNIT_ASSERT_EQUAL(minimumSeasonalVarianceScale, restoredModel->params().minimumSeasonalVarianceScale());
+        CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelForForecast->checksum(42), restoredModel->checksum(42));
+
+        // test otherTimeSeriesModelEmptyByField
+        restoredModel.reset();
+        CPPUNIT_ASSERT(restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue));
+        CPPUNIT_ASSERT(restoredModel);
+        CPPUNIT_ASSERT_EQUAL(model_t::EFeature::E_IndividualHighMedianByPerson, restoredFeature);
+        CPPUNIT_ASSERT_EQUAL(std::string(), restoredByFieldValue);
+        CPPUNIT_ASSERT_EQUAL(bucketLength, restoredModel->params().bucketLength());
+        CPPUNIT_ASSERT_EQUAL(size_t(3), restoredModel->identifier());
+        CPPUNIT_ASSERT_EQUAL(maths_t::E_DiscreteData, restoredModel->dataType());
+        CForecastModelPersist::TMathsModelPtr otherTimeSeriesModelEmptyByFieldForForecast{
+            otherTimeSeriesModelEmptyByField.cloneForForecast()};
+        CPPUNIT_ASSERT_EQUAL(otherTimeSeriesModelEmptyByFieldForForecast->checksum(42), restoredModel->checksum(42));
+
+        CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue));
+    }
+    std::remove(persistedModels.c_str());
+}
+
+// Persists no model at all and checks that nothing is restored
+void CForecastModelPersistTest::testPersistAndRestoreEmpty() {
+    core_t::TTime bucketLength{1800};
+    double minimumSeasonalVarianceScale = 0.2;
+    SModelParams params{bucketLength};
+
+    CForecastModelPersist::CPersist persister(ml::test::CTestTmpDir::tmpDir());
+    std::string persistedModels = persister.finalizePersistAndGetFile();
+    {
+        CForecastModelPersist::CRestore restorer(params, minimumSeasonalVarianceScale, persistedModels);
+        CForecastModelPersist::TMathsModelPtr restoredModel;
+        std::string restoredByFieldValue;
+        model_t::EFeature restoredFeature;
+
+        CPPUNIT_ASSERT(!restorer.nextModel(restoredModel, restoredFeature, restoredByFieldValue));
+    }
+    std::remove(persistedModels.c_str());
+}
+
+// Registers both test cases in a suite named after the fixture
+CppUnit::Test* CForecastModelPersistTest::suite(void) {
+    CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CForecastModelPersistTest");
+
+    suiteOfTests->addTest(new CppUnit::TestCaller<CForecastModelPersistTest>("CForecastModelPersistTest::testPersistAndRestore",
+                                                                             &CForecastModelPersistTest::testPersistAndRestore));
+    suiteOfTests->addTest(new CppUnit::TestCaller<CForecastModelPersistTest>("CForecastModelPersistTest::testPersistAndRestoreEmpty",
+                                                                             &CForecastModelPersistTest::testPersistAndRestoreEmpty));
+
+    return suiteOfTests;
+}
diff --git a/lib/model/unittest/CForecastModelPersistTest.h b/lib/model/unittest/CForecastModelPersistTest.h
new file mode 100644
index 0000000000..bfc6edf7da
--- /dev/null
+++ b/lib/model/unittest/CForecastModelPersistTest.h
@@ -0,0 +1,29 @@
+/*
+ * ELASTICSEARCH CONFIDENTIAL
+ *
+ * Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
+ *
+ * Notice: this software, and all information contained
+ * therein, is the exclusive property of Elasticsearch BV
+ * and its licensors, if any, and is protected under applicable
+ * domestic and foreign law, and international treaties.
+ *
+ * Reproduction, republication or distribution without the
+ * express written consent of Elasticsearch BV is
+ * strictly prohibited.
+ */
+
+#ifndef INCLUDED_CForecastModelPersistTest_h
+#define INCLUDED_CForecastModelPersistTest_h
+
+#include <cppunit/extensions/HelperMacros.h>
+
+class CForecastModelPersistTest : public CppUnit::TestFixture {
+public:
+    void testPersistAndRestore();
+    void testPersistAndRestoreEmpty();
+
+    static CppUnit::Test* suite();
+};
+
+#endif // INCLUDED_CForecastModelPersistTest_h
diff --git a/lib/model/unittest/Main.cc b/lib/model/unittest/Main.cc
index e2bae3f34f..2d6d64fdf4 100644
--- a/lib/model/unittest/Main.cc
+++ b/lib/model/unittest/Main.cc
@@ -27,6 +27,7 @@
 #include "CEventRateModelTest.h"
 #include "CEventRatePopulationDataGathererTest.h"
 #include "CEventRatePopulationModelTest.h"
+#include "CForecastModelPersistTest.h"
 #include "CFunctionTypesTest.h"
 #include "CGathererToolsTest.h"
 #include "CHierarchicalResultsLevelSetTest.h"
@@ -67,6 +68,7 @@ int main(int argc, const char** argv) {
     runner.addTest(CEventRatePopulationDataGathererTest::suite());
     runner.addTest(CEventRatePopulationModelTest::suite());
     runner.addTest(CFunctionTypesTest::suite());
+    runner.addTest(CForecastModelPersistTest::suite());
     runner.addTest(CGathererToolsTest::suite());
     runner.addTest(CHierarchicalResultsTest::suite());
     runner.addTest(CHierarchicalResultsLevelSetTest::suite());
diff --git a/lib/model/unittest/Makefile b/lib/model/unittest/Makefile
index 67bd4417f8..a34c137f36 100644
--- a/lib/model/unittest/Makefile
+++ b/lib/model/unittest/Makefile
@@ -17,6 +17,7 @@ include $(CPP_SRC_HOME)/mk/defines.mk
 TARGET=ml_test$(EXE_EXT)
 
 USE_BOOST=1
+USE_BOOST_FILESYSTEM_LIBS=1
 USE_RAPIDJSON=1
 USE_EIGEN=1
 
@@ -39,6 +40,7 @@ SRCS=\
 	CEventRateModelTest.cc \
 	CEventRatePopulationDataGathererTest.cc \
 	CEventRatePopulationModelTest.cc \
+	CForecastModelPersistTest.cc \
 	CFunctionTypesTest.cc \
 	CGathererToolsTest.cc \
 	CHierarchicalResultsTest.cc \