Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: aggregate count #122

Merged
merged 7 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/polodb_core/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ pub enum Error {
UnknownBsonElementType(u8),
#[error("failed to run regex expression: {}, expression: {}, options: {}", .0.error, .0.expression, .0.options)]
RegexError(Box<RegexError>),
/// An aggregation stage used an operator the code generator does not
/// recognise; the payload is the operator key as written in the stage.
#[error("unknown aggregation operation: {0}")]
UnknownAggregationOperation(String),
#[error("invalid aggregation stage: {0:?}")]
InvalidAggregationStage(Box<Document>),
}

impl Error {
Expand Down
69 changes: 68 additions & 1 deletion src/polodb_core/tests/test_aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use bson::{doc, Document};
use polodb_core::{Database, Result};

Expand Down Expand Up @@ -78,4 +83,66 @@ fn test_aggregate_match() {

assert_eq!(result[0].get("name").unwrap().as_str().unwrap(), "banana");
assert_eq!(result[1].get("name").unwrap().as_str().unwrap(), "pear");
}
}

/// `$count` must report how many documents reach the stage, both behind a
/// `$match` filter and on the unfiltered collection.
#[test]
fn test_aggregate_count() {
    let db = Database::open_memory().unwrap();
    let fruits = db.collection::<Document>("fruits");

    // Fixture rows as (name, color, shape).
    let fixtures = [
        ("apple", "red", "round"),
        ("banana", "yellow", "long"),
        ("orange", "orange", "round"),
        ("pear", "yellow", "round"),
        ("peach", "orange", "round"),
    ];
    let documents: Vec<Document> = fixtures
        .iter()
        .map(|&(name, color, shape)| {
            doc! {
                "name": name,
                "color": color,
                "shape": shape,
            }
        })
        .collect();
    fruits.insert_many(documents).unwrap();

    // Runs a pipeline, checks that it yields exactly one row and returns
    // that row's "count" field.
    let count_of = |pipeline: Vec<Document>| -> i64 {
        let rows = fruits
            .aggregate(pipeline)
            .unwrap()
            .collect::<Result<Vec<Document>>>()
            .unwrap();
        assert_eq!(rows.len(), 1);
        rows[0].get("count").unwrap().as_i64().unwrap()
    };

    // Two of the five fruits are yellow.
    let yellow_count = count_of(vec![
        doc! {
            "$match": {
                "color": "yellow",
            },
        },
        doc! {
            "$count": "count",
        },
    ]);
    assert_eq!(yellow_count, 2);

    // Without a filter every inserted document is counted.
    let total_count = count_of(vec![
        doc! {
            "$count": "count",
        },
    ]);
    assert_eq!(total_count, 5);
}
24 changes: 24 additions & 0 deletions src/polodb_core/vm/aggregation_codegen_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

use crate::vm::label::Label;

/// Labels belonging to one entry of an aggregation pipeline during code
/// generation.
pub(crate) struct PipelineItem {
/// Label placed at the start of the code that is called for this entry
/// (the per-stage function, or the final "emit result row" function).
pub next_label: Label,
/// Label of the entry's completion code, if it has any. In the code
/// generator only `$count` stages get one; it is called before the
/// cursor is closed.
pub complete_label: Option<Label>,
}

/// State shared between the passes that compile an aggregation pipeline.
///
/// `items` holds one `PipelineItem` per registered pipeline entry, in
/// pipeline order. The default value is an empty list, which is exactly
/// what the derived `Default` produces, so no hand-written impl is needed.
#[derive(Default)]
pub(crate) struct AggregationCodeGenContext {
    pub items: Vec<PipelineItem>,
}
204 changes: 199 additions & 5 deletions src/polodb_core/vm/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::vm::SubProgram;
use crate::{Error, Result};
use bson::spec::{BinarySubtype, ElementType};
use bson::{Array, Binary, Bson, Document};
use crate::vm::aggregation_codegen_context::{AggregationCodeGenContext, PipelineItem};
use crate::vm::global_variable::{GlobalVariable, GlobalVariableSlot};

const JUMP_TABLE_DEFAULT_SIZE: usize = 8;
const PATH_DEFAULT_SIZE: usize = 8;
Expand Down Expand Up @@ -116,6 +118,28 @@ impl Codegen {
*self.program
}

/// Registers an unnamed global variable with the given initial value and
/// returns its handle. Forwards to `new_global_variable_impl` with no name.
#[inline]
#[allow(dead_code)]
pub(super) fn new_global_variable(&mut self, init_value: Bson) -> Result<GlobalVariable> {
self.new_global_variable_impl(init_value, None)
}

