Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Improve autodetect logic for persistence #437

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ to the model. (See {pull}214[#214].)

* Handle NaNs when detrending seasonal components. {ml-pull}408[#408]

* Improve autodetect logic for persistence. {ml-pull}437[#437]

== {es} version 7.0.0-alpha1

== {es} version 6.7.0
Expand Down
6 changes: 6 additions & 0 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! How many records did we handle?
virtual uint64_t numRecordsHandled() const;

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Log a list of the detectors and keys
void description() const;

Expand Down Expand Up @@ -454,6 +457,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! The hierarchical results normalizer.
model::CHierarchicalResultsNormalizer m_Normalizer;

//! Flag indicating whether or not time has been advanced.
bool m_TimeAdvanced{false};

friend class ::CBackgroundPersisterTest;
friend class ::CAnomalyJobTest;
};
Expand Down
3 changes: 3 additions & 0 deletions include/api/CDataProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class API_EXPORT CDataProcessor : private core::CNonCopyable {
//! Access the output handler
virtual COutputHandler& outputHandler() = 0;

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const = 0;

//! Create debug for a record. This is expensive so should NOT be
//! called for every record as a matter of course.
static std::string debugPrintRecord(const TStrStrUMap& dataRowFields);
Expand Down
3 changes: 3 additions & 0 deletions include/api/CFieldDataTyper.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class API_EXPORT CFieldDataTyper : public CDataProcessor {
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
core_t::TTime& completeToTime);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Persist current state
virtual bool persistState(core::CDataAdder& persister);

Expand Down
3 changes: 3 additions & 0 deletions include/api/COutputChainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class API_EXPORT COutputChainer : public COutputHandler {
//! Persist current state due to the periodic persistence being triggered.
virtual bool periodicPersistState(CBackgroundPersister& persister);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! The chainer does consume control messages, because it passes them on
//! to whatever processor it's chained to.
virtual bool consumesControlMessages();
Expand Down
3 changes: 3 additions & 0 deletions include/api/COutputHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class API_EXPORT COutputHandler : private core::CNonCopyable {
//! Persist current state due to the periodic persistence being triggered.
virtual bool periodicPersistState(CBackgroundPersister& persister);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Does this handler deal with control messages?
virtual bool consumesControlMessages();

Expand Down
3 changes: 3 additions & 0 deletions include/config/CAutoconfigurer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class CONFIG_EXPORT CAutoconfigurer : public api::CDataProcessor {
//! Generate the report.
virtual void finalise();

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! No-op.
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
core_t::TTime& completeToTime);
Expand Down
18 changes: 16 additions & 2 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields) {
}

void CAnomalyJob::finalise() {
// Persist final state of normalizer
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
// Persist final state of normalizer iff an input record has been handled or time has been advanced.
if (this->isPersistenceNeeded("quantiles state")) {
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
}

// Prune the models so that the final persisted state is as neat as possible
this->pruneAllModels();
Expand Down Expand Up @@ -396,11 +398,23 @@ void CAnomalyJob::advanceTime(const std::string& time_) {
LOG_TRACE(<< "Received request to advance time to " << time);
}

m_TimeAdvanced = true;

this->outputBucketResultsUntil(time);

this->timeNow(time);
}

bool CAnomalyJob::isPersistenceNeeded(const std::string& description) const {
if ((m_NumRecordsHandled == 0) && (m_TimeAdvanced == false)) {
LOG_DEBUG(<< "Will not attempt to persist " << description
<< ". Zero records were handled and time has not been advanced.");
return false;
}

return true;
}

void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
// If the bucket time has increased, output results for all field names
core_t::TTime bucketLength = m_ModelConfig.bucketLength();
Expand Down
3 changes: 1 addition & 2 deletions lib/api/CCmdSkeleton.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ bool CCmdSkeleton::persistState() {
return true;
}

if (m_Processor.numRecordsHandled() == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist state");
if (m_Processor.isPersistenceNeeded("state") == false) {
return true;
}

Expand Down
14 changes: 14 additions & 0 deletions lib/api/CFieldDataTyper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,20 @@ bool CFieldDataTyper::persistState(core::CDataAdder& persister) {
return this->doPersistState(m_DataTyper->makePersistFunc(), m_ExamplesCollector, persister);
}

bool CFieldDataTyper::isPersistenceNeeded(const std::string& description) const {
// Pass on the request in case we're chained
if (m_OutputHandler.isPersistenceNeeded(description)) {
return true;
}

if (m_NumRecordsHandled == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
<< description << ".");
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a categorizer chained to an anomaly detector this might not be correct - it's possible that time has been advanced on the anomaly detector.

So this method needs a // Pass on the request in case we're chained step like some of the other methods in this class have. If the chained processor's isPersistenceNeeded() returns true then this method should return true, otherwise it should make its own decision using the code that's there now.

}
return true;
}

bool CFieldDataTyper::doPersistState(const CDataTyper::TPersistFunc& dataTyperPersistFunc,
const CCategoryExamplesCollector& examplesCollector,
core::CDataAdder& persister) {
Expand Down
4 changes: 4 additions & 0 deletions lib/api/COutputChainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ bool COutputChainer::periodicPersistState(CBackgroundPersister& persister) {
return m_DataProcessor.periodicPersistState(persister);
}

bool COutputChainer::isPersistenceNeeded(const std::string& description) const {
return m_DataProcessor.isPersistenceNeeded(description);
}

bool COutputChainer::consumesControlMessages() {
return true;
}
Expand Down
5 changes: 5 additions & 0 deletions lib/api/COutputHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ bool COutputHandler::periodicPersistState(CBackgroundPersister& /* persister */)
return true;
}

bool COutputHandler::isPersistenceNeeded(const std::string& /*description*/) const {
// NOOP unless overridden
return false;
}

COutputHandler::CPreComputedHash::CPreComputedHash(size_t hash) : m_Hash(hash) {
}

Expand Down
105 changes: 105 additions & 0 deletions lib/api/unittest/CAnomalyJobTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,109 @@ void CAnomalyJobTest::testSkipTimeControlMessage() {
CPPUNIT_ASSERT_EQUAL(std::size_t(11), countBuckets("bucket", outputStrm.str() + "]"));
}

void CAnomalyJobTest::testIsPersistenceNeeded() {

model::CLimits limits;
api::CFieldConfig fieldConfig;
api::CFieldConfig::TStrVec clauses;
clauses.push_back("count");
fieldConfig.initFromClause(clauses);
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);

{
// check that persistence is not needed if no input records have been handled
// and the time has not been advanced

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

CPPUNIT_ASSERT_EQUAL(false, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that no quantile state was persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(false, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}

core_t::TTime time = 3600;
{
// check that persistence is needed if an input record has been handled

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

api::CAnomalyJob::TStrStrUMap dataRows;

std::ostringstream ss;
ss << time;
dataRows["time"] = ss.str();
CPPUNIT_ASSERT(job.handleRecord(dataRows));

CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that the quantile state has actually been persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}

{
// check that persistence is needed if time has been advanced (via a control message)
// even if no input data has been handled

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

api::CAnomalyJob::TStrStrUMap dataRows;

time = 39600;
dataRows["."] = "t39600";
CPPUNIT_ASSERT(job.handleRecord(dataRows));
CPPUNIT_ASSERT(job.isPersistenceNeeded("test state"));

CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that the quantile state has actually been persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}
}

void CAnomalyJobTest::testModelPlot() {
core_t::TTime bucketSize = 10000;
model::CLimits limits;
Expand Down Expand Up @@ -651,6 +754,8 @@ CppUnit::Test* CAnomalyJobTest::suite() {
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testSkipTimeControlMessage",
&CAnomalyJobTest::testSkipTimeControlMessage));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testIsPersistenceNeeded", &CAnomalyJobTest::testIsPersistenceNeeded));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testModelPlot", &CAnomalyJobTest::testModelPlot));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
Expand Down
1 change: 1 addition & 0 deletions lib/api/unittest/CAnomalyJobTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class CAnomalyJobTest : public CppUnit::TestFixture {
void testOutOfSequence();
void testControlMessages();
void testSkipTimeControlMessage();
void testIsPersistenceNeeded();
void testModelPlot();
void testInterimResultEdgeCases();
void testRestoreFailsWithEmptyStream();
Expand Down
9 changes: 9 additions & 0 deletions lib/api/unittest/CMockDataProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ bool CMockDataProcessor::handleRecord(const TStrStrUMap& dataRowFields) {
void CMockDataProcessor::finalise() {
}

bool CMockDataProcessor::isPersistenceNeeded(const std::string& description) const {
if (m_NumRecordsHandled == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
<< description << ".");
return false;
}
return true;
}

bool CMockDataProcessor::restoreState(ml::core::CDataSearcher& restoreSearcher,
ml::core_t::TTime& completeToTime) {
// Pass on the request in case we're chained
Expand Down
2 changes: 2 additions & 0 deletions lib/api/unittest/CMockDataProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class CMockDataProcessor : public ml::api::CDataProcessor {

virtual void finalise();

virtual bool isPersistenceNeeded(const std::string& description) const;

//! Restore previously saved state
virtual bool restoreState(ml::core::CDataSearcher& restoreSearcher,
ml::core_t::TTime& completeToTime);
Expand Down
4 changes: 4 additions & 0 deletions lib/config/CAutoconfigurer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ void CAutoconfigurer::finalise() {
m_Impl->finalise();
}

bool CAutoconfigurer::isPersistenceNeeded(const std::string& /*description*/) const {
return false;
}

bool CAutoconfigurer::restoreState(core::CDataSearcher& /*restoreSearcher*/,
core_t::TTime& /*completeToTime*/) {
return true;
Expand Down