Skip to content
This repository has been archived by the owner on Sep 28, 2021. It is now read-only.

Commit

Permalink
Implicit and Explicit transaction scope (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-dukhno committed May 5, 2021
1 parent d7f10ea commit daf8354
Show file tree
Hide file tree
Showing 34 changed files with 530 additions and 498 deletions.
31 changes: 28 additions & 3 deletions node_engine/src/lib.rs
Expand Up @@ -12,11 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use data_manipulation::QueryPlan;
pub use node_engine_old::NodeEngineOld;
use postgre_sql::query_ast::Query;
use std::collections::HashMap;
use types::SqlTypeFamily;

mod node_engine_old;
mod query_engine;
mod query_engine_old;
mod session;
mod session_old;
mod transaction_manager;
mod txn_context;
mod worker;

pub use node_engine_old::NodeEngineOld;
#[derive(Default)]
#[allow(dead_code)]
pub struct QueryPlanCache {
plans: HashMap<String, (Query, Vec<SqlTypeFamily>)>,
}

#[allow(dead_code)]
impl QueryPlanCache {
pub fn allocate(&mut self, name: String, _query_plan: QueryPlan, query_ast: Query, params: Vec<SqlTypeFamily>) {
self.plans.insert(name, (query_ast, params));
}

pub fn lookup(&self, name: &str) -> Option<&(Query, Vec<SqlTypeFamily>)> {
self.plans.get(name)
}

pub fn deallocate(&mut self, name: &str) -> Option<(Query, Vec<SqlTypeFamily>)> {
self.plans.remove(name)
}
}
231 changes: 99 additions & 132 deletions node_engine/src/query_engine_old/mod.rs

Large diffs are not rendered by default.

Expand Up @@ -12,24 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use data_manipulation::QueryPlan;
use postgre_sql::query_ast::Query;
use std::collections::HashMap;
use types::SqlTypeFamily;
use crate::txn_context::TransactionContext;
use storage::Database;

#[derive(Default)]
#[allow(dead_code)]
pub struct Session {
plans: HashMap<String, (Query, Vec<SqlTypeFamily>)>,
pub struct TransactionManager {
database: Database,
}

#[allow(dead_code)]
impl Session {
pub fn cache(&mut self, name: String, _query_plan: QueryPlan, query_ast: Query, params: Vec<SqlTypeFamily>) {
self.plans.insert(name, (query_ast, params));
impl TransactionManager {
pub fn new(database: Database) -> TransactionManager {
TransactionManager { database }
}

pub fn find(&self, name: &str) -> Option<&(Query, Vec<SqlTypeFamily>)> {
self.plans.get(name)
pub fn start_transaction(&self) -> TransactionContext {
TransactionContext::new(self.database.transaction())
}
}
Expand Up @@ -19,18 +19,18 @@ use data_manipulation::{
};
use definition_planner::DefinitionPlanner;
use postgre_sql::{
query_ast::{Definition, Query, Statement},
query_parser::QueryParser,
query_ast::{Definition, Query},
query_response::{QueryError, QueryEvent},
};
use query_analyzer::QueryAnalyzer;
use query_planner::QueryPlanner;
use query_processing::{TypeChecker, TypeCoercion, TypeInference};
use storage::{Database, Transaction};
use std::fmt::{self, Debug, Formatter};
use storage::Transaction;
use types::SqlTypeFamily;

#[derive(Clone)]
pub struct TransactionContext<'t> {
parser: QueryParser,
definition_planner: DefinitionPlanner<'t>,
catalog: CatalogHandler<'t>,
query_analyzer: QueryAnalyzer<'t>,
Expand All @@ -40,10 +40,15 @@ pub struct TransactionContext<'t> {
query_planner: QueryPlanner<'t>,
}

impl<'t> Debug for TransactionContext<'t> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "txn")
}
}

impl<'t> TransactionContext<'t> {
fn new(transaction: Transaction<'t>) -> TransactionContext<'t> {
pub fn new(transaction: Transaction<'t>) -> TransactionContext<'t> {
TransactionContext {
parser: QueryParser,
definition_planner: DefinitionPlanner::from(transaction.clone()),
catalog: CatalogHandler::from(transaction.clone()),
query_analyzer: QueryAnalyzer::from(transaction.clone()),
Expand All @@ -56,11 +61,7 @@ impl<'t> TransactionContext<'t> {

pub fn commit(self) {}

pub fn parse(&self, sql: &str) -> Result<Vec<Statement>, QueryError> {
Ok(self.parser.parse(sql)?)
}

pub fn execute_ddl(&self, definition: Definition) -> Result<QueryEvent, QueryError> {
pub fn apply_schema_change(&self, definition: Definition) -> Result<QueryEvent, QueryError> {
let schema_change = self.definition_planner.plan(definition)?;
Ok(self.catalog.apply(schema_change)?.into())
}
Expand Down Expand Up @@ -167,20 +168,5 @@ impl<'t> TransactionContext<'t> {
}
}

#[derive(Clone)]
pub struct QueryEngine {
database: Database,
}

impl QueryEngine {
pub fn new(database: Database) -> QueryEngine {
QueryEngine { database }
}

pub fn start_transaction(&self) -> TransactionContext {
TransactionContext::new(self.database.transaction())
}
}

#[cfg(test)]
mod tests;
Expand Up @@ -15,7 +15,7 @@
use super::*;

#[rstest::rstest]
fn delete_all_records(with_schema: QueryEngine) {
fn delete_all_records(with_schema: TransactionManager) {
let txn = with_schema.start_transaction();

assert_definition(
Expand Down Expand Up @@ -56,7 +56,7 @@ fn delete_all_records(with_schema: QueryEngine) {
}

#[rstest::rstest]
fn delete_value_by_predicate_on_single_field(with_schema: QueryEngine) {
fn delete_value_by_predicate_on_single_field(with_schema: TransactionManager) {
let txn = with_schema.start_transaction();

assert_definition(
Expand Down

0 comments on commit daf8354

Please sign in to comment.