Skip to content

Commit

Permalink
Merge insert logics in row.rs & execute.rs into executor/insert.rs (g…
Browse files Browse the repository at this point in the history
…luesql#1031)

Insert execution code logics are splitted in data/row.rs and executor/execute.rs
By adding executor/insert.rs, it merges the codes into a single insert module.
  • Loading branch information
panarch committed Dec 1, 2022
1 parent 8d7af7a commit 34c628c
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 222 deletions.
114 changes: 3 additions & 111 deletions core/src/data/row.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
use {
crate::{
ast::{ColumnDef, Expr},
data::Value,
executor::evaluate_stateless,
result::Result,
},
crate::{data::Value, result::Result},
serde::Serialize,
std::{fmt::Debug, rc::Rc},
thiserror::Error,
};

#[derive(Error, Serialize, Debug, PartialEq)]
pub enum RowError {
#[error("lack of required column: {0}")]
LackOfRequiredColumn(String),

#[error("wrong column name: {0}")]
WrongColumnName(String),

#[error("column and values not matched")]
ColumnAndValuesNotMatched,

#[error("literals have more values than target columns")]
TooManyValues,

#[error("conflict! row cannot be empty")]
ConflictOnEmptyRow,

#[error("VALUES lists must all be the same length")]
NumberOfValuesDifferent,
}

#[derive(iter_enum::Iterator)]
enum Columns<I1, I2> {
All(I1),
Specified(I2),
#[error("conflict! row cannot be empty")]
ConflictOnEmptyRow,
}

#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -62,91 +39,6 @@ impl Row {
.ok_or_else(|| RowError::ConflictOnEmptyRow.into())
}

pub fn new(
column_defs: &[ColumnDef],
labels: Rc<[String]>,
columns: &[String],
values: &[Expr],
) -> Result<Self> {
if !columns.is_empty() && values.len() != columns.len() {
return Err(RowError::ColumnAndValuesNotMatched.into());
} else if values.len() > column_defs.len() {
return Err(RowError::TooManyValues.into());
}

if let Some(wrong_column_name) = columns.iter().find(|column_name| {
!column_defs
.iter()
.any(|column_def| &&column_def.name == column_name)
}) {
return Err(RowError::WrongColumnName(wrong_column_name.to_owned()).into());
}

let columns = if columns.is_empty() {
Columns::All(column_defs.iter().map(|ColumnDef { name, .. }| name))
} else {
Columns::Specified(columns.iter())
};

let column_name_value_list = columns.zip(values.iter()).collect::<Vec<(_, _)>>();

let values = column_defs
.iter()
.map(|column_def| {
let ColumnDef {
name: def_name,
data_type,
nullable,
..
} = column_def;

let value = column_name_value_list
.iter()
.find(|(name, _)| name == &def_name)
.map(|(_, value)| value);

match (value, column_def.get_default(), nullable) {
(Some(&expr), _, _) | (None, Some(expr), _) => {
evaluate_stateless(None, expr)?.try_into_value(data_type, *nullable)
}
(None, None, true) => Ok(Value::Null),
(None, None, false) => {
Err(RowError::LackOfRequiredColumn(def_name.to_owned()).into())
}
}
})
.collect::<Result<Vec<Value>>>()?;

Ok(Row {
columns: labels,
values,
})
}

pub fn validate(&self, column_defs: &[ColumnDef]) -> Result<()> {
let items = column_defs
.iter()
.enumerate()
.filter_map(|(index, column_def)| {
let value = self.get_value_by_index(index);

value.map(|v| (v, column_def))
});

for (value, column_def) in items {
let ColumnDef {
data_type,
nullable,
..
} = column_def;

value.validate_type(data_type)?;
value.validate_null(*nullable)?;
}

Ok(())
}

