diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7b6db1636234aa..c933511abb2b49 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1762,6 +1762,33 @@ DEFINE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec, "3600"); DEFINE_mInt64(ann_index_build_chunk_size, "1000000"); DEFINE_Validator(ann_index_build_chunk_size, [](const int64_t config) -> bool { return config > 0; }); +// Chunk byte budget for ANN/vector index building. The effective per-batch row +// count is min(ann_index_build_chunk_size, ann_index_build_chunk_bytes / row_bytes). +DEFINE_mInt64(ann_index_build_chunk_bytes, "134217728"); +DEFINE_Validator(ann_index_build_chunk_bytes, + [](const int64_t config) -> bool { return config > 0; }); + +// Global byte budget shared by all concurrent ANN/vector index builds on this BE. +// 0 disables admission control (legacy behavior; OOM exposure unchanged). +// Any positive value enables AnnBuildMemoryBudget: writers reserve their +// estimated peak before train/add and release on finish. +DEFINE_mInt64(ann_index_build_memory_budget_bytes, "0"); +DEFINE_Validator(ann_index_build_memory_budget_bytes, + [](const int64_t config) -> bool { return config >= 0; }); +// Max time (ms) a writer waits when the budget is exhausted before applying +// ann_index_build_on_oom_action. Only meaningful when the budget is enabled +// and the action is "wait" or "skip" ("fail" never waits). +DEFINE_mInt64(ann_index_build_memory_wait_timeout_ms, "30000"); +DEFINE_Validator(ann_index_build_memory_wait_timeout_ms, + [](const int64_t config) -> bool { return config >= 0; }); +// Behavior when the budget cannot be satisfied within the wait timeout. +// Accepted values: "wait" (default; treat timeout as failure), "skip" (delete +// the index entry and let the segment write succeed without ANN), "fail" +// (return RuntimeError immediately without waiting). 
+DEFINE_mString(ann_index_build_on_oom_action, "wait"); +DEFINE_Validator(ann_index_build_on_oom_action, [](const std::string& v) -> bool { + return v == "wait" || v == "skip" || v == "fail"; +}); DEFINE_mBool(enable_wal_tde, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 427282a4452bc4..b04076aea3e718 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1811,6 +1811,17 @@ DECLARE_String(ann_index_ivf_list_cache_limit); DECLARE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec); // Chunk size for ANN/vector index building per training/adding batch DECLARE_mInt64(ann_index_build_chunk_size); +// Soft byte budget for each ANN/vector index build chunk. Used together with +// ann_index_build_chunk_size to derive the effective per-batch row count. +DECLARE_mInt64(ann_index_build_chunk_bytes); +// Global byte budget shared by all concurrent ANN/vector index builds on this BE. +// 0 disables admission control. +DECLARE_mInt64(ann_index_build_memory_budget_bytes); +// Max wait time (ms) before admission control falls back to on_oom_action. +DECLARE_mInt64(ann_index_build_memory_wait_timeout_ms); +// One of: "wait" | "skip" | "fail" — what to do when the budget is exhausted +// past the wait timeout. +DECLARE_mString(ann_index_build_on_oom_action); DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction); DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction); diff --git a/be/src/storage/index/ann/ann_build_memory_budget.cpp b/be/src/storage/index/ann/ann_build_memory_budget.cpp new file mode 100644 index 00000000000000..4d82e3262e2b3b --- /dev/null +++ b/be/src/storage/index/ann/ann_build_memory_budget.cpp @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/index/ann/ann_build_memory_budget.h" + +#include +#include + +#include "common/config.h" +#include "util/cpu_info.h" + +namespace doris::segment_v2 { + +namespace { + +constexpr int64_t kPerThreadWorkspaceBytes = 4LL << 20; // 4 MiB +constexpr double kHnswGraphFactor = 2.0; +constexpr int kHnswEdgeBytes = 8; + +int omp_threads_cap() { + if (config::omp_threads_limit > 0) { + return config::omp_threads_limit; + } + return std::max(1, CpuInfo::num_cores()); +} + +int64_t per_row_store_bytes(const AnnBuildMemoryParams& params) { + const int64_t dim = std::max(0, params.dim); + switch (params.quantizer) { + case AnnBuildMemoryParams::Quantizer::FLAT: + return dim * static_cast(sizeof(float)); + case AnnBuildMemoryParams::Quantizer::SQ8: + return dim; + case AnnBuildMemoryParams::Quantizer::SQ4: + return (dim + 1) / 2; + case AnnBuildMemoryParams::Quantizer::PQ: + return std::max(0, params.pq_m); + } + return dim * static_cast(sizeof(float)); +} + +} // namespace + +AnnBuildMemoryBudget& AnnBuildMemoryBudget::instance() { + static AnnBuildMemoryBudget s_instance; + return s_instance; +} + +bool AnnBuildMemoryBudget::try_reserve(int64_t bytes, int64_t timeout_ms, int64_t caller_held) { + if (bytes <= 0) { + return true; + } + const int64_t budget = config::ann_index_build_memory_budget_bytes; + if (budget <= 0) { + // Admission control disabled. 
+ return true; + } + if (caller_held < 0) { + caller_held = 0; + } + + std::unique_lock lock(_mu); + auto can_fit = [&]() { + // Re-read budget on each wake: the config is mutable and an operator + // may raise it while we wait. + const int64_t current_budget = config::ann_index_build_memory_budget_bytes; + if (current_budget <= 0) { + return true; + } + // Allow a single oversized build to proceed if it is the only one in + // flight; otherwise it would deadlock forever against itself. `caller_held` + // is what this same build already reserved, so a grow only blocks on + // *other* builds (when _reserved == caller_held nobody else is in flight). + if (_reserved <= caller_held) { + return true; + } + return _reserved + bytes <= current_budget; + }; + + if (can_fit()) { + _reserved += bytes; + return true; + } + if (timeout_ms <= 0) { + return false; + } + const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + if (_cv.wait_until(lock, deadline, can_fit)) { + _reserved += bytes; + return true; + } + return false; +} + +void AnnBuildMemoryBudget::release(int64_t bytes) { + if (bytes <= 0) { + return; + } + { + std::lock_guard lock(_mu); + _reserved -= bytes; + if (_reserved < 0) { + _reserved = 0; + } + } + _cv.notify_all(); +} + +int64_t AnnBuildMemoryBudget::reserved_bytes() const { + std::lock_guard lock(_mu); + return _reserved; +} + +void AnnBuildMemoryBudget::reset_for_test() { + { + std::lock_guard lock(_mu); + _reserved = 0; + } + _cv.notify_all(); +} + +void AnnBuildMemoryReservation::release() noexcept { + if (_bytes > 0) { + AnnBuildMemoryBudget::instance().release(_bytes); + _bytes = 0; + } +} + +bool AnnBuildMemoryReservation::grow(int64_t additional_bytes, int64_t timeout_ms) { + if (additional_bytes <= 0) { + return true; + } + if (!AnnBuildMemoryBudget::instance().try_reserve(additional_bytes, timeout_ms, + /*caller_held=*/_bytes)) { + return false; + } + _bytes += additional_bytes; + return true; +} + 
+AnnBuildMemoryReservation AnnBuildMemoryReservation::try_acquire(int64_t bytes, + int64_t timeout_ms) { + if (bytes <= 0) { + return AnnBuildMemoryReservation(); + } + if (!AnnBuildMemoryBudget::instance().try_reserve(bytes, timeout_ms)) { + return AnnBuildMemoryReservation(); + } + return AnnBuildMemoryReservation(bytes); +} + +int64_t estimate_ann_build_memory(const AnnBuildMemoryParams& params, int64_t expected_rows, + int64_t chunk_rows) { + const int64_t dim = std::max(0, params.dim); + if (dim == 0) { + return 0; + } + if (chunk_rows < 1) { + chunk_rows = 1; + } + if (expected_rows <= 0) { + // Unknown segment size: assume at least one chunk worth so admission + // covers the input buffer plus a chunk's worth of graph/store. + expected_rows = chunk_rows; + } + + const int64_t buffer_bytes = chunk_rows * dim * static_cast(sizeof(float)); + const int64_t store_bytes = expected_rows * per_row_store_bytes(params); + + int64_t structure_bytes = 0; + switch (params.index_type) { + case AnnBuildMemoryParams::IndexKind::HNSW: { + const int64_t degree = std::max(1, params.max_degree); + structure_bytes = + static_cast(expected_rows * degree * kHnswEdgeBytes * kHnswGraphFactor); + break; + } + case AnnBuildMemoryParams::IndexKind::IVF: + case AnnBuildMemoryParams::IndexKind::IVF_ON_DISK: { + // Coarse quantizer centroids (flat) + small per-list overhead. 
+ const int64_t nlist = std::max(1, params.ivf_nlist); + const int64_t centroid_bytes = nlist * dim * static_cast(sizeof(float)); + const int64_t list_overhead = nlist * 64; // small per-list bookkeeping + structure_bytes = centroid_bytes + list_overhead; + break; + } + } + + const int64_t temp_bytes = + std::max(buffer_bytes, omp_threads_cap() * kPerThreadWorkspaceBytes); + + return buffer_bytes + store_bytes + structure_bytes + temp_bytes; +} + +} // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/ann_build_memory_budget.h b/be/src/storage/index/ann/ann_build_memory_budget.h new file mode 100644 index 00000000000000..0f3975abe75cb1 --- /dev/null +++ b/be/src/storage/index/ann/ann_build_memory_budget.h @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +namespace doris::segment_v2 { + +// FAISS-free mirror of the few FaissBuildParameter fields the memory estimator +// needs. Kept independent of the FAISS headers on purpose: this header is pulled in +// (via ann_index_writer.h -> column_writer.h) by many translation units that do +// NOT have the FAISS include path, so it must not leak FAISS headers. 
The writer +// fills this in init() from its FaissBuildParameter. +struct AnnBuildMemoryParams { + enum class IndexKind { HNSW, IVF, IVF_ON_DISK }; + enum class Quantizer { FLAT, SQ4, SQ8, PQ }; + IndexKind index_type = IndexKind::HNSW; + Quantizer quantizer = Quantizer::FLAT; + int dim = 0; + int max_degree = 0; + int ivf_nlist = 0; + int pq_m = 0; +}; + +// Global byte budget shared by every concurrent ANN/vector index build on this +// BE. Acts as admission control: writers reserve their estimated peak before +// train/add and release on finish. A budget of 0 (config default) disables +// admission control and all reservations succeed immediately. +// +// Pairs with ScopedOmpThreadBudget (which caps CPU) to bound HNSW/IVF build +// memory across concurrent loads, compactions, and schema-change builds. +class AnnBuildMemoryBudget { +public: + static AnnBuildMemoryBudget& instance(); + + // Try to reserve `bytes` from the global budget. Blocks up to `timeout_ms` + // for other builds to release memory. Returns true on success or when the + // budget is disabled (`ann_index_build_memory_budget_bytes` <= 0). Returns + // false on timeout. `bytes <= 0` always succeeds without altering counters. + // + // `caller_held` is what this same build already holds; it is excluded from + // the "single build in flight" exemption so a build can keep growing its own + // reservation without deadlocking against itself (a grow is always allowed + // when the caller is the only build in flight, i.e. `_reserved == caller_held`). + bool try_reserve(int64_t bytes, int64_t timeout_ms, int64_t caller_held = 0); + + // Releases previously reserved bytes and notifies any waiters. + void release(int64_t bytes); + + int64_t reserved_bytes() const; + + // Test hook: reset internal counter. Not for production use. 
+ void reset_for_test(); + +private: + AnnBuildMemoryBudget() = default; + + mutable std::mutex _mu; + std::condition_variable _cv; + int64_t _reserved = 0; +}; + +// RAII handle for a successful AnnBuildMemoryBudget reservation. Releases the +// bytes on destruction or explicit release(). Move-only. +class AnnBuildMemoryReservation { +public: + AnnBuildMemoryReservation() = default; + AnnBuildMemoryReservation(const AnnBuildMemoryReservation&) = delete; + AnnBuildMemoryReservation& operator=(const AnnBuildMemoryReservation&) = delete; + AnnBuildMemoryReservation(AnnBuildMemoryReservation&& other) noexcept : _bytes(other._bytes) { + other._bytes = 0; + } + AnnBuildMemoryReservation& operator=(AnnBuildMemoryReservation&& other) noexcept { + if (this != &other) { + release(); + _bytes = other._bytes; + other._bytes = 0; + } + return *this; + } + ~AnnBuildMemoryReservation() { release(); } + + void release() noexcept; + int64_t bytes() const { return _bytes; } + bool active() const { return _bytes > 0; } + + // Grow this reservation by `additional_bytes` against the global budget. + // Used to track real memory as a build accumulates rows past its initial + // (chunk-sized) admission reservation. Because the build already holds + // bytes() it is exempt from blocking against itself; it only waits for + // *other* concurrent builds. Returns true on success (handle grows), + // false on timeout (handle unchanged). `additional_bytes <= 0` is a no-op + // that returns true. + bool grow(int64_t additional_bytes, int64_t timeout_ms); + + // Acquire `bytes` against the global budget. On success the returned + // reservation owns the bytes; on failure the returned reservation is + // inactive (active() == false). `bytes <= 0` yields an inactive reservation + // and is a successful no-op. 
+ static AnnBuildMemoryReservation try_acquire(int64_t bytes, int64_t timeout_ms); + +private: + explicit AnnBuildMemoryReservation(int64_t bytes) : _bytes(bytes) {} + int64_t _bytes = 0; +}; + +// Conservative memory peak estimate for a single ANN index build. +// expected_rows is an upper bound on segment row count (0 = unknown, treated as +// chunk_rows so admission is at least chunk-sized). +// +// Bytes accounted: +// - input chunk buffer (chunk_rows * dim * sizeof(float)) +// - quantized vector store: FLAT=4*dim, SQ8=dim, SQ4=(dim+1)/2, PQ=pq_m bytes/row +// - HNSW graph: rows * max_degree * 8 bytes * graph_factor +// OR IVF coarse quantizer + inverted lists overhead +// - temp workspace: max(input chunk buffer, omp_threads * 4 MiB) +// +// The model is intentionally conservative: precision is not the goal, the +// goal is to refuse builds that would obviously blow past the budget. +int64_t estimate_ann_build_memory(const AnnBuildMemoryParams& params, int64_t expected_rows, + int64_t chunk_rows); + +} // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/ann_index.h b/be/src/storage/index/ann/ann_index.h index 936a58cbd17a18..b5dcdd4387644d 100644 --- a/be/src/storage/index/ann/ann_index.h +++ b/be/src/storage/index/ann/ann_index.h @@ -78,6 +78,13 @@ class VectorIndex { virtual doris::Status train(Int64 n, const float* x) = 0; + // Whether the underlying index requires a per-build train() call before add(). + // The default is false (e.g. HNSW with a FLAT quantizer learns nothing from data); + // implementations return true for IVF (k-means) and PQ/SQ quantizers, which must + // be trained once before add(). Callers should train at most once per index + // lifetime; a later train() would re-cluster and invalidate vectors already added. + virtual bool needs_training() const { return false; } + /** Add n vectors of dimension d vectors to the index. * * Vectors are implicitly assigned labels ntotal .. 
ntotal + n - 1 diff --git a/be/src/storage/index/ann/ann_index_writer.cpp b/be/src/storage/index/ann/ann_index_writer.cpp index 28d348cc319a48..31de76d20f8816 100644 --- a/be/src/storage/index/ann/ann_index_writer.cpp +++ b/be/src/storage/index/ann/ann_index_writer.cpp @@ -17,11 +17,14 @@ #include "storage/index/ann/ann_index_writer.h" +#include #include #include #include #include "common/cast_set.h" +#include "common/config.h" +#include "storage/index/ann/ann_build_memory_budget.h" #include "storage/index/ann/faiss_ann_index.h" #include "storage/index/inverted/inverted_index_fs_directory.h" @@ -35,10 +38,57 @@ static std::string get_or_default(const std::map& prop return default_value; } +// Project the FAISS build parameters down to the FAISS-free subset the memory +// estimator needs. Done here (inside the ANN translation unit, which has the +// FAISS include path) so ann_index_writer.h can stay FAISS-free. +static AnnBuildMemoryParams to_memory_params(const FaissBuildParameter& p) { + AnnBuildMemoryParams m; + switch (p.index_type) { + case FaissBuildParameter::IndexType::HNSW: + m.index_type = AnnBuildMemoryParams::IndexKind::HNSW; + break; + case FaissBuildParameter::IndexType::IVF: + m.index_type = AnnBuildMemoryParams::IndexKind::IVF; + break; + case FaissBuildParameter::IndexType::IVF_ON_DISK: + m.index_type = AnnBuildMemoryParams::IndexKind::IVF_ON_DISK; + break; + } + switch (p.quantizer) { + case FaissBuildParameter::Quantizer::FLAT: + m.quantizer = AnnBuildMemoryParams::Quantizer::FLAT; + break; + case FaissBuildParameter::Quantizer::SQ4: + m.quantizer = AnnBuildMemoryParams::Quantizer::SQ4; + break; + case FaissBuildParameter::Quantizer::SQ8: + m.quantizer = AnnBuildMemoryParams::Quantizer::SQ8; + break; + case FaissBuildParameter::Quantizer::PQ: + m.quantizer = AnnBuildMemoryParams::Quantizer::PQ; + break; + } + m.dim = p.dim; + m.max_degree = p.max_degree; + m.ivf_nlist = p.ivf_nlist; + m.pq_m = p.pq_m; + return m; +} + 
AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer, const TabletIndex* index_meta) : _index_file_writer(index_file_writer), _index_meta(index_meta) {} +size_t AnnIndexColumnWriter::compute_chunk_rows(size_t dim) { + if (dim == 0) { + return 1; + } + const size_t bytes_per_row = dim * sizeof(float); + const size_t rows_by_bytes = std::max( + 1, cast_set(config::ann_index_build_chunk_bytes) / bytes_per_row); + return std::max(1, std::min(cast_set(chunk_size()), rows_by_bytes)); +} + AnnIndexColumnWriter::~AnnIndexColumnWriter() {} Status AnnIndexColumnWriter::init() { @@ -55,8 +105,8 @@ Status AnnIndexColumnWriter::init() { const std::string index_type = get_or_default(properties, INDEX_TYPE, "hnsw"); const std::string metric_type = get_or_default(properties, METRIC_TYPE, "l2_distance"); const std::string quantizer = get_or_default(properties, QUANTIZER, "flat"); - FaissBuildParameter build_parameter; std::shared_ptr faiss_index = std::make_shared(); + FaissBuildParameter build_parameter; build_parameter.index_type = FaissBuildParameter::string_to_index_type(index_type); build_parameter.dim = std::stoi(get_or_default(properties, DIM, "512")); build_parameter.max_degree = std::stoi(get_or_default(properties, MAX_DEGREE, "32")); @@ -70,16 +120,108 @@ Status AnnIndexColumnWriter::init() { faiss_index->build(build_parameter); _vector_index = faiss_index; + _build_params = to_memory_params(build_parameter); + _dimension = cast_set(build_parameter.dim); + _chunk_rows = compute_chunk_rows(_dimension); LOG_INFO( - "Create a new faiss index, index_type {} dim {} metric_type {} max_degree {}, " - "ef_construction {}, quantizer {}", - index_type, build_parameter.dim, metric_type, build_parameter.max_degree, - build_parameter.ef_construction, quantizer); + "Create a new faiss index, index_id {} index_type {} dim {} metric_type {} " + "max_degree {}, ef_construction {}, quantizer {}, chunk_rows {} chunk_bytes {}", + _index_meta->index_id(), index_type, 
build_parameter.dim, metric_type, + build_parameter.max_degree, build_parameter.ef_construction, quantizer, _chunk_rows, + _chunk_rows * _dimension * sizeof(float)); + + RETURN_IF_ERROR(_acquire_memory_budget()); + + return Status::OK(); +} + +int64_t AnnIndexColumnWriter::_oom_wait_timeout_ms() { + // "fail" must not wait at all; "wait" and "skip" honor the configured timeout. + if (config::ann_index_build_on_oom_action == "fail") { + return 0; + } + return config::ann_index_build_memory_wait_timeout_ms; +} + +Status AnnIndexColumnWriter::_acquire_memory_budget() { + if (config::ann_index_build_memory_budget_bytes <= 0) { + // Admission control disabled. + return Status::OK(); + } + // Initial admission reservation. expected_rows is unknown at init() time, so + // estimate one chunk's worth as a floor. _ensure_reservation_for_rows() then + // grows the reservation toward the real footprint as rows accumulate, so the + // global budget reflects actual memory rather than just a per-chunk floor. + const int64_t estimated = + estimate_ann_build_memory(_build_params, /*expected_rows=*/0, _chunk_rows); + const int64_t timeout_ms = _oom_wait_timeout_ms(); + _reservation = AnnBuildMemoryReservation::try_acquire(estimated, timeout_ms); + if (_reservation.active() || estimated <= 0) { + return Status::OK(); + } + return _apply_oom_action(estimated, timeout_ms); +} + +Status AnnIndexColumnWriter::_ensure_reservation_for_rows(int64_t total_rows) { + if (config::ann_index_build_memory_budget_bytes <= 0) { + return Status::OK(); + } + const int64_t target = estimate_ann_build_memory(_build_params, total_rows, _chunk_rows); + const int64_t held = _reservation.bytes(); + if (target <= held) { + return Status::OK(); + } + const int64_t timeout_ms = _oom_wait_timeout_ms(); + if (_reservation.grow(target - held, timeout_ms)) { + return Status::OK(); + } + // Backpressure: the build cannot grow within the budget. 
"skip" discards the + // partially built index at finish() (segment write still succeeds); "wait"/ + // "fail" abort the build with a diagnostic error. + return _apply_oom_action(target, timeout_ms); +} - size_t block_size = AnnIndexColumnWriter::chunk_size() * build_parameter.dim; - _float_array.reserve(block_size); +void AnnIndexColumnWriter::_grow_reservation_best_effort(int64_t total_rows) { + if (config::ann_index_build_memory_budget_bytes <= 0) { + return; + } + const int64_t target = estimate_ann_build_memory(_build_params, total_rows, _chunk_rows); + const int64_t held = _reservation.bytes(); + if (target > held) { + // finish() only adds the last partial chunk; aborting a near-complete + // build is never worth it, so account best-effort without blocking. + (void)_reservation.grow(target - held, /*timeout_ms=*/0); + } +} + +Status AnnIndexColumnWriter::_apply_oom_action(int64_t estimated_bytes, int64_t waited_ms) { + const std::string action = config::ann_index_build_on_oom_action; + const int64_t budget = config::ann_index_build_memory_budget_bytes; + const int64_t in_use = AnnBuildMemoryBudget::instance().reserved_bytes(); + if (action == "skip") { + LOG_WARNING( + "Skipping ANN index {} build due to memory budget: estimated={} bytes, " + "in_use={} bytes, budget={} bytes, waited={} ms", + _index_meta->index_id(), estimated_bytes, in_use, budget, waited_ms); + _skip_due_to_oom = true; + return Status::OK(); + } + // "wait" already exhausted its timeout inside try_acquire; treat as failure. 
+ return Status::RuntimeError( + "ANN index {} build failed due to memory budget (action={}): " + "estimated={} bytes, in_use={} bytes, budget={} bytes, waited={} ms", + _index_meta->index_id(), action, estimated_bytes, in_use, budget, waited_ms); +} +Status AnnIndexColumnWriter::_train_once_if_needed(Int64 n, const float* vec) { + if (_trained) { + return Status::OK(); + } + if (_vector_index->needs_training()) { + RETURN_IF_ERROR(_vector_index->train(n, vec)); + } + _trained = true; return Status::OK(); } @@ -96,9 +238,14 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val if (num_rows == 0) { return Status::OK(); } + if (_skip_due_to_oom) { + // Admission control chose to skip this index build; drop the rows + // silently so the surrounding segment write still succeeds. + return Status::OK(); + } const auto* offsets = reinterpret_cast(offsets_ptr); - const size_t dim = _vector_index->get_dimension(); + const size_t dim = _dimension; for (size_t i = 0; i < num_rows; ++i) { auto array_elem_size = offsets[i + 1] - offsets[i]; if (array_elem_size != dim) { @@ -109,10 +256,14 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val const float* p = reinterpret_cast(value_ptr); - const size_t full_elements = AnnIndexColumnWriter::chunk_size() * dim; + DCHECK(_chunk_rows > 0) << "init() must have computed _chunk_rows"; + const size_t full_elements = _current_chunk_capacity_elements(); size_t remaining_elements = num_rows * dim; size_t src_offset = 0; while (remaining_elements > 0) { + if (_float_array.capacity() < full_elements) { + _float_array.reserve(full_elements); + } size_t available_space = full_elements - _float_array.size(); size_t elements_to_add = std::min(remaining_elements, available_space); @@ -121,11 +272,20 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val remaining_elements -= elements_to_add; if (_float_array.size() == full_elements) { - RETURN_IF_ERROR( - 
_vector_index->train(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - RETURN_IF_ERROR( - _vector_index->add(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - _float_array.clear(); + // Grow the budget reservation to cover the rows this add() will make + // resident before consuming more memory. + RETURN_IF_ERROR(_ensure_reservation_for_rows(static_cast(_added_rows) + + static_cast(_chunk_rows))); + if (_skip_due_to_oom) { + // Backpressure chose to skip: drop the buffered chunk and stop + // adding. finish() deletes the index entry. + _reset_chunk_buffer(true); + return Status::OK(); + } + RETURN_IF_ERROR(_train_once_if_needed(_chunk_rows, _float_array.data())); + RETURN_IF_ERROR(_vector_index->add(_chunk_rows, _float_array.data())); + _reset_chunk_buffer(false); + _added_rows += _chunk_rows; _need_save_index = true; } } @@ -145,30 +305,59 @@ int64_t AnnIndexColumnWriter::size() const { return 0; } +void AnnIndexColumnWriter::_reset_chunk_buffer(bool release_memory) { + _float_array.clear(); + if (release_memory) { + PODArray empty; + _float_array.swap(empty); + return; + } + // Guard against target == 0 (uninitialized dim/chunk_rows), which would make + // `target * 2 == 0` and degenerate the threshold check into a per-batch free. + const size_t target = _current_chunk_capacity_elements(); + if (target == 0) { + return; + } + if (_float_array.capacity() > target * 2) { + PODArray empty; + _float_array.swap(empty); + } +} + Status AnnIndexColumnWriter::finish() { + if (_skip_due_to_oom) { + _reset_chunk_buffer(true); + return _index_file_writer->delete_index(_index_meta); + } Int64 min_train_rows = _vector_index->get_min_train_rows(); // Check if we have enough rows to train the index // train/add the remaining data if (_float_array.empty()) { if (_need_save_index) { + // Release the input buffer before save() so the serialization workspace + // does not overlap with a stale chunk allocation (up to chunk_bytes). 
+ _reset_chunk_buffer(true); return _vector_index->save(_dir.get()); } else { // No data was added at all. This can happen if the segment has 0 rows // or all rows were filtered out. We need to delete the directory entry // to avoid writing an empty/invalid index file. - LOG_INFO("No data to train/add for ANN index. Skipping index building."); + LOG_INFO("No data to train/add for ANN index {}. Skipping index building.", + _index_meta->index_id()); + _reset_chunk_buffer(true); return _index_file_writer->delete_index(_index_meta); } } else { - DCHECK(_float_array.size() % _vector_index->get_dimension() == 0); + DCHECK(_float_array.size() % _dimension == 0); - Int64 num_rows = _float_array.size() / _vector_index->get_dimension(); + Int64 num_rows = _float_array.size() / _dimension; if (num_rows >= min_train_rows) { - RETURN_IF_ERROR(_vector_index->train(num_rows, _float_array.data())); + _grow_reservation_best_effort(static_cast(_added_rows) + num_rows); + RETURN_IF_ERROR(_train_once_if_needed(num_rows, _float_array.data())); RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); + _reset_chunk_buffer(true); return _vector_index->save(_dir.get()); } else { // It happens to have not enough data to train. @@ -177,8 +366,9 @@ Status AnnIndexColumnWriter::finish() { // For IVF indexes, adding remaining vectors without training is acceptable // because the quantizer was already trained on previous batches. These vectors // are simply added to the nearest clusters without retraining. + _grow_reservation_best_effort(static_cast(_added_rows) + num_rows); RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); + _reset_chunk_buffer(true); return _vector_index->save(_dir.get()); } else { // Not enough data to train and no data added before. @@ -187,10 +377,9 @@ Status AnnIndexColumnWriter::finish() { // writing an empty/invalid index file which causes "IndexInput read past EOF" error. 
LOG_INFO( "Remaining data size {} is less than minimum {} rows required for ANN " - "index " - "training. Skipping index building for this segment.", - num_rows, min_train_rows); - _float_array.clear(); + "index {} training. Skipping index building for this segment.", + num_rows, min_train_rows, _index_meta->index_id()); + _reset_chunk_buffer(true); return _index_file_writer->delete_index(_index_meta); } } diff --git a/be/src/storage/index/ann/ann_index_writer.h b/be/src/storage/index/ann/ann_index_writer.h index 7b7e63f8574439..c98d3d1f155ce3 100644 --- a/be/src/storage/index/ann/ann_index_writer.h +++ b/be/src/storage/index/ann/ann_index_writer.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include "common/config.h" #include "core/pod_array.h" +#include "storage/index/ann/ann_build_memory_budget.h" #include "storage/index/ann/ann_index.h" #include "storage/index/index_file_writer.h" #include "storage/index/index_writer.h" @@ -45,6 +47,7 @@ class AnnIndexColumnWriter : public IndexColumnWriter { return config::ann_index_build_chunk_size; #endif } + static size_t compute_chunk_rows(size_t dim); static constexpr const char* INDEX_TYPE = "index_type"; static constexpr const char* METRIC_TYPE = "metric_type"; static constexpr const char* DIM = "dim"; @@ -70,7 +73,31 @@ class AnnIndexColumnWriter : public IndexColumnWriter { int64_t size() const override; Status finish() override; -private: +protected: + void _reset_chunk_buffer(bool release_memory); + size_t _current_chunk_capacity_elements() const { return _chunk_rows * _dimension; } + // Train the underlying index on demand for the first batch; subsequent calls + // are no-ops. Keeps IVF k-means from being re-run per chunk (which would + // invalidate vectors already added) and skips lock acquisition for HNSW. + Status _train_once_if_needed(Int64 n, const float* vec); + // Reserve the estimated build-memory peak from AnnBuildMemoryBudget. 
On + // success the reservation is held in _reservation until destruction. On + // failure, applies ann_index_build_on_oom_action ("wait"/"skip"/"fail"). + // Uses _build_params, which init() must populate first. + Status _acquire_memory_budget(); + Status _apply_oom_action(int64_t estimated_bytes, int64_t waited_ms); + // Grow _reservation so it covers `total_rows` of accumulated, resident + // vectors. Called before each streaming add() so the global budget tracks + // real memory instead of just the initial per-chunk floor. On failure routes + // through _apply_oom_action (may set _skip_due_to_oom or return an error). + Status _ensure_reservation_for_rows(int64_t total_rows); + // Best-effort, non-blocking variant for finish(): accounts the last partial + // chunk without aborting a near-complete build on contention. + void _grow_reservation_best_effort(int64_t total_rows); + // Wait timeout for budget acquisition: 0 for action "fail" (never wait), + // otherwise ann_index_build_memory_wait_timeout_ms. + static int64_t _oom_wait_timeout_ms(); + // VectorIndex shoule be managed by some cache. // VectorIndex should be weak shared by AnnIndexWriter and VectorIndexReader // This should be a weak_ptr @@ -81,6 +108,22 @@ class AnnIndexColumnWriter : public IndexColumnWriter { IndexFileWriter* _index_file_writer; const TabletIndex* _index_meta; std::shared_ptr _dir; + AnnBuildMemoryReservation _reservation; bool _need_save_index = false; + bool _trained = false; + // True after _apply_oom_action chose the "skip" path. Subsequent add_array_values + // calls are no-ops and finish() deletes the index entry so the surrounding + // segment write still succeeds. + bool _skip_due_to_oom = false; + size_t _dimension = 0; + size_t _chunk_rows = 0; + // Rows already added to the index via full chunks. Used to size incremental + // budget reservations against the real (accumulated) footprint. 
+ size_t _added_rows = 0; + // FAISS-free build parameters captured at init() so reservation growth can + // re-estimate the footprint as the segment's row count becomes known. Kept + // FAISS-free so this header does not leak into the many non-ANN + // translation units that pull in column_writer.h. + AnnBuildMemoryParams _build_params; }; } // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp b/be/src/storage/index/ann/faiss_ann_index.cpp index f933f3c683f940..edb14f40e362b8 100644 --- a/be/src/storage/index/ann/faiss_ann_index.cpp +++ b/be/src/storage/index/ann/faiss_ann_index.cpp @@ -452,6 +452,16 @@ struct CachedRandomAccessReader : faiss::RandomAccessReader { size_t _file_size; }; +bool FaissVectorIndex::needs_training() const { + // HNSW does not require training. IVF performs k-means clustering, and any + // quantizer other than FLAT (SQ/PQ) also needs to learn parameters from data. + if (_params.index_type == FaissBuildParameter::IndexType::IVF || + _params.index_type == FaissBuildParameter::IndexType::IVF_ON_DISK) { + return true; + } + return _params.quantizer != FaissBuildParameter::Quantizer::FLAT; +} + doris::Status FaissVectorIndex::train(Int64 n, const float* vec) { DCHECK(vec != nullptr); DCHECK(_index != nullptr); diff --git a/be/src/storage/index/ann/faiss_ann_index.h b/be/src/storage/index/ann/faiss_ann_index.h index e1f1f65a3adadb..0313e0435ecf33 100644 --- a/be/src/storage/index/ann/faiss_ann_index.h +++ b/be/src/storage/index/ann/faiss_ann_index.h @@ -198,6 +198,8 @@ class FaissVectorIndex : public VectorIndex { */ doris::Status train(Int64 n, const float* vec) override; + bool needs_training() const override; + /** * @brief Adds vectors to the index for future searches. 
* diff --git a/be/test/storage/index/ann/ann_build_memory_budget_test.cpp b/be/test/storage/index/ann/ann_build_memory_budget_test.cpp new file mode 100644 index 00000000000000..901536155462fe --- /dev/null +++ b/be/test/storage/index/ann/ann_build_memory_budget_test.cpp @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "storage/index/ann/ann_build_memory_budget.h" + +#include +#include + +#include +#include +#include +#include + +#include "common/config.h" +#include "io/fs/local_file_system.h" +#include "storage/index/ann/ann_index_writer.h" +#include "storage/index/ann/faiss_ann_index.h" +#include "storage/index/ann/vector_search_utils.h" +#include "storage/index/index_file_writer.h" +#include "storage/tablet/tablet_schema.h" +#include "util/defer_op.h" + +using namespace doris::vector_search_utils; + +namespace doris::segment_v2 { + +class AnnBuildMemoryBudgetTest : public ::testing::Test { +protected: + void SetUp() override { + _saved_budget = config::ann_index_build_memory_budget_bytes; + _saved_timeout = config::ann_index_build_memory_wait_timeout_ms; + _saved_action = config::ann_index_build_on_oom_action; + AnnBuildMemoryBudget::instance().reset_for_test(); + } + void TearDown() override { + AnnBuildMemoryBudget::instance().reset_for_test(); + config::ann_index_build_memory_budget_bytes = _saved_budget; + config::ann_index_build_memory_wait_timeout_ms = _saved_timeout; + config::ann_index_build_on_oom_action = _saved_action; + } + + int64_t _saved_budget = 0; + int64_t _saved_timeout = 0; + std::string _saved_action; +}; + +TEST_F(AnnBuildMemoryBudgetTest, DisabledBudgetGrantsEverything) { + config::ann_index_build_memory_budget_bytes = 0; + auto& budget = AnnBuildMemoryBudget::instance(); + EXPECT_TRUE(budget.try_reserve(1LL << 40, /*timeout_ms=*/0)); + EXPECT_EQ(budget.reserved_bytes(), 0); // disabled => no accounting +} + +TEST_F(AnnBuildMemoryBudgetTest, ReserveAndReleaseAccounting) { + config::ann_index_build_memory_budget_bytes = 1024; + auto& budget = AnnBuildMemoryBudget::instance(); + ASSERT_TRUE(budget.try_reserve(400, 0)); + EXPECT_EQ(budget.reserved_bytes(), 400); + ASSERT_TRUE(budget.try_reserve(600, 0)); + EXPECT_EQ(budget.reserved_bytes(), 1000); + budget.release(400); + EXPECT_EQ(budget.reserved_bytes(), 600); + budget.release(600); + 
EXPECT_EQ(budget.reserved_bytes(), 0); +} + +TEST_F(AnnBuildMemoryBudgetTest, TimeoutWhenStarved) { + config::ann_index_build_memory_budget_bytes = 1024; + auto& budget = AnnBuildMemoryBudget::instance(); + ASSERT_TRUE(budget.try_reserve(1024, 0)); + EXPECT_FALSE(budget.try_reserve(1, /*timeout_ms=*/10)); + budget.release(1024); +} + +TEST_F(AnnBuildMemoryBudgetTest, OversizedSingleBuildIsAllowed) { + // When nothing else is in flight, a single build larger than the budget + // must still proceed (otherwise it would self-deadlock). Once it is + // holding bytes, the next request must respect the budget. + config::ann_index_build_memory_budget_bytes = 100; + auto& budget = AnnBuildMemoryBudget::instance(); + ASSERT_TRUE(budget.try_reserve(10000, 0)); + EXPECT_EQ(budget.reserved_bytes(), 10000); + EXPECT_FALSE(budget.try_reserve(1, 0)); + budget.release(10000); +} + +TEST_F(AnnBuildMemoryBudgetTest, WaiterWakesOnRelease) { + config::ann_index_build_memory_budget_bytes = 1024; + auto& budget = AnnBuildMemoryBudget::instance(); + ASSERT_TRUE(budget.try_reserve(1024, 0)); + + std::atomic waiter_ok {false}; + std::thread waiter([&]() { + // Plenty of room to wait; should be unblocked by the release below. 
+ waiter_ok = budget.try_reserve(500, /*timeout_ms=*/5000); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_FALSE(waiter_ok.load()); + budget.release(1024); + waiter.join(); + EXPECT_TRUE(waiter_ok.load()); + budget.release(500); +} + +TEST_F(AnnBuildMemoryBudgetTest, ReservationRaiiReleasesOnDestruction) { + config::ann_index_build_memory_budget_bytes = 1024; + auto& budget = AnnBuildMemoryBudget::instance(); + { + auto handle = AnnBuildMemoryReservation::try_acquire(800, 0); + ASSERT_TRUE(handle.active()); + EXPECT_EQ(handle.bytes(), 800); + EXPECT_EQ(budget.reserved_bytes(), 800); + } + EXPECT_EQ(budget.reserved_bytes(), 0); +} + +TEST_F(AnnBuildMemoryBudgetTest, ReservationFailureYieldsInactiveHandle) { + config::ann_index_build_memory_budget_bytes = 100; + auto& budget = AnnBuildMemoryBudget::instance(); + ASSERT_TRUE(budget.try_reserve(100, 0)); + auto handle = AnnBuildMemoryReservation::try_acquire(1, /*timeout_ms=*/5); + EXPECT_FALSE(handle.active()); + EXPECT_EQ(handle.bytes(), 0); + budget.release(100); +} + +TEST_F(AnnBuildMemoryBudgetTest, GrowAllowedForSoleBuildEvenPastBudget) { + // A build that already holds bytes may keep growing past the budget as long + // as it is the only build in flight; otherwise it would deadlock on itself. + config::ann_index_build_memory_budget_bytes = 100; + auto& budget = AnnBuildMemoryBudget::instance(); + auto handle = AnnBuildMemoryReservation::try_acquire(60, /*timeout_ms=*/0); + ASSERT_TRUE(handle.active()); + EXPECT_EQ(budget.reserved_bytes(), 60); + // 60 + 100 = 160 > budget, but sole build => allowed without waiting. 
+ EXPECT_TRUE(handle.grow(100, /*timeout_ms=*/0)); + EXPECT_EQ(handle.bytes(), 160); + EXPECT_EQ(budget.reserved_bytes(), 160); +} + +TEST_F(AnnBuildMemoryBudgetTest, GrowBlocksWhenOtherBuildInFlight) { + config::ann_index_build_memory_budget_bytes = 100; + auto& budget = AnnBuildMemoryBudget::instance(); + auto handle = AnnBuildMemoryReservation::try_acquire(60, /*timeout_ms=*/0); + ASSERT_TRUE(handle.active()); + // A second, independent build takes 30 (reserved now 90, another build present). + ASSERT_TRUE(budget.try_reserve(30, /*timeout_ms=*/0)); + // handle grow by 50 -> 90 + 50 = 140 > budget and not sole build => timeout. + EXPECT_FALSE(handle.grow(50, /*timeout_ms=*/10)); + EXPECT_EQ(handle.bytes(), 60); // unchanged on failure + EXPECT_EQ(budget.reserved_bytes(), 90); + budget.release(30); +} + +TEST_F(AnnBuildMemoryBudgetTest, GrowWakesWhenOtherBuildReleases) { + config::ann_index_build_memory_budget_bytes = 100; + auto& budget = AnnBuildMemoryBudget::instance(); + auto handle = AnnBuildMemoryReservation::try_acquire(40, /*timeout_ms=*/0); + ASSERT_TRUE(handle.active()); + ASSERT_TRUE(budget.try_reserve(50, /*timeout_ms=*/0)); // other build, reserved=90 + + std::atomic grew {false}; + std::thread waiter([&]() { + // 90 + 30 = 120 > budget; must wait for the other build to release. 
+ grew = handle.grow(30, /*timeout_ms=*/5000); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_FALSE(grew.load()); + budget.release(50); + waiter.join(); + EXPECT_TRUE(grew.load()); + EXPECT_EQ(handle.bytes(), 70); +} + +TEST_F(AnnBuildMemoryBudgetTest, EstimateGrowsWithRowsAndDim) { + AnnBuildMemoryParams p; + p.index_type = AnnBuildMemoryParams::IndexKind::HNSW; + p.quantizer = AnnBuildMemoryParams::Quantizer::FLAT; + p.dim = 128; + p.max_degree = 32; + const int64_t small = estimate_ann_build_memory(p, /*expected_rows=*/1000, /*chunk_rows=*/100); + const int64_t big = estimate_ann_build_memory(p, /*expected_rows=*/100000, /*chunk_rows=*/100); + EXPECT_GT(big, small); + EXPECT_GT(small, 0); +} + +TEST_F(AnnBuildMemoryBudgetTest, EstimateUnknownRowsFallsBackToChunk) { + AnnBuildMemoryParams p; + p.index_type = AnnBuildMemoryParams::IndexKind::HNSW; + p.quantizer = AnnBuildMemoryParams::Quantizer::FLAT; + p.dim = 64; + p.max_degree = 16; + const int64_t unknown = estimate_ann_build_memory(p, /*expected_rows=*/0, /*chunk_rows=*/4096); + const int64_t one_chunk = + estimate_ann_build_memory(p, /*expected_rows=*/4096, /*chunk_rows=*/4096); + EXPECT_EQ(unknown, one_chunk); +} + +TEST_F(AnnBuildMemoryBudgetTest, EstimateQuantizersShrinkStore) { + AnnBuildMemoryParams p; + p.index_type = AnnBuildMemoryParams::IndexKind::HNSW; + p.dim = 256; + p.max_degree = 32; + + p.quantizer = AnnBuildMemoryParams::Quantizer::FLAT; + const int64_t flat = estimate_ann_build_memory(p, 10000, 1000); + p.quantizer = AnnBuildMemoryParams::Quantizer::SQ8; + const int64_t sq8 = estimate_ann_build_memory(p, 10000, 1000); + p.quantizer = AnnBuildMemoryParams::Quantizer::SQ4; + const int64_t sq4 = estimate_ann_build_memory(p, 10000, 1000); + p.quantizer = AnnBuildMemoryParams::Quantizer::PQ; + p.pq_m = 16; + const int64_t pq = estimate_ann_build_memory(p, 10000, 1000); + + // Each quantization step must reduce the per-row footprint of the store, + // pulling the 
overall estimate down. + EXPECT_GT(flat, sq8); + EXPECT_GT(sq8, sq4); + EXPECT_GT(sq4, pq); +} + +TEST_F(AnnBuildMemoryBudgetTest, EstimateHandlesZeroDim) { + AnnBuildMemoryParams p; + p.dim = 0; + EXPECT_EQ(estimate_ann_build_memory(p, 1000, 100), 0); +} + +// ------------------------------------------------------------------------- +// Writer integration: skip / fail / disabled paths. +// ------------------------------------------------------------------------- + +class TestSkipAwareWriter : public AnnIndexColumnWriter { +public: + using AnnIndexColumnWriter::AnnIndexColumnWriter; + bool skip_due_to_oom() const { return _skip_due_to_oom; } +}; + +class WriterAdmissionTest : public ::testing::Test { +protected: + void SetUp() override { + if (ExecEnv::GetInstance()->get_tmp_file_dirs() == nullptr) { + const std::string tmp_dir = "./ut_dir/tmp_vector_search"; + (void)doris::io::global_local_filesystem()->delete_directory(tmp_dir); + (void)doris::io::global_local_filesystem()->create_directory(tmp_dir); + std::vector paths; + paths.emplace_back(tmp_dir, -1); + auto tmp_file_dirs = std::make_unique(paths); + ASSERT_TRUE(tmp_file_dirs->init().ok()); + ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs)); + } + + _saved_budget = config::ann_index_build_memory_budget_bytes; + _saved_timeout = config::ann_index_build_memory_wait_timeout_ms; + _saved_action = config::ann_index_build_on_oom_action; + AnnBuildMemoryBudget::instance().reset_for_test(); + + _properties["index_type"] = "hnsw"; + _properties["metric_type"] = "l2_distance"; + _properties["dim"] = "16"; + _properties["max_degree"] = "16"; + _tablet_index = std::make_unique(); + _tablet_index->_index_type = IndexType::ANN; + _tablet_index->_properties = _properties; + _tablet_index->_index_id = 42; + _tablet_index->_index_name = "test_ann_index"; + + _index_file_writer = + std::make_unique(doris::io::global_local_filesystem()); + } + void TearDown() override { + 
AnnBuildMemoryBudget::instance().reset_for_test(); + config::ann_index_build_memory_budget_bytes = _saved_budget; + config::ann_index_build_memory_wait_timeout_ms = _saved_timeout; + config::ann_index_build_on_oom_action = _saved_action; + } + + int64_t _saved_budget = 0; + int64_t _saved_timeout = 0; + std::string _saved_action; + std::map _properties; + std::unique_ptr _tablet_index; + std::unique_ptr _index_file_writer; +}; + +TEST_F(WriterAdmissionTest, SkipModeDeletesIndexAndSwallowsRows) { + // Tiny budget that the estimator will never satisfy. The writer should + // enter skip mode, accept add_array_values silently, and ask the index + // file writer to delete the index entry on finish(). + config::ann_index_build_memory_budget_bytes = 1; + config::ann_index_build_memory_wait_timeout_ms = 0; + config::ann_index_build_on_oom_action = "skip"; + // Occupy the budget so the writer is NOT the only build in flight; otherwise + // the "single oversized build" exemption would let it through and skip mode + // would never trigger. + ASSERT_TRUE(AnnBuildMemoryBudget::instance().try_reserve(1, /*timeout_ms=*/0)); + + auto writer = + std::make_unique(_index_file_writer.get(), _tablet_index.get()); + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + // IndexFileWriter::delete_index is not mocked; the real method short-circuits + // to Status::OK when the index id is not registered, which matches the + // skip-path expectation here. 
+ + ASSERT_TRUE(writer->init().ok()); + EXPECT_TRUE(writer->skip_due_to_oom()); + + const size_t dim = 16; + const size_t num_rows = 2; + std::vector vectors(num_rows * dim, 1.0f); + std::vector offsets = {0, dim, 2 * dim}; + EXPECT_TRUE(writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows) + .ok()); + + EXPECT_TRUE(writer->finish().ok()); +} + +TEST_F(WriterAdmissionTest, FailModeReturnsErrorFromInit) { + config::ann_index_build_memory_budget_bytes = 1; + config::ann_index_build_memory_wait_timeout_ms = 0; + config::ann_index_build_on_oom_action = "fail"; + // Occupy the budget so the writer is NOT the only build in flight; otherwise + // the "single oversized build" exemption would let it through and init() + // would succeed instead of failing. + ASSERT_TRUE(AnnBuildMemoryBudget::instance().try_reserve(1, /*timeout_ms=*/0)); + + auto writer = + std::make_unique(_index_file_writer.get(), _tablet_index.get()); + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + Status status = writer->init(); + EXPECT_FALSE(status.ok()); + EXPECT_TRUE(status.is()) << status.to_string(); +} + +TEST_F(WriterAdmissionTest, FailModeDoesNotWaitDespiteLargeTimeout) { + // Regression: "fail" must return immediately, never blocking for the + // configured wait timeout. We set a large timeout and assert init() fails + // in well under it. + config::ann_index_build_memory_budget_bytes = 1; + config::ann_index_build_memory_wait_timeout_ms = 10000; + config::ann_index_build_on_oom_action = "fail"; + // Occupy the budget so the writer's reservation cannot fit. 
+ ASSERT_TRUE(AnnBuildMemoryBudget::instance().try_reserve(1, /*timeout_ms=*/0)); + + auto writer = + std::make_unique(_index_file_writer.get(), _tablet_index.get()); + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + const auto start = std::chrono::steady_clock::now(); + Status status = writer->init(); + const auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + EXPECT_FALSE(status.ok()); + EXPECT_TRUE(status.is()) << status.to_string(); + EXPECT_LT(elapsed, 2000) << "fail action waited " << elapsed << " ms"; + AnnBuildMemoryBudget::instance().release(1); +} + +TEST_F(WriterAdmissionTest, ReservationGrowsWithAccumulatedRows) { + // With a generous budget the writer must enlarge its reservation as chunks + // are added, so the global counter tracks real memory rather than just the + // initial per-chunk floor. + config::ann_index_build_memory_budget_bytes = 1LL << 30; + config::ann_index_build_memory_wait_timeout_ms = 1000; + config::ann_index_build_on_oom_action = "wait"; + + auto writer = + std::make_unique(_index_file_writer.get(), _tablet_index.get()); + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + const int64_t after_init = AnnBuildMemoryBudget::instance().reserved_bytes(); + EXPECT_GT(after_init, 0); + + // BE_TEST chunk_size() is 10, so 30 rows flush three full chunks. 
+ const size_t dim = 16; + const size_t num_rows = 30; + std::vector vectors(num_rows * dim, 0.5f); + std::vector offsets(num_rows + 1); + for (size_t i = 0; i <= num_rows; ++i) { + offsets[i] = i * dim; + } + ASSERT_TRUE(writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows) + .ok()); + + const int64_t after_adds = AnnBuildMemoryBudget::instance().reserved_bytes(); + EXPECT_GT(after_adds, after_init) << "reservation did not grow: after_init=" << after_init + << " after_adds=" << after_adds; + + ASSERT_TRUE(writer->finish().ok()); + writer.reset(); // RAII release + EXPECT_EQ(AnnBuildMemoryBudget::instance().reserved_bytes(), 0); +} + +TEST_F(WriterAdmissionTest, DisabledBudgetIsTransparent) { + config::ann_index_build_memory_budget_bytes = 0; + auto writer = + std::make_unique(_index_file_writer.get(), _tablet_index.get()); + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + EXPECT_FALSE(writer->skip_due_to_oom()); + EXPECT_EQ(AnnBuildMemoryBudget::instance().reserved_bytes(), 0); +} + +} // namespace doris::segment_v2 diff --git a/be/test/storage/index/ann/ann_index_writer_test.cpp b/be/test/storage/index/ann/ann_index_writer_test.cpp index bb30f9e19794af..e0d977f0443fe9 100644 --- a/be/test/storage/index/ann/ann_index_writer_test.cpp +++ b/be/test/storage/index/ann/ann_index_writer_test.cpp @@ -30,6 +30,7 @@ #include "storage/index/index_file_writer.h" #include "storage/index/inverted/inverted_index_fs_directory.h" #include "storage/tablet/tablet_schema.h" +#include "util/defer_op.h" using namespace doris::vector_search_utils; @@ -40,6 +41,7 @@ class MockVectorIndex : public VectorIndex { MockVectorIndex() { _dimension = 4; } // Set dimension for test MOCK_METHOD(doris::Status, train, (Int64 n, const 
float* vec), (override)); MOCK_METHOD(doris::Status, add, (Int64 n, const float* vec), (override)); + MOCK_METHOD(bool, needs_training, (), (const, override)); MOCK_METHOD(doris::Status, ann_topn_search, (const float* query_vec, int k, const segment_v2::IndexSearchParameters& params, segment_v2::IndexSearchResult& result), @@ -61,6 +63,10 @@ class TestAnnIndexColumnWriter : public AnnIndexColumnWriter { void set_vector_index(std::shared_ptr index) { _vector_index = index; } void set_need_save_index(bool value) { _need_save_index = value; } + size_t chunk_rows() const { return _chunk_rows; } + size_t float_array_capacity() const { return _float_array.capacity(); } + void reset_chunk_buffer_for_test(bool release) { _reset_chunk_buffer(release); } + void force_float_array_reserve(size_t n) { _float_array.reserve(n); } }; class AnnIndexWriterTest : public ::testing::Test { @@ -165,6 +171,27 @@ TEST_F(AnnIndexWriterTest, TestInitWithDifferentProperties) { } } +TEST_F(AnnIndexWriterTest, TestComputeChunkRowsByBytes) { + const auto old_chunk_bytes = config::ann_index_build_chunk_bytes; + Defer defer([old_chunk_bytes]() { config::ann_index_build_chunk_bytes = old_chunk_bytes; }); + + config::ann_index_build_chunk_bytes = 64; + + EXPECT_EQ(AnnIndexColumnWriter::compute_chunk_rows(4), 4); +} + +TEST_F(AnnIndexWriterTest, TestInitDoesNotReserveChunkBuffer) { + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + EXPECT_EQ(writer->float_array_capacity(), 0); +} + TEST_F(AnnIndexWriterTest, TestAddArrayValuesSuccess) { auto writer = std::make_unique(_index_file_writer.get(), _tablet_index.get()); @@ -427,11 +454,13 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { 
ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); + // Index requires training (e.g. IVF/PQ). After the fix, train() is invoked + // only once on the first full chunk; the remainder reuses the trained state. + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, train(10, testing::_)) .Times(1) .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -478,6 +507,34 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { EXPECT_TRUE(status.ok()); } +TEST_F(AnnIndexWriterTest, TestFinishReleasesChunkBufferMemory) { + auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + + std::vector vectors = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f}; + std::vector offsets = {0, 4, 8}; + ASSERT_TRUE(writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), 2) + 
.ok()); + EXPECT_GT(writer->float_array_capacity(), 0); + + ASSERT_TRUE(writer->finish().ok()); + EXPECT_EQ(writer->float_array_capacity(), 0); +} + TEST_F(AnnIndexWriterTest, TestCreateFromIndexColumnWriter) { TabletSchemaSPtr tablet_schema = std::make_shared(); TabletSchemaPB tablet_schema_pb; @@ -587,11 +644,12 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); + // IVF: train once on the first full chunk; subsequent batches only add(). + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, train(10, testing::_)) .Times(1) .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -662,6 +720,7 @@ TEST_F(AnnIndexWriterTest, TestSkipTrainWhenRemainderLessThanNlist) { // CHUNK_SIZE = 10, nlist = 5 // Add 12 rows: first 10 will be trained/added in one batch, remaining 2 < 5 // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); EXPECT_CALL(*mock_index, train(10, testing::_)) .Times(1) @@ -731,15 +790,18 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { writer->set_vector_index(mock_index); // CHUNK_SIZE = 10, nlist = 3 - // Add 23 rows: 2 full chunks of 10, remaining 3 == nlist, so train remaining + // Add 23 rows: 2 full chunks of 10, remaining 3 == nlist. 
+ // After the fix train() is called exactly once on the first full chunk; + // the remaining 3 rows are added without re-training (which would otherwise + // re-cluster IVF and invalidate the already-added vectors). + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(3)); EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(2) .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -807,10 +869,11 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeSkipRemainder) { // CHUNK_SIZE = 10, nlist = 4 // Add 22 rows: 2 full chunks of 10, remaining 2 < 4 // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(4)); EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(2) .WillRepeatedly(testing::Return(Status::OK())); @@ -881,6 +944,7 @@ TEST_F(AnnIndexWriterTest, TestSkipIndexWhenTotalRowsLessThanNlist) { // Add only 3 rows, which is less than nlist (5) // Since no data was trained before (_need_save_index = false), we should skip index building entirely // No train, add, or save should be called + EXPECT_CALL(*mock_index, 
needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); @@ -923,17 +987,17 @@ TEST_F(AnnIndexWriterTest, TestPQMinTrainRows) { writer->set_vector_index(mock_index); // Set up expectations: mock a very large min_train_rows threshold. - // Since we only provide 1000 vectors, which is less than 131072, training will happen in batches - // but finish() will skip saving since remaining data is insufficient + // 1000 vectors split into 100 chunks of 10. train() runs once on the first chunk, + // then all subsequent chunks only add(). At finish() there are 0 leftover rows + // and _need_save_index=true, so save() is invoked. + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // 1000 vectors will be processed in 100 batches of 10 vectors each EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(100) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(100) .WillRepeatedly(testing::Return(Status::OK())); - // Since we have trained data in batches, the index will be saved even though total data is insufficient EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); const size_t dim = 4; @@ -978,6 +1042,7 @@ TEST_F(AnnIndexWriterTest, TestSQMinTrainRows) { // Set up expectations: SQ should require at least 20 training vectors // Since we only provide 15 vectors, training will happen in batches but finish() will skip saving + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(20)); // 15 vectors 
will be processed in 1 batch of 10 vectors and remaining 5 vectors EXPECT_CALL(*mock_index, train(10, testing::_)) @@ -1028,16 +1093,17 @@ TEST_F(AnnIndexWriterTest, TestPQWithSufficientData) { writer->set_vector_index(mock_index); // Mock min_train_rows to 131072 and provide exactly that amount. + // 131072 vectors split into 13107 full chunks of 10 (= 131070 rows) plus 2 leftover. + // train() runs once on the first chunk; the rest is pure add() at chunk boundary + // plus one trailing add(2) in finish(). + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(true)); EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // Since we provide exactly 131072 vectors, they will be trained and added in chunks - // Each chunk is 10 vectors, so we expect 13107 train calls and 13107 add calls for full chunks EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(13107) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(13107) .WillRepeatedly(testing::Return(Status::OK())); - // The remaining 2 vectors will be added without training since min_train_rows > 2 EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -1066,4 +1132,87 @@ TEST_F(AnnIndexWriterTest, TestPQWithSufficientData) { EXPECT_TRUE(status.ok()); } +TEST_F(AnnIndexWriterTest, TestHnswSkipsTrainCall) { + // For HNSW (needs_training=false), train() must never be invoked regardless + // of how many chunks are pushed. This guards both the per-chunk path and + // finish()'s leftover path against unintentional lock acquisition. 
+ auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, needs_training()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(10, testing::_)) + .Times(1) + .WillRepeatedly(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + + const size_t dim = 4; + const size_t num_rows = 12; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); + } + std::vector offsets; + for (size_t i = 0; i <= num_rows; ++i) { + offsets.push_back(i * dim); + } + + ASSERT_TRUE(writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows) + .ok()); + ASSERT_TRUE(writer->finish().ok()); +} + +TEST_F(AnnIndexWriterTest, TestComputeChunkRowsHandlesDimZero) { + // Defensive guard: never divide by zero, return at least 1 row. + EXPECT_EQ(AnnIndexColumnWriter::compute_chunk_rows(0), 1); +} + +TEST_F(AnnIndexWriterTest, TestComputeChunkRowsByteBudgetSmallerThanRow) { + // Even when chunk_bytes is below a single row's footprint, we must still + // accept one row per chunk rather than zero. 
+ const auto old_chunk_bytes = config::ann_index_build_chunk_bytes; + Defer defer([old_chunk_bytes]() { config::ann_index_build_chunk_bytes = old_chunk_bytes; }); + + config::ann_index_build_chunk_bytes = 1; + EXPECT_EQ(AnnIndexColumnWriter::compute_chunk_rows(128), 1); +} + +TEST_F(AnnIndexWriterTest, TestComputeChunkRowsChunkSizeIsUpperBound) { + // When the byte budget is generous, the per-row chunk_size cap still applies. + // BE_TEST: chunk_size() == 10. + const auto old_chunk_bytes = config::ann_index_build_chunk_bytes; + Defer defer([old_chunk_bytes]() { config::ann_index_build_chunk_bytes = old_chunk_bytes; }); + + config::ann_index_build_chunk_bytes = 1LL << 30; // 1 GiB + EXPECT_EQ(AnnIndexColumnWriter::compute_chunk_rows(4), 10); +} + +TEST_F(AnnIndexWriterTest, TestResetChunkBufferKeepsCapacityWhenTargetUnknown) { + // Regression: previously `_current_chunk_capacity_elements() * 2` evaluated + // to 0 when chunk_rows / dimension were unset, so every soft reset would + // free the buffer. After the fix, reset(false) must keep the capacity when + // the target is unknown; only an explicit reset(true) releases it. + TestAnnIndexColumnWriter writer(_index_file_writer.get(), _tablet_index.get()); + // Intentionally skip init() so _chunk_rows and _dimension stay 0. + writer.force_float_array_reserve(1024); + const size_t reserved = writer.float_array_capacity(); + ASSERT_GE(reserved, 1024u); + + writer.reset_chunk_buffer_for_test(/*release_memory=*/false); + EXPECT_EQ(writer.float_array_capacity(), reserved); + + writer.reset_chunk_buffer_for_test(/*release_memory=*/true); + EXPECT_EQ(writer.float_array_capacity(), 0u); +} + } // namespace doris::segment_v2