Skip to content

Commit

Permalink
fix insert after delete problem in aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
snork-alt committed Jan 22, 2023
1 parent 118d0fa commit 1b3dc47
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
28 changes: 14 additions & 14 deletions dozer-sql/src/pipeline/aggregation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,17 @@ impl AggregationProcessor {
None => 0_u64,
};

txn.put(
db,
key.as_slice(),
(if decr {
curr_count.wrapping_sub(delta)
} else {
curr_count.wrapping_add(delta)
})
.to_be_bytes()
.as_slice(),
)?;
let new_val = if decr {
curr_count.wrapping_sub(delta)
} else {
curr_count.wrapping_add(delta)
};

if new_val > 0 {
txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
} else {
txn.del(db, key.as_slice(), None)?;
}
Ok(curr_count)
}

Expand Down Expand Up @@ -383,10 +383,10 @@ impl AggregationProcessor {
}
};

if prev_count > 0 {
txn.put(db, record_key.as_slice(), new_state.as_slice())?;
} else {
if prev_count == 1 {
let _ = txn.del(db, record_key.as_slice(), None)?;
} else {
txn.put(db, record_key.as_slice(), new_state.as_slice())?;
}
Ok(res)
}
Expand Down
47 changes: 46 additions & 1 deletion dozer-sql/src/pipeline/aggregation/tests/aggregation_null.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::output;
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::aggregation::tests::aggregation_tests_utils::{
init_input_schema, init_processor, FIELD_100_INT,
delete_exp, delete_field, init_input_schema, init_processor, insert_exp, insert_field,
FIELD_100_INT, FIELD_1_INT, ITALY,
};
use crate::pipeline::builder::SchemaSQLContext;
use crate::pipeline::tests::utils::get_select;
Expand Down Expand Up @@ -47,6 +48,50 @@ fn test_sum_aggregation_null() {
assert_eq!(out, exp);
}

#[test]
fn test_sum_aggregation_del_and_insert() {
let schema = init_input_schema(Int, "COUNT");
let (processor, tx) = init_processor(
"SELECT Country, COUNT(Salary) \
FROM Users \
WHERE Salary >= 1 GROUP BY Country",
HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
)
.unwrap();

// Insert 100 for segment Italy
/*
Italy, 100.0
-------------
COUNT = 1
*/
let mut inp = insert_field(ITALY, FIELD_100_INT);
let mut out = output!(processor, inp, tx);
let mut exp = vec![insert_exp(ITALY, FIELD_1_INT)];
assert_eq!(out, exp);

// Delete last record
/*
-------------
COUNT = 0
*/
inp = delete_field(ITALY, FIELD_100_INT);
out = output!(processor, inp, tx);
exp = vec![delete_exp(ITALY, FIELD_1_INT)];
assert_eq!(out, exp);

// Insert 100 for segment Italy
/*
Italy, 100.0
-------------
COUNT = 1
*/
let mut inp = insert_field(ITALY, FIELD_100_INT);
let mut out = output!(processor, inp, tx);
let mut exp = vec![insert_exp(ITALY, FIELD_1_INT)];
assert_eq!(out, exp);
}

#[test]
fn test_aggregation_alias() {
let schema = Schema::empty()
Expand Down

0 comments on commit 1b3dc47

Please sign in to comment.