Skip to content

Commit

Permalink
fix: Update resolution atomicity and stream operations. (#1390)
Browse files Browse the repository at this point in the history
Fixes #1368 and #1288.
  • Loading branch information
chubei committed Apr 6, 2023
1 parent 6c8a33a commit 676c73b
Show file tree
Hide file tree
Showing 15 changed files with 872 additions and 668 deletions.
85 changes: 64 additions & 21 deletions dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::Path;

use dozer_cache::{
cache::{index::get_primary_key, RwCache, RwCacheManager},
cache::{index::get_primary_key, RwCache, RwCacheManager, UpsertResult},
errors::CacheError,
};
use dozer_core::executor::ExecutorOperation;
Expand All @@ -10,7 +10,7 @@ use dozer_types::{
indicatif::MultiProgress,
log::error,
models::api_endpoint::ConflictResolution,
types::{FieldDefinition, FieldType, IndexDefinition, Operation, Schema},
types::{Field, FieldDefinition, FieldType, IndexDefinition, Operation, Record, Schema},
};
use futures_util::{Future, StreamExt};
use tokio::sync::broadcast::Sender;
Expand Down Expand Up @@ -56,39 +56,47 @@ async fn build_cache(
Operation::Delete { mut old } => {
old.schema_id = schema.identifier;
let key = get_primary_key(&schema.primary_index, &old.values);
let version = cache.delete(&key)?;
old.version = Some(version);

if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
let operation =
types_helper::map_delete_operation(endpoint_name.clone(), old);
send_and_log_error(operations_sender, operation);
if let Some(meta) = cache.delete(&key)? {
old.version = Some(meta.version);

if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref()
{
let operation =
types_helper::map_delete_operation(endpoint_name.clone(), old);
send_and_log_error(operations_sender, operation);
}
}
}
Operation::Insert { mut new } => {
new.schema_id = schema.identifier;
let id = cache.insert(&mut new)?;
let result = cache.insert(&new)?;

if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
let operation =
types_helper::map_insert_operation(endpoint_name.clone(), new, id);
send_and_log_error(operations_sender, operation);
send_upsert_result(
endpoint_name,
operations_sender,
result,
&schema,
None,
new,
);
}
}
Operation::Update { mut old, mut new } => {
old.schema_id = schema.identifier;
new.schema_id = schema.identifier;
let key = get_primary_key(&schema.primary_index, &old.values);
let (version, id) = cache.update(&key, &mut new)?;
let upsert_result = cache.update(&key, &new)?;

if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
let operation = if let Some(version) = version {
old.version = Some(version);
types_helper::map_update_operation(endpoint_name.clone(), old, new)
} else {
types_helper::map_insert_operation(endpoint_name.clone(), new, id)
};
send_and_log_error(operations_sender, operation);
send_upsert_result(
endpoint_name,
operations_sender,
upsert_result,
&schema,
Some(old),
new,
);
}
}
},
Expand Down Expand Up @@ -140,6 +148,41 @@ fn generate_secondary_indexes(fields: &[FieldDefinition]) -> Vec<IndexDefinition
.collect()
}

fn send_upsert_result(
endpoint_name: &str,
operations_sender: &Sender<GrpcOperation>,
upsert_result: UpsertResult,
schema: &Schema,
old: Option<Record>,
mut new: Record,
) {
match upsert_result {
UpsertResult::Inserted { meta } => {
new.version = Some(meta.version);
let op = types_helper::map_insert_operation(endpoint_name.to_string(), new, meta.id);
send_and_log_error(operations_sender, op);
}
UpsertResult::Updated { old_meta, new_meta } => {
// If `old` is `None`, it means `Updated` comes from `Insert` operation.
// In this case, we can't get the full old record, but the fields in the primary index must be the same with the new record.
// So we create the old record with only the fields in the primary index, cloned from `new`.
let mut old = old.unwrap_or_else(|| {
let mut record =
Record::new(new.schema_id, vec![Field::Null; new.values.len()], None);
for index in schema.primary_index.iter() {
record.values[*index] = new.values[*index].clone();
}
record
});
old.version = Some(old_meta.version);
new.version = Some(new_meta.version);
let op = types_helper::map_update_operation(endpoint_name.to_string(), old, new);
send_and_log_error(operations_sender, op);
}
UpsertResult::Ignored => {}
}
}

