Skip to content

Commit

Permalink
feat: Asynchoronous indexing (#1206)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Mar 11, 2023
1 parent d937a4c commit b4b7f2d
Show file tree
Hide file tree
Showing 22 changed files with 403 additions and 235 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub fn initialize_cache(
cache.insert(&mut record.record).unwrap();
}
cache.commit().unwrap();
cache_manager.wait_until_indexing_catchup();

cache_manager
.create_alias(cache.name(), schema_name)
Expand Down
1 change: 1 addition & 0 deletions dozer-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ itertools = "0.10.5"
roaring = "0.10.1"
dozer-storage = { path = "../dozer-storage" }
uuid = { version = "1.3.0", features = ["v4"] }
rayon = "1.7.0"

[dev-dependencies]
criterion = "0.4"
Expand Down
86 changes: 39 additions & 47 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::{Path, PathBuf};

use dozer_storage::{
errors::StorageError,
lmdb::RoTransaction,
Expand All @@ -8,6 +10,7 @@ use dozer_types::{
borrow::IntoOwned,
types::{Field, FieldType, Record, Schema, SchemaWithIndex},
};
use tempdir::TempDir;

use crate::{
cache::{index, lmdb::utils::init_env, RecordWithId},
Expand All @@ -25,6 +28,10 @@ pub trait MainEnvironment: BeginTransaction {

fn schema(&self) -> &SchemaWithIndex;

fn base_path(&self) -> &Path {
&self.common().base_path
}

fn name(&self) -> &str {
&self.common().name
}
Expand Down Expand Up @@ -54,6 +61,8 @@ pub trait MainEnvironment: BeginTransaction {

#[derive(Debug)]
pub struct MainEnvironmentCommon {
/// The environment base path.
base_path: PathBuf,
/// The environment name.
name: String,
/// The operation log.
Expand All @@ -65,6 +74,7 @@ pub struct MainEnvironmentCommon {
pub struct RwMainEnvironment {
txn: SharedTransaction,
common: MainEnvironmentCommon,
_temp_dir: Option<TempDir>,
schema: SchemaWithIndex,
}

Expand All @@ -87,48 +97,41 @@ impl MainEnvironment for RwMainEnvironment {
}

impl RwMainEnvironment {
pub fn open(
common_options: &CacheCommonOptions,
write_options: CacheWriteOptions,
) -> Result<Self, CacheError> {
let (env, common, schema) = open_env_with_schema(common_options, Some(write_options))?;

Ok(Self {
txn: env.create_txn()?,
common,
schema,
})
}

pub fn create(
schema: &SchemaWithIndex,
pub fn new(
schema: Option<&SchemaWithIndex>,
common_options: &CacheCommonOptions,
write_options: CacheWriteOptions,
) -> Result<Self, CacheError> {
let (env, common, schema_option, old_schema) =
let (env, common, schema_option, old_schema, temp_dir) =
open_env(common_options, Some(write_options))?;
let txn = env.create_txn()?;

let schema = if let Some(old_schema) = old_schema {
if &old_schema != schema {
return Err(CacheError::SchemaMismatch {
name: common.name,
given: Box::new(schema.clone()),
stored: Box::new(old_schema),
});
let schema = match (schema, old_schema) {
(Some(schema), Some(old_schema)) => {
if &old_schema != schema {
return Err(CacheError::SchemaMismatch {
name: common.name,
given: Box::new(schema.clone()),
stored: Box::new(old_schema),
});
}
old_schema
}
old_schema
} else {
let mut txn = txn.write();
schema_option.store(txn.txn_mut(), schema)?;
txn.commit_and_renew()?;
schema.clone()
(Some(schema), None) => {
let mut txn = txn.write();
schema_option.store(txn.txn_mut(), schema)?;
txn.commit_and_renew()?;
schema.clone()
}
(None, Some(schema)) => schema,
(None, None) => return Err(CacheError::SchemaNotFound),
};

Ok(Self {
txn,
common,
schema,
_temp_dir: temp_dir,
})
}

Expand Down Expand Up @@ -197,7 +200,8 @@ impl MainEnvironment for RoMainEnvironment {

impl RoMainEnvironment {
pub fn new(common_options: &CacheCommonOptions) -> Result<Self, CacheError> {
let (env, common, schema) = open_env_with_schema(common_options, None)?;
let (env, common, _, schema, _) = open_env(common_options, None)?;
let schema = schema.ok_or(CacheError::SchemaNotFound)?;
Ok(Self {
env,
common,
Expand All @@ -206,6 +210,7 @@ impl RoMainEnvironment {
}
}

#[allow(clippy::type_complexity)]
fn open_env(
common_options: &CacheCommonOptions,
write_options: Option<CacheWriteOptions>,
Expand All @@ -215,10 +220,11 @@ fn open_env(
MainEnvironmentCommon,
LmdbOption<SchemaWithIndex>,
Option<SchemaWithIndex>,
Option<TempDir>,
),
CacheError,
> {
let (mut env, name) = init_env(common_options, write_options)?;
let (mut env, (base_path, name), temp_dir) = init_env(common_options, write_options)?;

let create_if_not_exist = write_options.is_some();
let operation_log = OperationLog::new(&mut env, create_if_not_exist)?;
Expand All @@ -231,31 +237,17 @@ fn open_env(
Ok((
env,
MainEnvironmentCommon {
base_path,
name,
operation_log,
intersection_chunk_size: common_options.intersection_chunk_size,
},
schema_option,
schema,
temp_dir,
))
}

fn open_env_with_schema(
common_options: &CacheCommonOptions,
write_options: Option<CacheWriteOptions>,
) -> Result<
(
LmdbEnvironmentManager,
MainEnvironmentCommon,
SchemaWithIndex,
),
CacheError,
> {
let (env, common, _, schema) = open_env(common_options, write_options)?;
let schema = schema.ok_or(CacheError::SchemaNotFound)?;
Ok((env, common, schema))
}

fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
debug_assert_eq!(schema.identifier, record.schema_id);
debug_assert_eq!(schema.fields.len(), record.values.len());
Expand Down
115 changes: 42 additions & 73 deletions dozer-cache/src/cache/lmdb/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
use std::fmt::Debug;
use std::path::PathBuf;

use dozer_storage::BeginTransaction;
use std::{fmt::Debug, sync::Arc};

use dozer_types::types::{Record, SchemaWithIndex};

use super::super::{RoCache, RwCache};
use super::{
super::{RoCache, RwCache},
indexing::IndexingThreadPool,
};
use crate::cache::expression::QueryExpression;
use crate::cache::RecordWithId;
use crate::errors::CacheError;

mod helper;
mod main_environment;
mod query;
mod secondary_environment;

use main_environment::{MainEnvironment, RoMainEnvironment, RwMainEnvironment};
pub use main_environment::{MainEnvironment, RoMainEnvironment, RwMainEnvironment};
use query::LmdbQueryHandler;
pub use secondary_environment::SecondaryEnvironment;
use secondary_environment::{RoSecondaryEnvironment, RwSecondaryEnvironment};
pub use secondary_environment::{
RoSecondaryEnvironment, RwSecondaryEnvironment, SecondaryEnvironment,
};

#[derive(Clone, Debug)]
pub struct CacheCommonOptions {
Expand Down Expand Up @@ -75,61 +76,53 @@ pub struct CacheWriteOptions {
impl Default for CacheWriteOptions {
fn default() -> Self {
Self {
max_size: 1024 * 1024 * 1024 * 1024,
max_size: 1024 * 1024 * 1024,
}
}
}

#[derive(Debug)]
pub struct LmdbRwCache {
main_env: RwMainEnvironment,
secondary_envs: Vec<RwSecondaryEnvironment>,
secondary_envs: Vec<RoSecondaryEnvironment>,
}

impl LmdbRwCache {
pub fn open(
pub fn new(
schema: Option<&SchemaWithIndex>,
common_options: &CacheCommonOptions,
write_options: CacheWriteOptions,
indexing_thread_pool: &mut IndexingThreadPool,
) -> Result<Self, CacheError> {
let main_env = RwMainEnvironment::open(common_options, write_options)?;
let secondary_envs = (0..main_env.schema().1.len())
.map(|index| {
RwSecondaryEnvironment::open(
secondary_environment_name(index),
common_options,
write_options,
)
})
.collect::<Result<_, _>>()?;
Ok(Self {
main_env,
secondary_envs,
})
}
let rw_main_env = RwMainEnvironment::new(schema, common_options, write_options)?;

let common_options = CacheCommonOptions {
path: Some((
rw_main_env.base_path().to_path_buf(),
rw_main_env.name().to_string(),
)),
..*common_options
};
let ro_main_env = Arc::new(RoMainEnvironment::new(&common_options)?);

let mut ro_secondary_envs = vec![];
for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() {
let name = secondary_environment_name(index);
let rw_secondary_env = RwSecondaryEnvironment::new(
Some(index_definition),
name.clone(),
&common_options,
write_options,
)?;
indexing_thread_pool.add_indexing_task(ro_main_env.clone(), Arc::new(rw_secondary_env));

let ro_secondary_env = RoSecondaryEnvironment::new(name, &common_options)?;
ro_secondary_envs.push(ro_secondary_env);
}

pub fn create(
schema: &SchemaWithIndex,
common_options: &CacheCommonOptions,
write_options: CacheWriteOptions,
) -> Result<Self, CacheError> {
let main_env = RwMainEnvironment::create(schema, common_options, write_options)?;
let secondary_envs = main_env
.schema()
.1
.iter()
.enumerate()
.map(|(index, index_definition)| {
RwSecondaryEnvironment::create(
index_definition,
secondary_environment_name(index),
common_options,
write_options,
)
})
.collect::<Result<_, _>>()?;
Ok(Self {
main_env,
secondary_envs,
main_env: rw_main_env,
secondary_envs: ro_secondary_envs,
})
}
}
Expand Down Expand Up @@ -161,46 +154,22 @@ impl RwCache for LmdbRwCache {
let span = dozer_types::tracing::span!(dozer_types::tracing::Level::TRACE, "insert_cache");
let _enter = span.enter();
let record_id = self.main_env.insert(record)?;

let span = dozer_types::tracing::span!(
dozer_types::tracing::Level::TRACE,
"build_indexes",
record_id = record_id,
);
let _enter = span.enter();
self.index()?;

Ok(record_id)
}

fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
let version = self.main_env.delete(key)?;
self.index()?;
Ok(version)
}

fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
let version = self.delete(key)?;
self.insert(record)?;
self.index()?;
Ok(version)
}

fn commit(&self) -> Result<(), CacheError> {
self.main_env.commit()?;
for secondary_env in &self.secondary_envs {
secondary_env.commit()?;
}
Ok(())
}
}

impl LmdbRwCache {
fn index(&self) -> Result<(), CacheError> {
let main_txn = self.main_env.begin_txn()?;
for secondary_env in &self.secondary_envs {
secondary_env.index(&main_txn, self.main_env.operation_log())?;
}
Ok(())
}
}
Expand Down Expand Up @@ -233,7 +202,7 @@ impl LmdbCache for LmdbRwCache {
&self.main_env
}

type SecondaryEnvironment = RwSecondaryEnvironment;
type SecondaryEnvironment = RoSecondaryEnvironment;
fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
&self.secondary_envs[index]
}
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions dozer-cache/src/cache/lmdb/cache/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod handler;
mod intersection;
mod lmdb_cmp;
mod secondary;

pub use handler::LmdbQueryHandler;
Expand Down
Loading

0 comments on commit b4b7f2d

Please sign in to comment.