From 56885d3a5d63a5f0187f0f2be64ab45d2a591b6b Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 18 Jun 2024 17:01:32 +1200 Subject: [PATCH] [ML] Use custom Boost::JSON allocator (#2674) The current code uses the monotonic resource allocator, for allocating memory to boost::json objects, which allocates memory in ever increasing chunks, which can lead to over allocation. The other disadvantage of the monotonic resource allocator is that no deallocations are performed until the resource allocator is destroyed - hence the name monotonic as resource allocations can only increase during its lifetime. These factors make the choice of the monotonic resource allocator unsuitable for its current use. This PR introduces a very simplistic custom allocator that allocates and deallocates individual objects upon request using standard operator ::new and ::delete. This gives a much better experience as only as much memory is allocated at any point in time as absolutely needs to be, and gives a much more predictable memory profile --- bin/autodetect/Main.cc | 3 +- dev-tools/run_es_tests.sh | 6 +++ docs/CHANGELOG.asciidoc | 1 + include/api/CJsonOutputWriter.h | 3 ++ include/core/CBoostJsonPoolAllocator.h | 50 +++++++++++++++---- include/core/CBoostJsonWriterBase.h | 9 ++++ include/core/CProgramCounters.h | 7 ++- include/model/CResourceMonitor.h | 1 + lib/api/CAnomalyJob.cc | 3 ++ lib/api/CJsonOutputWriter.cc | 4 ++ lib/api/CModelSizeStatsJsonWriter.cc | 4 ++ lib/api/unittest/CAnomalyJobLimitTest.cc | 10 ++-- .../unittest/CModelSnapshotJsonWriterTest.cc | 3 ++ lib/model/CForecastDataSink.cc | 1 + lib/model/CResourceMonitor.cc | 6 ++- 15 files changed, 93 insertions(+), 18 deletions(-) diff --git a/bin/autodetect/Main.cc b/bin/autodetect/Main.cc index a39e25f600..904920e3db 100644 --- a/bin/autodetect/Main.cc +++ b/bin/autodetect/Main.cc @@ -82,7 +82,8 @@ int main(int argc, char** argv) { ml::counter_t::E_TSADNumberSamplesOutsideLatencyWindow, ml::counter_t::E_TSADNumberMemoryLimitModelCreationFailures, ml::counter_t::E_TSADNumberPrunedItems, - ml::counter_t::E_TSADAssignmentMemoryBasis}; + ml::counter_t::E_TSADAssignmentMemoryBasis, + ml::counter_t::E_TSADOutputMemoryAllocatorUsage}; ml::core::CProgramCounters::registerProgramCounterTypes(counters); diff --git a/dev-tools/run_es_tests.sh b/dev-tools/run_es_tests.sh index b2d92ad174..9534083579 100755 --- a/dev-tools/run_es_tests.sh +++ b/dev-tools/run_es_tests.sh @@ -40,6 +40,12 @@ SELECTED_BRANCH=main function pickCloneTarget { + if isCloneTargetValid "$GITHUB_PR_OWNER" "$GITHUB_PR_BRANCH" ; then + SELECTED_FORK="$GITHUB_PR_OWNER" + SELECTED_BRANCH="$GITHUB_PR_BRANCH" + return 0 + fi + if isCloneTargetValid "$PR_AUTHOR" "$PR_SOURCE_BRANCH" ; then SELECTED_FORK="$PR_AUTHOR" SELECTED_BRANCH="$PR_SOURCE_BRANCH" diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index d42e6a543e..98e24cb3b3 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -51,6 +51,7 @@ === Bug Fixes * Remove ineffective optimizations for duplicate strings. (See {ml-pull}2652[#2652], issue: {ml-issue}2130[#2130].) +* Use custom Boost.JSON resource allocator. (See {ml-pull}2674[#2674].) == {es} version 8.13.0 diff --git a/include/api/CJsonOutputWriter.h b/include/api/CJsonOutputWriter.h index 3df15f1fd8..d17469ac7c 100644 --- a/include/api/CJsonOutputWriter.h +++ b/include/api/CJsonOutputWriter.h @@ -238,6 +238,9 @@ class API_EXPORT CJsonOutputWriter { //! from the CResourceMonitor via a callback void reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& modelSizeStats); + //! Return the number of bytes currently used to output JSON documents. . + std::size_t getOutputMemoryAllocatorUsage() const; + //! Write categorizer stats void writeCategorizerStats(const std::string& partitionFieldName, const std::string& partitionFieldValue, diff --git a/include/core/CBoostJsonPoolAllocator.h b/include/core/CBoostJsonPoolAllocator.h index 53788e65e3..69ec04e736 100644 --- a/include/core/CBoostJsonPoolAllocator.h +++ b/include/core/CBoostJsonPoolAllocator.h @@ -20,14 +20,44 @@ namespace json = boost::json; namespace ml { namespace core { + +namespace { + +class custom_resource : public boost::container::pmr::memory_resource { +public: + custom_resource(std::size_t& allocatedBytes) + : m_AllocatedBytes{allocatedBytes} {} + std::size_t allocatedBytes() const { return m_AllocatedBytes; } + +private: + void* do_allocate(std::size_t bytes, std::size_t /*align*/) override { + m_AllocatedBytes += bytes; + return ::operator new(bytes); + } + + void do_deallocate(void* ptr, std::size_t bytes, std::size_t /*align*/) override { + m_AllocatedBytes -= bytes; + return ::operator delete(ptr); + } + + bool do_is_equal(memory_resource const& other) const noexcept override { + // since the global allocation and de-allocation functions are used, + // any instance of a custom_resource can deallocate memory allocated + // by another instance of a logging_resource + return dynamic_cast(&other) != nullptr; + } + +private: + std::size_t& m_AllocatedBytes; +}; +} //! \brief -//! A boost::json memory allocator using a fixed size buffer +//! A custom boost::json memory allocator //! //! DESCRIPTION:\n -//! Encapsulates a boost::json monotonic_resource optimized with a fixed size buffer, see https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html +//! Encapsulates a custom boost::json memory_resource, see https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html //! //! IMPLEMENTATION DECISIONS:\n -//! Use a fixed size buffer for the allocator for performance reasons //! //! Retain documents created to ensure that the associated memory allocator exists for the documents //! lifetime @@ -57,17 +87,17 @@ class CBoostJsonPoolAllocator { //! \return reference to the underlying storage pointer json::storage_ptr& get() { return m_JsonStoragePointer; } -private: - //! Size of the fixed buffer to allocate for parsing JSON - static const size_t FIXED_BUFFER_SIZE = 4096; + std::size_t getAllocatedBytes() const { return m_AllocatedBytes; } private: - //! fixed size memory buffer used to optimize allocator performance - unsigned char m_FixedBuffer[FIXED_BUFFER_SIZE]; - + std::size_t m_AllocatedBytes{0}; //! storage pointer to use for allocating boost::json objects + //! We use a custom resource allocator for more predictable + //! and timely allocation/de-allocations, see + //! https://www.boost.org/doc/libs/1_83_0/libs/json/doc/html/json/allocators/storage_ptr.html#json.allocators.storage_ptr.user_defined_resource + //! for more details. json::storage_ptr m_JsonStoragePointer{ - json::make_shared_resource(m_FixedBuffer)}; + json::make_shared_resource(m_AllocatedBytes)}; //! Container used to persist boost::json documents TDocumentPtrVec m_JsonDocumentStore; diff --git a/include/core/CBoostJsonWriterBase.h b/include/core/CBoostJsonWriterBase.h index e0bee383c6..1ce185f478 100644 --- a/include/core/CBoostJsonWriterBase.h +++ b/include/core/CBoostJsonWriterBase.h @@ -25,6 +25,8 @@ #include #include +#include +#include #include namespace json = boost::json; @@ -84,6 +86,13 @@ class CBoostJsonWriterBase { void reset(OUTPUT_STREAM& os) { m_Os = &os; } + std::size_t getOutputMemoryAllocatorUsage() const { + return std::accumulate(m_AllocatorCache.begin(), m_AllocatorCache.end(), + 0l, [](std::size_t a, auto& b) { + return a + b.second->getAllocatedBytes(); + }); + } + //! Push a named allocator on to the stack //! Look in the cache for the allocator - creating it if not present void pushAllocator(const std::string& allocatorName) { diff --git a/include/core/CProgramCounters.h b/include/core/CProgramCounters.h index c934ed7901..3c4d10269f 100644 --- a/include/core/CProgramCounters.h +++ b/include/core/CProgramCounters.h @@ -109,6 +109,9 @@ enum ECounterTypes { //! Which option is being used to get model memory for node assignment? E_TSADAssignmentMemoryBasis = 29, + //! The memory currently used by the allocators to output JSON documents, in bytes. + E_TSADOutputMemoryAllocatorUsage = 30, + // Data Frame Outlier Detection //! The estimated peak memory usage for outlier detection in bytes @@ -143,7 +146,7 @@ enum ECounterTypes { // Add any new values here //! This MUST be last, increment the value for every new enum added - E_LastEnumCounter = 30 + E_LastEnumCounter = 31 }; static constexpr std::size_t NUM_COUNTERS = static_cast(E_LastEnumCounter); @@ -350,6 +353,8 @@ class CORE_EXPORT CProgramCounters { "The number of old people or attributes pruned from the models"}, {counter_t::E_TSADAssignmentMemoryBasis, "E_TSADAssignmentMemoryBasis", "Which option is being used to get model memory for node assignment?"}, + {counter_t::E_TSADOutputMemoryAllocatorUsage, "E_TSADOutputMemoryAllocatorUsage", + "The amount of memory used to output JSON documents, in bytes."}, {counter_t::E_DFOEstimatedPeakMemoryUsage, "E_DFOEstimatedPeakMemoryUsage", "The upfront estimate of the peak memory outlier detection would use"}, {counter_t::E_DFOPeakMemoryUsage, "E_DFOPeakMemoryUsage", "The peak memory outlier detection used"}, diff --git a/include/model/CResourceMonitor.h b/include/model/CResourceMonitor.h index 2cf8baeded..1c6375691d 100644 --- a/include/model/CResourceMonitor.h +++ b/include/model/CResourceMonitor.h @@ -63,6 +63,7 @@ class MODEL_EXPORT CResourceMonitor { core_t::TTime s_BucketStartTime{0}; std::size_t s_BytesExceeded{0}; std::size_t s_BytesMemoryLimit{0}; + std::size_t s_OutputMemoryAllocatorUsage{0}; SCategorizerStats s_OverallCategorizerStats; }; diff --git a/lib/api/CAnomalyJob.cc b/lib/api/CAnomalyJob.cc index 313abe0cec..0048d918c9 100644 --- a/lib/api/CAnomalyJob.cc +++ b/lib/api/CAnomalyJob.cc @@ -799,6 +799,9 @@ void CAnomalyJob::writeOutResults(bool interim, bucketTime, results.root()->s_AnnotatedProbability.s_Probability, results.root()->s_RawAnomalyScore, results.root()->s_NormalizedAnomalyScore); + core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage) = + m_JsonOutputWriter.getOutputMemoryAllocatorUsage(); + if (m_JsonOutputWriter.endOutputBatch(interim, processingTime) == false) { LOG_ERROR(<< "Problem writing anomaly output"); } diff --git a/lib/api/CJsonOutputWriter.cc b/lib/api/CJsonOutputWriter.cc index f9618d7992..3dff59637b 100644 --- a/lib/api/CJsonOutputWriter.cc +++ b/lib/api/CJsonOutputWriter.cc @@ -858,6 +858,10 @@ void CJsonOutputWriter::popAllocator() { m_Writer.popAllocator(); } +std::size_t CJsonOutputWriter::getOutputMemoryAllocatorUsage() const { + return m_Writer.getOutputMemoryAllocatorUsage(); +} + void CJsonOutputWriter::reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& results) { m_Writer.onObjectBegin(); CModelSizeStatsJsonWriter::write(m_JobId, results, m_Writer); diff --git a/lib/api/CModelSizeStatsJsonWriter.cc b/lib/api/CModelSizeStatsJsonWriter.cc index ee4cfebe29..43fef49602 100644 --- a/lib/api/CModelSizeStatsJsonWriter.cc +++ b/lib/api/CModelSizeStatsJsonWriter.cc @@ -45,6 +45,7 @@ const std::string LOG_TIME{"log_time"}; const std::string CATEGORIZER_STATS{"categorizer_stats"}; const std::string PARTITION_FIELD_NAME{"partition_field_name"}; const std::string PARTITION_FIELD_VALUE{"partition_field_value"}; +const std::string OUTPUT_MEMORY_ALLOCATOR_BYTES("output_memory_allocator_bytes"); } void CModelSizeStatsJsonWriter::write(const std::string& jobId, @@ -85,6 +86,9 @@ void CModelSizeStatsJsonWriter::write(const std::string& jobId, writer.onString(model_t::print(results.s_AssignmentMemoryBasis)); } + writer.onKey(OUTPUT_MEMORY_ALLOCATOR_BYTES); + writer.onUint64(results.s_OutputMemoryAllocatorUsage); + CModelSizeStatsJsonWriter::writeCommonFields( jobId, results.s_OverallCategorizerStats, results.s_BucketStartTime, writer); diff --git a/lib/api/unittest/CAnomalyJobLimitTest.cc b/lib/api/unittest/CAnomalyJobLimitTest.cc index 4082e7e0fb..b003e90a53 100644 --- a/lib/api/unittest/CAnomalyJobLimitTest.cc +++ b/lib/api/unittest/CAnomalyJobLimitTest.cc @@ -323,9 +323,9 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) { std::size_t s_ExpectedByMemoryUsageRelativeErrorDivisor; std::size_t s_ExpectedPartitionUsageRelativeErrorDivisor; std::size_t s_ExpectedOverUsageRelativeErrorDivisor; - } testParams[]{{600, 500, 6000, 290, 33, 25, 30}, - {3600, 500, 5500, 280, 27, 25, 20}, - {172800, 150, 850, 110, 6, 5, 3}}; + } testParams[]{{600, 500, 5200, 290, 27, 25, 2}, + {3600, 500, 5500, 280, 27, 25, 2}, + {172800, 65, 850, 50, 6, 5, 2}}; for (const auto& testParam : testParams) { TGeneratorVec generators{periodic, tradingDays, level, ramp, sparse}; @@ -442,7 +442,7 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) { LOG_DEBUG(<< "**** Test over with bucketLength = " << testParam.s_BucketLength << " ****"); { - std::size_t memoryLimit{5 /*MB*/}; + std::size_t memoryLimit{20 /*MB*/}; model::CLimits limits; limits.resourceMonitor().memoryLimit(memoryLimit); ml::api::CAnomalyJobConfig jobConfig = @@ -481,7 +481,7 @@ BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) { LOG_DEBUG(<< "Memory usage = " << used.s_Usage); LOG_DEBUG(<< "Memory limit bytes = " << memoryLimit * 1024 * 1024); BOOST_TEST_REQUIRE(used.s_OverFields > testParam.s_ExpectedOverFields); - BOOST_TEST_REQUIRE(used.s_OverFields < 7100); + BOOST_TEST_REQUIRE(used.s_OverFields <= 9000); BOOST_REQUIRE_CLOSE_ABSOLUTE( memoryLimit * core::constants::BYTES_IN_MEGABYTES / 2, used.s_Usage, memoryLimit * core::constants::BYTES_IN_MEGABYTES / diff --git a/lib/api/unittest/CModelSnapshotJsonWriterTest.cc b/lib/api/unittest/CModelSnapshotJsonWriterTest.cc index c200f8fd0a..ff404b7f91 100644 --- a/lib/api/unittest/CModelSnapshotJsonWriterTest.cc +++ b/lib/api/unittest/CModelSnapshotJsonWriterTest.cc @@ -28,6 +28,7 @@ using namespace api; BOOST_AUTO_TEST_CASE(testWrite) { std::ostringstream sstream; + // clang-format off // The output writer won't close the JSON structures until is is destroyed { model::CResourceMonitor::SModelSizeStats modelSizeStats{ @@ -44,6 +45,7 @@ BOOST_AUTO_TEST_CASE(testWrite) { core_t::TTime(1521046309), // bucket start time 0, // model bytes exceeded 50000, // model bytes memory limit + 60000, // JSON memory allocator bytes used {1000, // categorized messages 100, // total categories 7, // frequent categories @@ -51,6 +53,7 @@ BOOST_AUTO_TEST_CASE(testWrite) { 2, // dead categories 8, // failed categories model_t::E_CategorizationStatusWarn}}; + // clang-format on CModelSnapshotJsonWriter::SModelSnapshotReport report{ "6.3.0", diff --git a/lib/model/CForecastDataSink.cc b/lib/model/CForecastDataSink.cc index 08f62e763c..0f93e51173 100644 --- a/lib/model/CForecastDataSink.cc +++ b/lib/model/CForecastDataSink.cc @@ -113,6 +113,7 @@ CForecastDataSink::CForecastDataSink(const std::string& jobId, m_Writer(outStream), m_NumRecordsWritten(0), m_CreateTime(createTime), m_StartTime(startTime), m_EndTime(endTime), m_ExpiryTime(expiryTime), m_MemoryUsage(memoryUsage) { + m_MemoryUsage += m_Writer.getOutputMemoryAllocatorUsage(); } void CForecastDataSink::writeStats(const double progress, diff --git a/lib/model/CResourceMonitor.cc b/lib/model/CResourceMonitor.cc index 7e86dfc433..116dc75b8b 100644 --- a/lib/model/CResourceMonitor.cc +++ b/lib/model/CResourceMonitor.cc @@ -395,6 +395,8 @@ CResourceMonitor::createMemoryUsageReport(core_t::TTime bucketStartTime) { resource.first->updateModelSizeStats(res); } res.s_AllocationFailures += m_AllocationFailuresCount; + res.s_OutputMemoryAllocatorUsage = static_cast( + core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage)); res.s_OverallCategorizerStats.s_MemoryCategorizationFailures += m_CategorizerAllocationFailures; return res; } @@ -487,7 +489,9 @@ std::size_t CResourceMonitor::lowLimit() const { std::size_t CResourceMonitor::totalMemory() const { return m_MonitoredResourceCurrentMemory + m_ExtraMemory + CStringStore::names().memoryUsage() + - CStringStore::influencers().memoryUsage(); + CStringStore::influencers().memoryUsage() + + static_cast(core::CProgramCounters::counter( + counter_t::E_TSADOutputMemoryAllocatorUsage)); } } // model