pub fn len(&self) -> usize {
self.values.len()
}
Expand Down
112 changes: 9 additions & 103 deletions core/src/executor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use {
super::{
alter::{create_table, drop_table},
fetch::{fetch, fetch_columns},
insert::insert,
select::{select, select_with_labels},
update::Update,
validate::{validate_unique, ColumnValidation},
},
crate::{
ast::{
ColumnDef, ColumnOption, DataType, Dictionary, Expr, Query, SelectItem, SetExpr,
Statement, TableAlias, TableFactor, TableWithJoins, Values, Variable,
DataType, Dictionary, Expr, Query, SelectItem, SetExpr, Statement, TableAlias,
TableFactor, TableWithJoins, Variable,
},
data::{Key, Row, Schema, Value},
executor::limit::Limit,
result::{MutResult, Result},
data::{Key, Schema, Value},
result::MutResult,
store::{GStore, GStoreMut},
},
futures::stream::{self, TryStreamExt},
futures::stream::TryStreamExt,
serde::{Deserialize, Serialize},
std::{env::var, fmt::Debug, rc::Rc},
thiserror::Error as ThisError,
Expand Down Expand Up @@ -173,103 +173,9 @@ pub async fn execute<T: GStore + GStoreMut>(
table_name,
columns,
source,
..
} => {
enum RowsData {
Append(Vec<Vec<Value>>),
Insert(Vec<(Key, Vec<Value>)>),
}

let (rows, num_rows, table_name) = try_block!(storage, {
let Schema { column_defs, .. } = storage
.fetch_schema(table_name)
.await?
.ok_or_else(|| ExecuteError::TableNotFound(table_name.to_owned()))?;
let labels = Rc::from(
column_defs
.iter()
.map(|column_def| column_def.name.to_owned())
.collect::<Vec<_>>(),
);
let column_defs = Rc::from(column_defs);
let column_validation = ColumnValidation::All(Rc::clone(&column_defs));

#[derive(futures_enum::Stream)]
enum Rows<I1, I2> {
Values(I1),
Select(I2),
}

let rows = match &source.body {
SetExpr::Values(Values(values_list)) => {
let limit = Limit::new(source.limit.as_ref(), source.offset.as_ref())?;
let rows = values_list.iter().map(|values| {
Row::new(&column_defs, Rc::clone(&labels), columns, values)
});
let rows = stream::iter(rows);
let rows = limit.apply(rows);
let rows = rows.map_ok(Into::into);

Rows::Values(rows)
}
SetExpr::Select(_) => {
let rows = select(&storage, source, None).await?.and_then(|row| {
let column_defs = Rc::clone(&column_defs);

async move {
row.validate(&column_defs)?;
Ok(row.into())
}
});

Rows::Select(rows)
}
}
.try_collect::<Vec<Vec<Value>>>()
.await?;

validate_unique(
&storage,
table_name,
column_validation,
rows.iter().map(|values| values.as_slice()),
)
.await?;

let num_rows = rows.len();
let primary_key = column_defs
.iter()
.enumerate()
.find(|(_, ColumnDef { options, .. })| {
options
.iter()
.any(|option| option == &ColumnOption::Unique { is_primary: true })
})
.map(|(i, _)| i);

let rows = match primary_key {
Some(i) => rows
.into_iter()
.filter_map(|values| {
values
.get(i)
.map(Key::try_from)
.map(|result| result.map(|key| (key, values)))
})
.collect::<Result<Vec<_>>>()
.map(RowsData::Insert)?,
None => RowsData::Append(rows),
};

Ok((rows, num_rows, table_name))
});

match rows {
RowsData::Append(rows) => storage.append_data(table_name, rows).await,
RowsData::Insert(rows) => storage.insert_data(table_name, rows).await,
}
.map(|(storage, _)| (storage, Payload::Insert(num_rows)))
}
} => insert(storage, table_name, columns, source)
.await
.map(|(storage, num_rows)| (storage, Payload::Insert(num_rows))),
Statement::Update {
table_name,
selection,
Expand Down

0 comments on commit 34c628c

Please sign in to comment.