Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ int main(int argc, char** argv) {
ml::counter_t::E_TSADNumberSamplesOutsideLatencyWindow,
ml::counter_t::E_TSADNumberMemoryLimitModelCreationFailures,
ml::counter_t::E_TSADNumberPrunedItems,
ml::counter_t::E_TSADAssignmentMemoryBasis};
ml::counter_t::E_TSADAssignmentMemoryBasis,
ml::counter_t::E_TSADOutputMemoryAllocatorUsage};

ml::core::CProgramCounters::registerProgramCounterTypes(counters);

Expand Down
6 changes: 6 additions & 0 deletions dev-tools/run_es_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ SELECTED_BRANCH=main

function pickCloneTarget {

if isCloneTargetValid "$GITHUB_PR_OWNER" "$GITHUB_PR_BRANCH" ; then
SELECTED_FORK="$GITHUB_PR_OWNER"
SELECTED_BRANCH="$GITHUB_PR_BRANCH"
return 0
fi

if isCloneTargetValid "$PR_AUTHOR" "$PR_SOURCE_BRANCH" ; then
SELECTED_FORK="$PR_AUTHOR"
SELECTED_BRANCH="$PR_SOURCE_BRANCH"
Expand Down
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
=== Bug Fixes

* Remove ineffective optimizations for duplicate strings. (See {ml-pull}2652[#2652], issue: {ml-issue}2130[#2130].)
* Use custom Boost.JSON resource allocator. (See {ml-pull}2674[#2674].)

== {es} version 8.13.0

Expand Down
3 changes: 3 additions & 0 deletions include/api/CJsonOutputWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ class API_EXPORT CJsonOutputWriter {
//! from the CResourceMonitor via a callback
void reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& modelSizeStats);

//! Return the number of bytes currently used to output JSON documents.
std::size_t getOutputMemoryAllocatorUsage() const;

//! Write categorizer stats
void writeCategorizerStats(const std::string& partitionFieldName,
const std::string& partitionFieldValue,
Expand Down
50 changes: 40 additions & 10 deletions include/core/CBoostJsonPoolAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,44 @@ namespace json = boost::json;

namespace ml {
namespace core {

namespace {

//! \brief Counting pass-through memory resource for boost::json allocations.
//!
//! Forwards every request to the global operator new/delete while keeping a
//! running total of the bytes currently outstanding in an externally owned
//! counter, so callers can report JSON output memory usage.
class custom_resource : public boost::container::pmr::memory_resource {
public:
    //! \param[in,out] allocatedBytes Externally owned running byte count.
    //! Must outlive this resource, which holds a reference to it.
    explicit custom_resource(std::size_t& allocatedBytes)
        : m_AllocatedBytes{allocatedBytes} {}

    //! Return the number of bytes currently allocated through this resource.
    std::size_t allocatedBytes() const { return m_AllocatedBytes; }

private:
    void* do_allocate(std::size_t bytes, std::size_t /*align*/) override {
        // NOTE(review): the requested alignment is ignored, matching the
        // Boost.JSON user-defined resource example; assumes boost::json never
        // requests over-aligned storage — confirm if reused elsewhere.
        m_AllocatedBytes += bytes;
        return ::operator new(bytes);
    }

    void do_deallocate(void* ptr, std::size_t bytes, std::size_t /*align*/) override {
        m_AllocatedBytes -= bytes;
        ::operator delete(ptr);
    }

    bool do_is_equal(memory_resource const& other) const noexcept override {
        // Since the global allocation and de-allocation functions are used,
        // any instance of a custom_resource can deallocate memory allocated
        // by another instance of a custom_resource.
        return dynamic_cast<custom_resource const*>(&other) != nullptr;
    }

private:
    //! Reference to the caller supplied byte counter.
    std::size_t& m_AllocatedBytes;
};
}
//! \brief
//! A boost::json memory allocator using a fixed size buffer
//! A custom boost::json memory allocator
//!
//! DESCRIPTION:\n
//! Encapsulates a boost::json monotonic_resource optimized with a fixed size buffer, see https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html
//! Encapsulates a custom boost::json memory_resource, see https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html
//!
//! IMPLEMENTATION DECISIONS:\n
Use a custom counting memory resource so that output memory usage can be tracked
//!
//! Retain documents created to ensure that the associated memory allocator exists for the documents
//! lifetime
Expand Down Expand Up @@ -57,17 +87,17 @@ class CBoostJsonPoolAllocator {
//! \return reference to the underlying storage pointer
json::storage_ptr& get() { return m_JsonStoragePointer; }

private:
//! Size of the fixed buffer to allocate for parsing JSON
static const size_t FIXED_BUFFER_SIZE = 4096;
std::size_t getAllocatedBytes() const { return m_AllocatedBytes; }

private:
//! fixed size memory buffer used to optimize allocator performance
unsigned char m_FixedBuffer[FIXED_BUFFER_SIZE];

std::size_t m_AllocatedBytes{0};
//! storage pointer to use for allocating boost::json objects
//! We use a custom resource allocator for more predictable
//! and timely allocation/de-allocations, see
//! https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html#json.allocators.storage_ptr.user_defined_resource
//! for more details.
json::storage_ptr m_JsonStoragePointer{
json::make_shared_resource<json::monotonic_resource>(m_FixedBuffer)};
json::make_shared_resource<custom_resource>(m_AllocatedBytes)};

//! Container used to persist boost::json documents
TDocumentPtrVec m_JsonDocumentStore;
Expand Down
9 changes: 9 additions & 0 deletions include/core/CBoostJsonWriterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include <cmath>
#include <memory>
#include <numeric>
#include <regex>
#include <stack>

namespace json = boost::json;
Expand Down Expand Up @@ -84,6 +86,13 @@ class CBoostJsonWriterBase {

void reset(OUTPUT_STREAM& os) { m_Os = &os; }

//! Return the total number of bytes currently allocated by all cached
//! JSON output allocators.
std::size_t getOutputMemoryAllocatorUsage() const {
    // Use a std::size_t initial value: std::accumulate's accumulation type
    // is the type of the initial value, so the previous 0l (long) mixed
    // signed/unsigned arithmetic and could truncate on platforms where
    // long is narrower than std::size_t.
    return std::accumulate(m_AllocatorCache.begin(), m_AllocatorCache.end(),
                           std::size_t{0}, [](std::size_t total, const auto& entry) {
                               return total + entry.second->getAllocatedBytes();
                           });
}

//! Push a named allocator on to the stack
//! Look in the cache for the allocator - creating it if not present
void pushAllocator(const std::string& allocatorName) {
Expand Down
7 changes: 6 additions & 1 deletion include/core/CProgramCounters.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ enum ECounterTypes {
//! Which option is being used to get model memory for node assignment?
E_TSADAssignmentMemoryBasis = 29,

//! The memory currently used by the allocators to output JSON documents, in bytes.
E_TSADOutputMemoryAllocatorUsage = 30,

// Data Frame Outlier Detection

//! The estimated peak memory usage for outlier detection in bytes
Expand Down Expand Up @@ -143,7 +146,7 @@ enum ECounterTypes {
// Add any new values here

//! This MUST be last, increment the value for every new enum added
E_LastEnumCounter = 30
E_LastEnumCounter = 31
};

static constexpr std::size_t NUM_COUNTERS = static_cast<std::size_t>(E_LastEnumCounter);
Expand Down Expand Up @@ -350,6 +353,8 @@ class CORE_EXPORT CProgramCounters {
"The number of old people or attributes pruned from the models"},
{counter_t::E_TSADAssignmentMemoryBasis, "E_TSADAssignmentMemoryBasis",
"Which option is being used to get model memory for node assignment?"},
{counter_t::E_TSADOutputMemoryAllocatorUsage, "E_TSADOutputMemoryAllocatorUsage",
"The amount of memory used to output JSON documents, in bytes."},
{counter_t::E_DFOEstimatedPeakMemoryUsage, "E_DFOEstimatedPeakMemoryUsage",
"The upfront estimate of the peak memory outlier detection would use"},
{counter_t::E_DFOPeakMemoryUsage, "E_DFOPeakMemoryUsage", "The peak memory outlier detection used"},
Expand Down
1 change: 1 addition & 0 deletions include/model/CResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MODEL_EXPORT CResourceMonitor {
core_t::TTime s_BucketStartTime{0};
std::size_t s_BytesExceeded{0};
std::size_t s_BytesMemoryLimit{0};
std::size_t s_OutputMemoryAllocatorUsage{0};
SCategorizerStats s_OverallCategorizerStats;
};

Expand Down
3 changes: 3 additions & 0 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ void CAnomalyJob::writeOutResults(bool interim,
bucketTime, results.root()->s_AnnotatedProbability.s_Probability,
results.root()->s_RawAnomalyScore, results.root()->s_NormalizedAnomalyScore);

core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage) =
m_JsonOutputWriter.getOutputMemoryAllocatorUsage();

if (m_JsonOutputWriter.endOutputBatch(interim, processingTime) == false) {
LOG_ERROR(<< "Problem writing anomaly output");
}
Expand Down
4 changes: 4 additions & 0 deletions lib/api/CJsonOutputWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,10 @@ void CJsonOutputWriter::popAllocator() {
m_Writer.popAllocator();
}

//! Report the number of bytes currently held by the JSON output
//! allocators, as tracked by the underlying writer.
std::size_t CJsonOutputWriter::getOutputMemoryAllocatorUsage() const {
// Delegates to the writer, which sums usage across its allocator cache.
return m_Writer.getOutputMemoryAllocatorUsage();
}

void CJsonOutputWriter::reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& results) {
m_Writer.onObjectBegin();
CModelSizeStatsJsonWriter::write(m_JobId, results, m_Writer);
Expand Down
4 changes: 4 additions & 0 deletions lib/api/CModelSizeStatsJsonWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const std::string LOG_TIME{"log_time"};
const std::string CATEGORIZER_STATS{"categorizer_stats"};
const std::string PARTITION_FIELD_NAME{"partition_field_name"};
const std::string PARTITION_FIELD_VALUE{"partition_field_value"};
const std::string OUTPUT_MEMORY_ALLOCATOR_BYTES("output_memory_allocator_bytes");
}

void CModelSizeStatsJsonWriter::write(const std::string& jobId,
Expand Down Expand Up @@ -85,6 +86,9 @@ void CModelSizeStatsJsonWriter::write(const std::string& jobId,
writer.onString(model_t::print(results.s_AssignmentMemoryBasis));
}

writer.onKey(OUTPUT_MEMORY_ALLOCATOR_BYTES);
writer.onUint64(results.s_OutputMemoryAllocatorUsage);

CModelSizeStatsJsonWriter::writeCommonFields(
jobId, results.s_OverallCategorizerStats, results.s_BucketStartTime, writer);

Expand Down
10 changes: 5 additions & 5 deletions lib/api/unittest/CAnomalyJobLimitTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) {
std::size_t s_ExpectedByMemoryUsageRelativeErrorDivisor;
std::size_t s_ExpectedPartitionUsageRelativeErrorDivisor;
std::size_t s_ExpectedOverUsageRelativeErrorDivisor;
} testParams[]{{600, 500, 6000, 290, 33, 25, 30},
{3600, 500, 5500, 280, 27, 25, 20},
{172800, 150, 850, 110, 6, 5, 3}};
} testParams[]{{600, 500, 5200, 290, 27, 25, 2},
{3600, 500, 5500, 280, 27, 25, 2},
{172800, 65, 850, 50, 6, 5, 2}};

for (const auto& testParam : testParams) {
TGeneratorVec generators{periodic, tradingDays, level, ramp, sparse};
Expand Down Expand Up @@ -442,7 +442,7 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) {
LOG_DEBUG(<< "**** Test over with bucketLength = " << testParam.s_BucketLength
<< " ****");
{
std::size_t memoryLimit{5 /*MB*/};
std::size_t memoryLimit{20 /*MB*/};
model::CLimits limits;
limits.resourceMonitor().memoryLimit(memoryLimit);
ml::api::CAnomalyJobConfig jobConfig =
Expand Down Expand Up @@ -481,7 +481,7 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) {
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
LOG_DEBUG(<< "Memory limit bytes = " << memoryLimit * 1024 * 1024);
BOOST_TEST_REQUIRE(used.s_OverFields > testParam.s_ExpectedOverFields);
BOOST_TEST_REQUIRE(used.s_OverFields < 7100);
BOOST_TEST_REQUIRE(used.s_OverFields <= 9000);
BOOST_REQUIRE_CLOSE_ABSOLUTE(
memoryLimit * core::constants::BYTES_IN_MEGABYTES / 2, used.s_Usage,
memoryLimit * core::constants::BYTES_IN_MEGABYTES /
Expand Down
3 changes: 3 additions & 0 deletions lib/api/unittest/CModelSnapshotJsonWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using namespace api;
BOOST_AUTO_TEST_CASE(testWrite) {
std::ostringstream sstream;

// clang-format off
// The output writer won't close the JSON structures until it is destroyed
{
model::CResourceMonitor::SModelSizeStats modelSizeStats{
Expand All @@ -44,13 +45,15 @@ BOOST_AUTO_TEST_CASE(testWrite) {
core_t::TTime(1521046309), // bucket start time
0, // model bytes exceeded
50000, // model bytes memory limit
60000, // JSON memory allocator bytes used
{1000, // categorized messages
100, // total categories
7, // frequent categories
13, // rare categories
2, // dead categories
8, // failed categories
model_t::E_CategorizationStatusWarn}};
// clang-format on

CModelSnapshotJsonWriter::SModelSnapshotReport report{
"6.3.0",
Expand Down
1 change: 1 addition & 0 deletions lib/model/CForecastDataSink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ CForecastDataSink::CForecastDataSink(const std::string& jobId,
m_Writer(outStream), m_NumRecordsWritten(0), m_CreateTime(createTime),
m_StartTime(startTime), m_EndTime(endTime), m_ExpiryTime(expiryTime),
m_MemoryUsage(memoryUsage) {
m_MemoryUsage += m_Writer.getOutputMemoryAllocatorUsage();
}

void CForecastDataSink::writeStats(const double progress,
Expand Down
6 changes: 5 additions & 1 deletion lib/model/CResourceMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ CResourceMonitor::createMemoryUsageReport(core_t::TTime bucketStartTime) {
resource.first->updateModelSizeStats(res);
}
res.s_AllocationFailures += m_AllocationFailuresCount;
res.s_OutputMemoryAllocatorUsage = static_cast<std::size_t>(
core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage));
res.s_OverallCategorizerStats.s_MemoryCategorizationFailures += m_CategorizerAllocationFailures;
return res;
}
Expand Down Expand Up @@ -487,7 +489,9 @@ std::size_t CResourceMonitor::lowLimit() const {
//! Total memory in use: the monitored models' current usage, any extra
//! reserved memory, the shared string stores, and the bytes currently held
//! by the JSON output allocators (read from the program counter updated by
//! the output writer).
std::size_t CResourceMonitor::totalMemory() const {
    // Use std::size_t consistently with the return type (was a bare size_t).
    return m_MonitoredResourceCurrentMemory + m_ExtraMemory +
           CStringStore::names().memoryUsage() +
           CStringStore::influencers().memoryUsage() +
           static_cast<std::size_t>(core::CProgramCounters::counter(
               counter_t::E_TSADOutputMemoryAllocatorUsage));
}

} // model
Expand Down