/// Registers a global variable that carries a name and returns its handle.
///
/// The name is stored as a `Box<str>` in the variable's slot; everything
/// else is handled by `new_global_variable_impl`.
#[inline]
#[allow(dead_code)]
pub(super) fn new_global_variable_with_name(&mut self, name: String, init_value: Bson) -> Result<GlobalVariable> {
    let boxed_name: Box<str> = name.into_boxed_str();
    self.new_global_variable_impl(init_value, Some(boxed_name))
}

/// Appends a new slot to the program's global-variable table and returns a
/// handle whose id is the slot's index in that table.
///
/// The slot is created with `pos: 0`.
/// NOTE(review): `pos` is presumably filled in at a later stage — confirm.
fn new_global_variable_impl(&mut self, init_value: Bson, name: Option<Box<str>>) -> Result<GlobalVariable> {
    let slots = &mut self.program.global_variables;
    // The id is the index the new slot is about to occupy.
    let id = slots.len() as u32;
    let slot = GlobalVariableSlot {
        pos: 0,
        init_value,
        name,
    };
    slots.push(slot);
    Ok(GlobalVariable::new(id))
}

pub(super) fn new_label(&mut self) -> Label {
let id = self.program.label_slots.len() as u32;
self.program.label_slots.push(LabelSlot::Empty);
Expand All @@ -132,6 +156,16 @@ impl Codegen {
self.program.label_slots[label.u_pos()] = LabelSlot::UnnamedLabel(current_loc);
}

/// Emits a `LoadGlobal` instruction followed by a `u32` operand taken from
/// `global.pos()`.
fn emit_load_global(&mut self, global: GlobalVariable) {
    let operand = global.pos();
    self.emit(DbOp::LoadGlobal);
    self.emit_u32(operand);
}

/// Emits a `StoreGlobal` instruction followed by a `u32` operand taken from
/// `global.pos()`.
fn emit_store_global(&mut self, global: GlobalVariable) {
    let operand = global.pos();
    self.emit(DbOp::StoreGlobal);
    self.emit_u32(operand);
}

pub(super) fn emit_label_with_name<T: Into<Box<str>>>(&mut self, label: Label, name: T) {
if !self.program.label_slots[label.u_pos()].is_empty() {
unreachable!("this label has been emit");
Expand Down Expand Up @@ -203,6 +237,7 @@ impl Codegen {
col_spec: &CollectionSpecification,
query: &Document,
result_callback: F,
before_close: Option<Box<dyn FnOnce(&mut Codegen) -> Result<()>>>,
is_many: bool,
) -> Result<()>
where
Expand Down Expand Up @@ -242,6 +277,10 @@ impl Codegen {
// <==== close cursor
self.emit_label_with_name(close_label, "close");

if let Some(before_close) = before_close {
before_close(self)?;
}

self.emit(DbOp::Close);
self.emit(DbOp::Halt);

Expand Down Expand Up @@ -279,8 +318,7 @@ impl Codegen {
self.emit_standard_query_doc(query, result_label, compare_fun_clean)?;

self.emit_label_with_name(compare_fun_clean, "compare_function_clean");
self.emit(DbOp::Ret);
self.emit_u32(0);
self.emit_ret(0);

Ok(())
}
Expand Down Expand Up @@ -501,8 +539,7 @@ impl Codegen {
)?;

self.emit_label(ret_label);
self.emit(DbOp::Ret);
self.emit_u32(0);
self.emit_ret(0);

functions.push(query_label);
});
Expand Down Expand Up @@ -805,7 +842,9 @@ impl Codegen {
}
};

self.emit_query_tuple_document(key, doc, !is_in_not, not_found_label)?;
path_hint!(self, "$not".to_string(), {
self.emit_query_tuple_document(key, doc, !is_in_not, not_found_label)?;
});
}

