Skip to content
67 changes: 58 additions & 9 deletions nodedb-vector/src/collection/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::collection::tier::StorageTier;
use crate::distance::DistanceMetric;
use crate::flat::FlatIndex;
use crate::hnsw::{HnswIndex, HnswParams};
use crate::quantize::pq::PqCodec;

use super::lifecycle::VectorCollection;

Expand All @@ -33,12 +34,18 @@ pub(crate) struct CollectionSnapshot {
/// Checkpoint form of one sealed segment: its id offset, the serialized
/// HNSW index, and (optionally) the product-quantization data attached to it.
pub(crate) struct SealedSnapshot {
/// The segment's `base_id`, copied at checkpoint time and restored verbatim.
pub base_id: u32,
/// Bytes produced by the index's `checkpoint_to_bytes()`; passed to
/// `HnswIndex::from_checkpoint` when the collection is restored.
pub hnsw_bytes: Vec<u8>,
/// MessagePack encoding of the segment's PQ codec (`zerompk::to_msgpack_vec`).
/// `None` when the segment has no PQ data or when encoding the codec failed.
/// `#[serde(default)]` makes serde fall back to `None` when the field is
/// absent from the input.
#[serde(default)]
pub pq_bytes: Option<Vec<u8>>,
/// Copy of the PQ code bytes stored alongside the codec. On restore, PQ is
/// only rebuilt when both `pq_bytes` and `pq_codes` are present and the
/// codec decodes successfully.
#[serde(default)]
pub pq_codes: Option<Vec<u8>>,
}

/// Checkpoint form of one segment whose HNSW build was still in progress:
/// the raw vectors plus their tombstone flags. On restore the vectors are
/// re-inserted into a fresh HNSW index and the tombstones replayed onto it.
#[derive(Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub(crate) struct BuildingSnapshot {
/// The building segment's `base_id`; becomes the `base_id` of the sealed
/// segment created on restore.
pub base_id: u32,
/// Vectors read from the segment's flat index, walking local ids
/// `0..flat.len()` in order.
pub vectors: Vec<Vec<f32>>,
/// One flag per local id, `true` where the flat index reports the id as
/// deleted. On restore, `index.delete(i)` is called for every `true` entry.
/// `#[serde(default)]` yields an empty list when the field is absent, in
/// which case no tombstones are replayed.
// NOTE(review): positions are assumed to line up with `vectors`; that holds
// only if every local id yields a vector at checkpoint time — confirm.
#[serde(default)]
pub deleted: Vec<bool>,
}

