Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

Commit

Permalink
introduce SchemaProvider trait
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 6, 2018
1 parent cf3fda9 commit bf3c033
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
40 changes: 35 additions & 5 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,16 +730,46 @@ pub enum ExecutionResult {
Count(usize),
}

struct ExecutionContextSchemaProvider {
tables: Rc<RefCell<HashMap<String, Rc<DataFrame>>>>,
function_meta: Rc<RefCell<HashMap<String, Rc<FunctionMeta>>>>,
}

impl SchemaProvider for ExecutionContextSchemaProvider {
fn get_table_meta(&self, name: &str) -> Option<Rc<Schema>> {
match self.tables.borrow().get(&name.to_string().to_lowercase()) {
Some(table) => Some(table.schema().clone()),
None => None
}
}

fn get_function_meta(&self, name: &str) -> Option<Rc<FunctionMeta>> {
match self.function_meta.borrow().get(&name.to_string().to_lowercase()) {
Some(meta) => Some(meta.clone()),
None => None
}
}
}


#[derive(Clone)]
pub struct ExecutionContext {
tables: Rc<RefCell<HashMap<String, Rc<DataFrame>>>>,
function_meta: Rc<RefCell<HashMap<String, FunctionMeta>>>,
function_meta: Rc<RefCell<HashMap<String, Rc<FunctionMeta>>>>,
functions: Rc<RefCell<HashMap<String, Rc<ScalarFunction>>>>,
aggregate_functions: Rc<RefCell<HashMap<String, Rc<AggregateFunction>>>>,
config: Rc<DFConfig>,
}

impl ExecutionContext {

fn create_schema_provider(&self) -> Rc<SchemaProvider> {
Rc::new(ExecutionContextSchemaProvider {
tables: self.tables.clone(),
function_meta: self.function_meta.clone()
})
}

pub fn local() -> Self {
ExecutionContext {
tables: Rc::new(RefCell::new(HashMap::new())),
Expand Down Expand Up @@ -770,7 +800,7 @@ impl ExecutionContext {

self.function_meta
.borrow_mut()
.insert(func.name().to_lowercase(), fm);
.insert(func.name().to_lowercase(), Rc::new(fm));

self.functions
.borrow_mut()
Expand All @@ -787,7 +817,7 @@ impl ExecutionContext {

self.function_meta
.borrow_mut()
.insert(func.name().to_lowercase(), fm);
.insert(func.name().to_lowercase(), Rc::new(fm));

self.aggregate_functions
.borrow_mut()
Expand All @@ -799,7 +829,7 @@ impl ExecutionContext {
let ast = Parser::parse_sql(String::from(sql))?;

// create a query planner
let query_planner = SqlToRel::new(self.tables.clone(), self.function_meta.clone());
let query_planner = SqlToRel::new(self.create_schema_provider());

// plan the query (create a logical relational plan)
Ok(query_planner.sql_to_rel(&ast)?)
Expand Down Expand Up @@ -850,7 +880,7 @@ impl ExecutionContext {
}
_ => {
// create a query planner
let query_planner = SqlToRel::new(self.tables.clone(), self.function_meta.clone());
let query_planner = SqlToRel::new(self.create_schema_provider());

// plan the query (create a logical relational plan)
let plan = query_planner.sql_to_rel(&ast)?;
Expand Down
30 changes: 12 additions & 18 deletions src/sqlplanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,30 @@

//! SQL Query Planner (produces logical plan from SQL AST)

use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::rc::Rc;
use std::string::String;

use super::dataframe::*;
use super::logical::*;
use super::sqlast::*;
use super::types::*;

use arrow::datatypes::*;

pub trait SchemaProvider {
fn get_table_meta(&self, name: &str) -> Option<Rc<Schema>>;
fn get_function_meta(&self, name: &str) -> Option<Rc<FunctionMeta>>;
}

/// SQL query planner
pub struct SqlToRel {
//default_schema: Option<String>,
tables: Rc<RefCell<HashMap<String, Rc<DataFrame>>>>,
function_meta: Rc<RefCell<HashMap<String, FunctionMeta>>>,
schema_provider: Rc<SchemaProvider>
}

impl SqlToRel {
/// Create a new query planner
pub fn new(
tables: Rc<RefCell<HashMap<String, Rc<DataFrame>>>>,
function_meta: Rc<RefCell<HashMap<String, FunctionMeta>>>,
) -> Self {
SqlToRel {
/*default_schema: None,*/ tables,
function_meta,
}
pub fn new(schema_provider: Rc<SchemaProvider>) -> Self {
SqlToRel { schema_provider }
}

/// Generate a logic plan from a SQL AST node
Expand Down Expand Up @@ -179,11 +173,11 @@ impl SqlToRel {
}
}

&ASTNode::SQLIdentifier(ref id) => match self.tables.borrow().get(id) {
Some(table) => Ok(Rc::new(LogicalPlan::TableScan {
&ASTNode::SQLIdentifier(ref id) => match self.schema_provider.get_table_meta(id.as_ref()) {
Some(schema) => Ok(Rc::new(LogicalPlan::TableScan {
schema_name: String::from("default"),
table_name: id.clone(),
schema: table.schema().clone(),
schema: schema.clone(),
projection: None,
})),
None => Err(format!("no schema found for table {}", id)),
Expand Down Expand Up @@ -298,7 +292,7 @@ impl SqlToRel {
return_type: DataType::UInt64,
})
}
_ => match self.function_meta.borrow().get(&id.to_lowercase()) {
_ => match self.schema_provider.get_function_meta(id) {
Some(fm) => {
let rex_args = args.iter()
.map(|a| self.sql_to_rex(a, schema))
Expand Down

0 comments on commit bf3c033

Please sign in to comment.