fn send_and_log_error<T: Send + Sync + 'static>(sender: &Sender<T>, msg: T) {
if let Err(e) = sender.send(msg) {
error!("Failed to send broadcast message: {}", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
use crate::cache::index;
use crate::cache::lmdb::cache::MainEnvironment;
use crate::cache::test_utils::schema_multi_indices;
use crate::errors::CacheError;
use dozer_types::models::api_endpoint::{
ConflictResolution, OnDeleteResolutionTypes, OnInsertResolutionTypes, OnUpdateResolutionTypes,
};
use dozer_types::types::{Field, Record, Schema};

use super::RwMainEnvironment;

fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Schema) {
let schema = schema_multi_indices();
let main_env =
RwMainEnvironment::new(Some(&schema), &Default::default(), conflict_resolution).unwrap();
(main_env, schema.0)
}

#[test]
fn ignore_insert_error_when_type_nothing() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::Nothing as i32,
on_update: OnUpdateResolutionTypes::default() as i32,
on_delete: OnDeleteResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
let record = Record {
schema_id: schema.identifier,
values: initial_values.clone(),
version: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap().record;

assert_eq!(initial_values, record.values);
assert_eq!(Some(1), record.version);

env.insert(&record).unwrap();

let record = env.get(&key).unwrap().record;

// version should remain unchanged, because insert should be ignored
assert_eq!(initial_values, record.values);
assert_eq!(Some(1), record.version);
}

#[test]
fn update_after_insert_error_when_type_update() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::Update as i32,
on_update: OnUpdateResolutionTypes::default() as i32,
on_delete: OnDeleteResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
let record = Record {
schema_id: schema.identifier,
values: initial_values.clone(),
version: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap().record;

assert_eq!(initial_values, record.values);
assert_eq!(Some(1), record.version);

let second_insert_values = vec![
Field::Int(1),
Field::String("Second insert name".to_string()),
];
let second_record = Record {
schema_id: schema.identifier,
values: second_insert_values.clone(),
version: None,
};

env.insert(&second_record).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap().record;

// version should increase, because record should be updated
assert_eq!(second_insert_values, record.values);
assert_eq!(Some(2), record.version);

// Check cache size. It should have only one record
let current_count = env.count().unwrap();
assert_eq!(current_count, 1_usize);
}

#[test]
fn return_insert_error_when_type_panic() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::Panic as i32,
on_update: OnUpdateResolutionTypes::default() as i32,
on_delete: OnDeleteResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];
let record = Record {
schema_id: schema.identifier,
values: initial_values.clone(),
version: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap().record;

assert_eq!(initial_values, record.values);
assert_eq!(Some(1), record.version);

// Try insert same data again
let result = env.insert(&record);
assert!(matches!(result, Err(CacheError::PrimaryKeyExists)));
}

#[test]
fn ignore_update_error_when_type_nothing() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::default() as i32,
on_update: OnUpdateResolutionTypes::Nothing as i32,
on_delete: OnDeleteResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::Null];
let update_values = vec![
Field::Int(1),
Field::String("Film name updated".to_string()),
];

let update_record = Record {
schema_id: schema.identifier,
values: update_values,
version: None,
};
env.update(
&index::get_primary_key(&schema.primary_index, &initial_values),
&update_record,
)
.unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key);

assert!(matches!(record, Err(CacheError::PrimaryKeyNotFound)));
}

#[test]
fn update_after_update_error_when_type_upsert() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::default() as i32,
on_update: OnUpdateResolutionTypes::Upsert as i32,
on_delete: OnDeleteResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::Null];
let update_values = vec![
Field::Int(1),
Field::String("Film name updated".to_string()),
];

let update_record = Record {
schema_id: schema.identifier,
values: update_values.clone(),
version: None,
};
env.update(
&index::get_primary_key(&schema.primary_index, &initial_values),
&update_record,
)
.unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap().record;

assert_eq!(update_values, record.values);
assert_eq!(Some(1), record.version);
}

#[test]
fn return_update_error_when_type_panic() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::default() as i32,
on_update: OnUpdateResolutionTypes::Panic as i32,
on_delete: OnInsertResolutionTypes::default() as i32,
});

let initial_values = vec![Field::Int(1), Field::Null];
let update_values = vec![
Field::Int(1),
Field::String("Film name updated".to_string()),
];

let update_record = Record {
schema_id: schema.identifier,
values: update_values.clone(),
version: None,
};

let result = env.update(
&index::get_primary_key(&schema.primary_index, &initial_values),
&update_record,
);

assert!(matches!(result, Err(CacheError::PrimaryKeyNotFound)));
}

#[test]
fn ignore_delete_error_when_type_nothing() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::default() as i32,
on_update: OnUpdateResolutionTypes::default() as i32,
on_delete: OnUpdateResolutionTypes::Nothing as i32,
});

let initial_values = vec![Field::Int(1), Field::Null];

// Check is cache empty
let current_count = env.count().unwrap();
assert_eq!(current_count, 0_usize);

// Trying delete not existing record should be ignored
let result = env.delete(&index::get_primary_key(
&schema.primary_index,
&initial_values,
));
assert!(result.is_ok());
}

#[test]
fn return_delete_error_when_type_panic() {
let (mut env, schema) = init_env(ConflictResolution {
on_insert: OnInsertResolutionTypes::default() as i32,
on_update: OnUpdateResolutionTypes::default() as i32,
on_delete: OnDeleteResolutionTypes::Panic as i32,
});

let initial_values = vec![Field::Int(1), Field::Null];

let result = env.delete(&index::get_primary_key(
&schema.primary_index,
&initial_values,
));
assert!(matches!(result, Err(CacheError::PrimaryKeyNotFound)));
}
Loading

0 comments on commit 676c73b

Please sign in to comment.