Skip to content

Commit

Permalink
perf: Wrap ProcessorRecord::lifetime in box to avoid occupying memo…
Browse files Browse the repository at this point in the history
…ry when it's `None`
  • Loading branch information
chubei committed Jul 26, 2023
1 parent 99acb37 commit 60433ef
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
6 changes: 3 additions & 3 deletions dozer-core/src/processor_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct ProcessorRecord {
total_len: u32,

/// Time To Live for this record. If the value is None, the record will never expire.
pub lifetime: Option<Lifetime>,
lifetime: Option<Box<Lifetime>>,

// Imagine that we flatten all the fields in `values` recursively, `index` is the index into the flattened vector.
index: Vec<u32>,
Expand Down Expand Up @@ -69,10 +69,10 @@ impl ProcessorRecord {
}

pub fn get_lifetime(&self) -> Option<Lifetime> {
self.lifetime.clone()
self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
}
pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
self.lifetime = lifetime;
self.lifetime = lifetime.map(Box::new);
}

pub fn get_field_indexes(&self) -> &[u32] {
Expand Down
16 changes: 8 additions & 8 deletions dozer-sql/src/pipeline/product/join/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl JoinOperator {

add_join_record(&mut self.left_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand All @@ -480,7 +480,7 @@ impl JoinOperator {

add_join_record(&mut self.right_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand All @@ -494,7 +494,7 @@ impl JoinOperator {

add_join_record(&mut self.left_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand All @@ -508,7 +508,7 @@ impl JoinOperator {

add_join_record(&mut self.right_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand All @@ -522,7 +522,7 @@ impl JoinOperator {

add_join_record(&mut self.left_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand All @@ -536,7 +536,7 @@ impl JoinOperator {

add_join_record(&mut self.right_map, join_key, primary_key, &new);

if let Some(lifetime) = new.get_record().lifetime.clone() {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.insert_evict_index(from, lifetime, join_key, primary_key)?
}

Expand Down Expand Up @@ -619,8 +619,8 @@ fn join_records(
left_record: ProcessorRecordRef,
right_record: ProcessorRecordRef,
) -> ProcessorRecordRef {
let left_lifetime = left_record.get_record().lifetime.clone();
let right_lifetime = right_record.get_record().lifetime.clone();
let left_lifetime = left_record.get_record().get_lifetime();
let right_lifetime = right_record.get_record().get_lifetime();

let mut output_record = ProcessorRecord::new();
output_record.extend_referenced_record(left_record);
Expand Down
9 changes: 5 additions & 4 deletions dozer-sql/src/pipeline/product/join/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use dozer_core::node::{PortHandle, Processor};
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_types::errors::internal::BoxedError;
use dozer_types::labels::Labels;
use dozer_types::types::Lifetime;
use metrics::{
counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
increment_counter,
Expand Down Expand Up @@ -60,7 +61,7 @@ impl ProductProcessor {
}
}

fn update_eviction_index(&mut self, lifetime: &dozer_types::types::Lifetime) {
fn update_eviction_index(&mut self, lifetime: Lifetime) {
let now = &lifetime.reference;
let old_instants = self.join_operator.evict_index(&JoinBranch::Left, now);
self.join_operator
Expand Down Expand Up @@ -91,7 +92,7 @@ impl Processor for ProductProcessor {
let now = std::time::Instant::now();
let records = match op {
ProcessorOperation::Delete { old } => {
if let Some(lifetime) = &old.get_record().lifetime {
if let Some(lifetime) = old.get_record().get_lifetime() {
self.update_eviction_index(lifetime);
}

Expand All @@ -100,7 +101,7 @@ impl Processor for ProductProcessor {
.map_err(PipelineError::JoinError)?
}
ProcessorOperation::Insert { new } => {
if let Some(lifetime) = &new.get_record().lifetime {
if let Some(lifetime) = new.get_record().get_lifetime() {
self.update_eviction_index(lifetime);
}

Expand All @@ -109,7 +110,7 @@ impl Processor for ProductProcessor {
.map_err(PipelineError::JoinError)?
}
ProcessorOperation::Update { old, new } => {
if let Some(lifetime) = &old.get_record().lifetime {
if let Some(lifetime) = old.get_record().get_lifetime() {
self.update_eviction_index(lifetime);
}

Expand Down
8 changes: 4 additions & 4 deletions dozer-sql/src/pipeline/projection/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl ProjectionProcessor {
.extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
}

output_record.set_lifetime(record.get_record().lifetime.to_owned());
output_record.set_lifetime(record.get_record().get_lifetime());

Ok(ProcessorOperation::Delete {
old: ProcessorRecordRef::new(output_record),
Expand All @@ -46,7 +46,7 @@ impl ProjectionProcessor {
.extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
}

output_record.set_lifetime(record.get_record().lifetime.to_owned());
output_record.set_lifetime(record.get_record().get_lifetime());
Ok(ProcessorOperation::Insert {
new: ProcessorRecordRef::new(output_record),
})
Expand All @@ -66,9 +66,9 @@ impl ProjectionProcessor {
.extend_direct_field(expr.evaluate(new.get_record(), &self.input_schema)?);
}

old_output_record.set_lifetime(old.get_record().lifetime.to_owned());
old_output_record.set_lifetime(old.get_record().get_lifetime());

new_output_record.set_lifetime(new.get_record().lifetime.to_owned());
new_output_record.set_lifetime(new.get_record().get_lifetime());
Ok(ProcessorOperation::Update {
old: ProcessorRecordRef::new(old_output_record),
new: ProcessorRecordRef::new(new_output_record),
Expand Down

0 comments on commit 60433ef

Please sign in to comment.