_ => {
Expand Down Expand Up @@ -840,6 +879,152 @@ impl Codegen {
Ok(())
}

/// Compiles an aggregation pipeline in two phases:
///
/// 1. the layout code, which calls the function of every entry already
///    registered in `ctx`;
/// 2. the implementation code of each stage (see `emit_aggregation_stage`),
///    followed by the function that emits the final result row.
///
/// For an empty pipeline only `ResultRow` and `Pop` are emitted.
pub fn emit_aggregation_pipeline(&mut self, ctx: &mut AggregationCodeGenContext, pipeline: &[Document]) -> Result<()> {
    if pipeline.is_empty() {
        self.emit(DbOp::ResultRow);
        self.emit(DbOp::Pop);
        return Ok(());
    }

    // Marks the first instruction after all function bodies emitted below.
    let after_functions = self.new_label();

    // Phase 1: call each registered entry. The trailing u32 operand is 1.
    // NOTE(review): presumably the argument count of the call — confirm.
    for registered in ctx.items.iter() {
        self.emit_goto(DbOp::Call, registered.next_label);
        self.emit_u32(1);
    }

    // Append one extra entry whose function emits the final result row.
    let final_result_label = self.new_label();
    ctx.items.push(PipelineItem {
        next_label: final_result_label,
        complete_label: None,
    });

    // Jump over the function bodies so they only run when called.
    self.emit_goto(DbOp::Goto, after_functions);

    // Phase 2: the body of every stage.
    for stage_index in 0..pipeline.len() {
        self.emit_aggregation_stage(pipeline, ctx, stage_index)?;
    }

    self.emit_label_with_name(final_result_label, "final_result_row_fun");
    self.emit(DbOp::ResultRow);
    self.emit_ret(0);

    self.emit_label_with_name(after_functions, "next_item_label");
    Ok(())
}

// Generate the implementation code of the pipeline
// The implementation code is a function with parameters:
// Param 1(bool): is_the_last
// Return value: boolean value indicating going next stage or not
fn emit_aggregation_stage(
&mut self,
pipeline: &[Document],
ctx: &AggregationCodeGenContext,
index: usize,
) -> Result<()> {
let stage = &pipeline[index];
let stage_ctx_item = &ctx.items[index];
let stage_num = format!("{}", index);
if stage.is_empty() {
return Ok(());
}
if stage.len() > 1 {
return Err(Error::InvalidAggregationStage(Box::new(stage.clone())));
}

path_hint!(self, stage_num, {
let first_tuple = stage.iter().next().unwrap();
let (key, value) = first_tuple;

match key.as_str() {
"$count" => {
let count_name = match value {
Bson::String(s) => s,
_ => {
return Err(Error::InvalidAggregationStage(Box::new(stage.clone())));
}
};
let global_var = self.new_global_variable(Bson::Int64(0))?;

// $count_next =>
self.emit_label(stage_ctx_item.next_label);

self.emit_load_global(global_var);
self.emit(DbOp::Inc);
self.emit_store_global(global_var);
self.emit(DbOp::Pop);

self.emit_ret(0);

// $count_complete =>
self.emit_label(stage_ctx_item.complete_label.unwrap());
self.emit(DbOp::PushDocument);
self.emit_load_global(global_var);

let count_name_id = self.push_static(Bson::String(count_name.clone()));
self.emit(DbOp::SetField);
self.emit_u32(count_name_id);

self.emit(DbOp::Pop);

let next_fun = ctx.items[index + 1].next_label;
self.emit_goto(DbOp::Call, next_fun);
self.emit_u32(1);

self.emit_ret(0);
}
_ => {
return Err(Error::UnknownAggregationOperation(key.clone()));
}
};
});

Ok(())
}

pub fn emit_aggregation_before_query(&mut self, ctx: &mut AggregationCodeGenContext, pipeline: &[Document]) -> Result<()> {
for stage_doc in pipeline {
if stage_doc.is_empty() {
return Ok(());
}
let first_tuple = stage_doc.iter().next().unwrap();
let (key, _) = first_tuple;

let label = self.new_label();
let complete_label = match key.as_str() {
"$count" => {
let complete_label = self.new_label();
Some(complete_label)
}
_ => None,
};

ctx.items.push(PipelineItem {
next_label: label,
complete_label,
});
}
Ok(())
}

/// Emits a call to the completion code of the first registered item that
/// has a `complete_label`; later items are not called from here. Emits
/// nothing when no item has one.
pub fn emit_aggregation_before_close(&mut self, ctx: &AggregationCodeGenContext) -> Result<()> {
    let first_complete = ctx.items.iter().find_map(|item| item.complete_label);

    if let Some(complete_label) = first_complete {
        self.emit_goto(DbOp::Call, complete_label);
        self.emit_u32(0);
    }

    Ok(())
}

/// Emits a single `DeleteCurrent` instruction.
pub(super) fn emit_delete_operation(&mut self) {
self.emit(DbOp::DeleteCurrent);
}
Expand Down Expand Up @@ -990,6 +1175,15 @@ impl Codegen {
self.emit_u32(id);
}

/// Emits a return instruction.
///
/// A `return_size` of zero uses `Ret0`, which takes no operand; any other
/// size emits `Ret` followed by the size as a `u32` operand.
pub(crate) fn emit_ret(&mut self, return_size: u32) {
    match return_size {
        0 => self.emit(DbOp::Ret0),
        size => {
            self.emit(DbOp::Ret);
            self.emit_u32(size);
        }
    }
}

#[inline]
pub(super) fn emit(&mut self, op: DbOp) {
self.program.instructions.push(op as u8);
Expand Down