impl VectorCollection {
Expand All @@ -53,17 +60,27 @@ impl VectorCollection {
next_id: self.next_id,
growing_base_id: self.growing_base_id,
growing_vectors: (0..self.growing.len() as u32)
.filter_map(|i| self.growing.get_vector(i).map(|v| v.to_vec()))
.filter_map(|i| self.growing.get_vector_raw(i).map(|v| v.to_vec()))
.collect(),
growing_deleted: (0..self.growing.len() as u32)
.map(|i| self.growing.get_vector(i).is_none())
.map(|i| self.growing.is_deleted(i))
.collect(),
sealed_segments: self
.sealed
.iter()
.map(|s| SealedSnapshot {
base_id: s.base_id,
hnsw_bytes: s.index.checkpoint_to_bytes(),
.map(|s| {
let (pq_bytes, pq_codes) = match &s.pq {
Some((codec, codes)) => {
(zerompk::to_msgpack_vec(codec).ok(), Some(codes.clone()))
}
None => (None, None),
};
SealedSnapshot {
base_id: s.base_id,
hnsw_bytes: s.index.checkpoint_to_bytes(),
pq_bytes,
pq_codes,
}
})
.collect(),
building_segments: self
Expand All @@ -72,7 +89,10 @@ impl VectorCollection {
.map(|b| BuildingSnapshot {
base_id: b.base_id,
vectors: (0..b.flat.len() as u32)
.filter_map(|i| b.flat.get_vector(i).map(|v| v.to_vec()))
.filter_map(|i| b.flat.get_vector_raw(i).map(|v| v.to_vec()))
.collect(),
deleted: (0..b.flat.len() as u32)
.map(|i| b.flat.is_deleted(i))
.collect(),
})
.collect(),
Expand Down Expand Up @@ -118,18 +138,35 @@ impl VectorCollection {
};

let mut growing = FlatIndex::new(snap.dim, metric);
for v in &snap.growing_vectors {
growing.insert(v.clone());
for (i, v) in snap.growing_vectors.iter().enumerate() {
let deleted = snap.growing_deleted.get(i).copied().unwrap_or(false);
if deleted {
growing.insert_tombstoned(v.clone());
} else {
growing.insert(v.clone());
}
}

let mut sealed = Vec::with_capacity(snap.sealed_segments.len());
for ss in &snap.sealed_segments {
if let Some(index) = HnswIndex::from_checkpoint(&ss.hnsw_bytes) {
let sq8 = VectorCollection::build_sq8_for_index(&index);
let pq = match (&ss.pq_bytes, &ss.pq_codes) {
(Some(bytes), Some(codes)) => zerompk::from_msgpack::<PqCodec>(bytes)
.ok()
.map(|codec| (codec, codes.clone())),
_ => None,
};
// Only train SQ8 when PQ isn't present — a segment never carries both.
let sq8 = if pq.is_some() {
None
} else {
VectorCollection::build_sq8_for_index(&index)
};
sealed.push(SealedSegment {
index,
base_id: ss.base_id,
sq8,
pq,
tier: StorageTier::L0Ram,
mmap_vectors: None,
});
Expand All @@ -143,18 +180,29 @@ impl VectorCollection {
.insert(v.clone())
.expect("dimension guaranteed by checkpoint");
}
// Replay building-segment tombstones onto the HNSW index.
for (i, &dead) in bs.deleted.iter().enumerate() {
if dead {
index.delete(i as u32);
}
}
let sq8 = VectorCollection::build_sq8_for_index(&index);
sealed.push(SealedSegment {
index,
base_id: bs.base_id,
sq8,
pq: None,
tier: StorageTier::L0Ram,
mmap_vectors: None,
});
}

let next_segment_id = (sealed.len() + 1) as u32;

let index_config = crate::index_config::IndexConfig {
hnsw: params.clone(),
..crate::index_config::IndexConfig::default()
};
Some(Self {
growing,
growing_base_id: snap.growing_base_id,
Expand All @@ -171,6 +219,7 @@ impl VectorCollection {
doc_id_map: snap.doc_id_map.into_iter().collect(),
multi_doc_map: snap.multi_doc_map.into_iter().collect(),
seal_threshold: DEFAULT_SEAL_THRESHOLD,
index_config,
})
}
}
Expand Down
137 changes: 98 additions & 39 deletions nodedb-vector/src/collection/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::flat::FlatIndex;
use crate::hnsw::{HnswIndex, HnswParams};
use crate::quantize::sq8::Sq8Codec;
use crate::index_config::{IndexConfig, IndexType};

use super::segment::{BuildRequest, BuildingSegment, DEFAULT_SEAL_THRESHOLD, SealedSegment};

Expand Down Expand Up @@ -40,6 +40,8 @@ pub struct VectorCollection {
pub multi_doc_map: std::collections::HashMap<String, Vec<u32>>,
/// Number of vectors in the growing segment before sealing.
pub(crate) seal_threshold: usize,
/// Full index configuration (index type, PQ params, IVF params).
pub(crate) index_config: IndexConfig,
}

impl VectorCollection {
Expand All @@ -50,6 +52,25 @@ impl VectorCollection {

/// Create an empty collection with an explicit seal threshold.
///
/// Wraps `params` in an otherwise-default [`IndexConfig`] and delegates to
/// [`Self::with_seal_threshold_and_config`].
///
/// * `dim` - dimensionality forwarded to the delegate constructor.
/// * `params` - HNSW parameters stored as the `hnsw` field of the config.
/// * `seal_threshold` - forwarded unchanged to the delegate constructor.
pub fn with_seal_threshold(dim: usize, params: HnswParams, seal_threshold: usize) -> Self {
    // `params` is owned and never used again, so move it into the config
    // rather than cloning it.
    let index_config = IndexConfig {
        hnsw: params,
        ..IndexConfig::default()
    };
    Self::with_seal_threshold_and_config(dim, index_config, seal_threshold)
}

/// Build an empty collection from a complete index configuration, using
/// `DEFAULT_SEAL_THRESHOLD` as the seal threshold.
pub fn with_index_config(dim: usize, config: IndexConfig) -> Self {
    let seal_threshold = DEFAULT_SEAL_THRESHOLD;
    Self::with_seal_threshold_and_config(dim, config, seal_threshold)
}

/// Create an empty collection with a full index config and custom seal threshold.
pub fn with_seal_threshold_and_config(
dim: usize,
config: IndexConfig,
seal_threshold: usize,
) -> Self {
let params = config.hnsw.clone();
Self {
growing: FlatIndex::new(dim, params.metric),
growing_base_id: 0,
Expand All @@ -66,6 +87,7 @@ impl VectorCollection {
doc_id_map: std::collections::HashMap::new(),
multi_doc_map: std::collections::HashMap::new(),
seal_threshold,
index_config: config,
}
}

Expand Down Expand Up @@ -213,53 +235,28 @@ impl VectorCollection {
.position(|b| b.segment_id == segment_id)
{
let building = self.building.remove(pos);
let sq8 = Self::build_sq8_for_index(&index);
let use_pq = self.index_config.index_type == IndexType::HnswPq;
let (sq8, pq) = if use_pq {
(
None,
Self::build_pq_for_index(&index, self.index_config.pq_m),
)
} else {
(Self::build_sq8_for_index(&index), None)
};
let (tier, mmap_vectors) = self.resolve_tier_for_build(segment_id, &index);

self.sealed.push(SealedSegment {
index,
base_id: building.base_id,
sq8,
pq,
tier,
mmap_vectors,
});
}
}

/// Train an SQ8 codec on an HNSW index and quantize its vectors.
///
/// Returns `None` when the index has fewer than 1000 live vectors or when
/// no live vector could be read. Otherwise returns the calibrated codec and
/// a flat byte buffer with one `dim`-byte slot per local id in
/// `0..index.len()`; a slot whose vector cannot be read is zero-filled.
pub fn build_sq8_for_index(index: &HnswIndex) -> Option<(Sq8Codec, Vec<u8>)> {
    if index.live_count() < 1000 {
        return None;
    }
    let dim = index.dim();
    let total = index.len();

    // Calibration sample: only vectors that are not tombstoned.
    let mut live: Vec<&[f32]> = Vec::with_capacity(total);
    for id in (0..total).map(|i| i as u32) {
        if index.is_deleted(id) {
            continue;
        }
        if let Some(vector) = index.get_vector(id) {
            live.push(vector);
        }
    }
    if live.is_empty() {
        return None;
    }

    let codec = Sq8Codec::calibrate(&live, dim);

    // Quantize every slot (not just live ones) so the buffer stays
    // addressable by local id.
    let mut codes = Vec::with_capacity(dim * total);
    for id in (0..total).map(|i| i as u32) {
        match index.get_vector(id) {
            Some(vector) => codes.extend(codec.quantize(vector)),
            None => codes.resize(codes.len() + dim, 0u8),
        }
    }

    Some((codec, codes))
}

/// Access sealed segments (read-only).
pub fn sealed_segments(&self) -> &[SealedSegment] {
&self.sealed
Expand All @@ -276,10 +273,64 @@ impl VectorCollection {
}

/// Compact sealed segments by removing tombstoned nodes.
///
/// Each sealed segment's index is compacted exactly once through
/// `compact_with_map`, which yields the number of removed nodes and a table
/// mapping every old local id to its new local id (entries equal to
/// `u32::MAX` are treated as removed). The table is then used to rewrite
/// `doc_id_map` and `multi_doc_map` so that global ids continue to resolve
/// to the correct document strings after local-id renumbering.
///
/// Returns the total number of nodes removed across all sealed segments.
pub fn compact(&mut self) -> usize {
    let mut total_removed = 0;
    for seg in &mut self.sealed {
        let base_id = seg.base_id;
        // Compact only through the mapping-aware entry point. Running a
        // plain `compact()` first would strip the tombstones, so this call
        // would report zero removals and the id maps below would never be
        // rewritten.
        let (removed, id_map) = seg.index.compact_with_map();
        total_removed += removed;
        if removed == 0 {
            continue;
        }
        // NOTE(review): `seg.sq8` / `seg.pq` are not touched here; confirm
        // whether their per-id codes need re-alignment after renumbering.

        // Rebuild doc_id_map for entries in [base_id, base_id + id_map.len()).
        let segment_end = base_id as u64 + id_map.len() as u64;
        let doc_keys: Vec<u32> = self
            .doc_id_map
            .keys()
            .copied()
            .filter(|&k| (k as u64) >= base_id as u64 && (k as u64) < segment_end)
            .collect();
        // Two-phase: remove all old entries first, then insert new ones so
        // we don't clobber a freshly-remapped entry with a later tombstone
        // removal.
        let mut new_entries: Vec<(u32, String)> = Vec::with_capacity(doc_keys.len());
        for old_global in &doc_keys {
            let doc = self.doc_id_map.remove(old_global);
            let old_local = (old_global - base_id) as usize;
            let new_local = id_map[old_local];
            if new_local != u32::MAX
                && let Some(doc) = doc
            {
                new_entries.push((base_id + new_local, doc));
            }
        }
        for (k, v) in new_entries {
            self.doc_id_map.insert(k, v);
        }

        // Rewrite multi_doc_map entries for this segment: drop ids whose
        // node was removed, renumber the survivors, and leave ids that
        // belong to other segments untouched.
        for ids in self.multi_doc_map.values_mut() {
            ids.retain_mut(|vid| {
                let v = *vid;
                if (v as u64) >= base_id as u64 && (v as u64) < segment_end {
                    let old_local = (v - base_id) as usize;
                    let new_local = id_map[old_local];
                    if new_local == u32::MAX {
                        false
                    } else {
                        *vid = base_id + new_local;
                        true
                    }
                } else {
                    true
                }
            });
        }
    }
    total_removed
}
Expand Down Expand Up @@ -382,12 +433,20 @@ impl VectorCollection {
0.0
};

let quantization = if self.sealed.iter().any(|s| s.sq8.is_some()) {
let quantization = if self.sealed.iter().any(|s| s.pq.is_some()) {
nodedb_types::VectorIndexQuantization::Pq
} else if self.sealed.iter().any(|s| s.sq8.is_some()) {
nodedb_types::VectorIndexQuantization::Sq8
} else {
nodedb_types::VectorIndexQuantization::None
};

let index_type = match self.index_config.index_type {
IndexType::HnswPq => nodedb_types::VectorIndexType::HnswPq,
IndexType::IvfPq => nodedb_types::VectorIndexType::IvfPq,
IndexType::Hnsw => nodedb_types::VectorIndexType::Hnsw,
};

let hnsw_mem: usize = self
.sealed
.iter()
Expand Down Expand Up @@ -422,7 +481,7 @@ impl VectorCollection {
memory_bytes,
disk_bytes,
build_in_progress: !self.building.is_empty(),
index_type: nodedb_types::VectorIndexType::Hnsw,
index_type,
hnsw_m: self.params.m,
hnsw_m0: self.params.m0,
hnsw_ef_construction: self.params.ef_construction,
Expand Down
1 change: 1 addition & 0 deletions nodedb-vector/src/collection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod budget;
pub mod checkpoint;
pub mod lifecycle;
pub mod quantize;
pub mod search;
pub mod segment;
pub mod tier;
Expand Down
Loading
Loading