Skip to content

Commit

Permalink
Replace data::Row to Vec<Value> in Store traits
Browse files Browse the repository at this point in the history
  • Loading branch information
panarch committed Nov 27, 2022
1 parent 37ec20a commit bc68a53
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 63 deletions.
4 changes: 2 additions & 2 deletions core/src/data/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
executor::evaluate_stateless,
result::Result,
},
serde::{Deserialize, Serialize},
serde::Serialize,
std::{fmt::Debug, slice::Iter, vec::IntoIter},
thiserror::Error,
};
Expand Down Expand Up @@ -37,7 +37,7 @@ enum Columns<I1, I2> {
Specified(I2),
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq)]
pub struct Row(pub Vec<Value>);

impl Row {
Expand Down
12 changes: 9 additions & 3 deletions core/src/executor/alter/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ pub async fn create_table<T: GStore + GStoreMut>(

match source {
Some(q) => {
let (storage, rows) = async { select(&storage, q, None).await?.try_collect().await }
.await
.try_self(storage)?;
let (storage, rows) = async {
select(&storage, q, None)
.await?
.map_ok(Into::into)
.try_collect()
.await
}
.await
.try_self(storage)?;

storage.append_data(target_table_name, rows).await
}
Expand Down
13 changes: 9 additions & 4 deletions core/src/executor/evaluate/stateless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
super::{expr, function, EvaluateError, Evaluated},
crate::{
ast::{Expr, Function},
data::{Interval, Literal, Row, Value},
data::{Interval, Literal, Value},
result::Result,
},
chrono::prelude::Utc,
Expand All @@ -12,7 +12,7 @@ use {
type Columns<'a> = &'a [String];

pub fn evaluate_stateless<'a>(
context: Option<(Columns, &'a Row)>,
context: Option<(Columns, &'a [Value])>,
expr: &'a Expr,
) -> Result<Evaluated<'a>> {
let eval = |expr| evaluate_stateless(context, expr);
Expand All @@ -30,7 +30,12 @@ pub fn evaluate_stateless<'a>(
}
};

match row.get_value(columns, ident) {
let value = columns
.iter()
.position(|column| column == ident)
.and_then(|index| row.get(index));

match value {
Some(value) => Ok(value.clone()),
None => Err(EvaluateError::ValueNotFound(ident.to_owned()).into()),
}
Expand Down Expand Up @@ -150,7 +155,7 @@ pub fn evaluate_stateless<'a>(
}

fn evaluate_function<'a>(
context: Option<(Columns, &'a Row)>,
context: Option<(Columns, &'a [Value])>,
func: &'a Function,
) -> Result<Evaluated<'a>> {
use function as f;
Expand Down
29 changes: 18 additions & 11 deletions core/src/executor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ pub async fn execute<T: GStore + GStoreMut>(
..
} => {
enum RowsData {
Append(Vec<Row>),
Insert(Vec<(Key, Row)>),
Append(Vec<Vec<Value>>),
Insert(Vec<(Key, Vec<Value>)>),
}

let (rows, num_rows, table_name) = try_block!(storage, {
Expand All @@ -202,6 +202,7 @@ pub async fn execute<T: GStore + GStoreMut>(
.map(|values| Row::new(&column_defs, columns, values));
let rows = stream::iter(rows);
let rows = limit.apply(rows);
let rows = rows.map_ok(Into::into);

Rows::Values(rows)
}
Expand All @@ -211,17 +212,23 @@ pub async fn execute<T: GStore + GStoreMut>(

async move {
row.validate(&column_defs)?;
Ok(row)
Ok(row.into())
}
});

Rows::Select(rows)
}
}
.try_collect::<Vec<_>>()
.try_collect::<Vec<Vec<Value>>>()
.await?;

validate_unique(&storage, table_name, column_validation, rows.iter()).await?;
validate_unique(
&storage,
table_name,
column_validation,
rows.iter().map(|values| values.as_slice()),
)
.await?;

let num_rows = rows.len();
let primary_key = column_defs
Expand All @@ -237,11 +244,11 @@ pub async fn execute<T: GStore + GStoreMut>(
let rows = match primary_key {
Some(i) => rows
.into_iter()
.filter_map(|row| {
row.0
.filter_map(|values| {
values
.get(i)
.map(Key::try_from)
.map(|result| result.map(|key| (key, row)))
.map(|result| result.map(|key| (key, values)))
})
.collect::<Result<Vec<_>>>()
.map(RowsData::Insert)?,
Expand Down Expand Up @@ -279,10 +286,10 @@ pub async fn execute<T: GStore + GStoreMut>(

async move {
let row = update.apply(row).await?;
Ok((key, row))
Ok((key, row.into()))
}
})
.try_collect::<Vec<_>>()
.try_collect::<Vec<(Key, Vec<Value>)>>()
.await?;

let column_validation =
Expand All @@ -291,7 +298,7 @@ pub async fn execute<T: GStore + GStoreMut>(
&storage,
table_name,
column_validation,
rows.iter().map(|r| &r.1),
rows.iter().map(|(_, values)| values.as_slice()),
)
.await?;

Expand Down
7 changes: 4 additions & 3 deletions core/src/executor/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn fetch<'a>(
.await
.map(stream::iter)?
.try_filter_map(move |(key, row)| {
let row = Row::from(row);
let columns = Rc::clone(&columns);

async move {
Expand Down Expand Up @@ -119,7 +120,7 @@ pub async fn fetch_relation_rows<'a>(
let rows = storage
.scan_indexed_data(name, index_name, *asc, cmp_value)
.await?
.map_ok(|(_, row)| row);
.map_ok(|(_, row)| row.into());

Rows::Indexed(rows)
}
Expand All @@ -137,10 +138,10 @@ pub async fn fetch_relation_rows<'a>(
.map(|row| vec![row])
.unwrap_or_else(Vec::new);

Rows::PrimaryKey(rows.into_iter())
Rows::PrimaryKey(rows.into_iter().map_ok(Row::from))
}
_ => {
let rows = storage.scan_data(name).await?.map_ok(|(_, row)| row);
let rows = storage.scan_data(name).await?.map_ok(|(_, row)| row.into());

Rows::FullScan(rows)
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/executor/select/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ fn sort_stateless(
.iter()
.map(|OrderByExpr { expr, asc }| -> Result<_> {
let row = row.as_ref().ok();
let context = row.map(|row| (labels.as_slice(), row));
let context = row.map(|row| (labels.as_slice(), row.0.as_slice()));

let value: Value = evaluate_stateless(context, expr)?.try_into()?;

Ok((value, *asc))
Expand Down
12 changes: 6 additions & 6 deletions core/src/executor/validate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
ast::{ColumnDef, ColumnOption},
data::{Key, Row, Value},
data::{Key, Value},
result::Result,
store::Store,
},
Expand Down Expand Up @@ -82,7 +82,7 @@ pub async fn validate_unique(
storage: &dyn Store,
table_name: &str,
column_validation: ColumnValidation,
row_iter: impl Iterator<Item = &Row> + Clone,
row_iter: impl Iterator<Item = &[Value]> + Clone,
) -> Result<()> {
enum Columns {
/// key index
Expand Down Expand Up @@ -125,7 +125,7 @@ pub async fn validate_unique(
match columns {
Columns::PrimaryKeyOnly(primary_key_index) => {
for primary_key in
row_iter.filter_map(|row| row.0.get(primary_key_index).map(Key::try_from))
row_iter.filter_map(|row| row.get(primary_key_index).map(Key::try_from))
{
let key = primary_key?;

Expand All @@ -150,7 +150,7 @@ pub async fn validate_unique(
.try_for_each(|constraint| {
let col_idx = constraint.column_index;
let val = row
.get_value_by_index(col_idx)
.get(col_idx)
.ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;

constraint.check(val)?;
Expand All @@ -164,7 +164,7 @@ pub async fn validate_unique(

fn create_unique_constraints<'a>(
unique_columns: Vec<(usize, String)>,
row_iter: impl Iterator<Item = &'a Row> + Clone,
row_iter: impl Iterator<Item = &'a [Value]> + Clone,
) -> Result<Vector<UniqueConstraint>> {
unique_columns
.into_iter()
Expand All @@ -175,7 +175,7 @@ fn create_unique_constraints<'a>(
.clone()
.try_fold(new_constraint, |constraint, row| {
let val = row
.get_value_by_index(col_idx)
.get(col_idx)
.ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;

constraint.add(val)
Expand Down
4 changes: 2 additions & 2 deletions core/src/plan/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use crate::store::Transaction;
use crate::store::{Index, IndexMut};
use {
crate::{
data::{Key, Row, Schema},
data::{Key, Schema},
executor::execute,
parse_sql::parse,
result::{Error, MutResult, Result},
store::{RowIter, Store, StoreMut},
store::{Row, RowIter, Store, StoreMut},
translate::translate,
},
async_trait::async_trait,
Expand Down
3 changes: 2 additions & 1 deletion core/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ cfg_if! {

use {
crate::{
data::{Key, Row, Schema},
data::{Key, Schema, Value},
result::{MutResult, Result},
},
async_trait::async_trait,
};

pub type Row = Vec<Value>;
pub type RowIter = Box<dyn Iterator<Item = Result<(Key, Row)>>>;

/// By implementing `Store` trait, you can run `SELECT` query.
Expand Down
6 changes: 3 additions & 3 deletions storages/memory-storage/src/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl MemoryStorage {
.ok_or_else(|| AlterTableError::TableNotFound(table_name.to_owned()))?;

item.rows.iter_mut().for_each(|(_, row)| {
row.0.push(value.clone());
row.push(value.clone());
});
item.schema.column_defs.push(column_def.clone());

Expand Down Expand Up @@ -127,8 +127,8 @@ impl MemoryStorage {
item.schema.column_defs.remove(column_index);

item.rows.iter_mut().for_each(|(_, row)| {
if row.0.len() > column_index {
row.0.remove(column_index);
if row.len() > column_index {
row.remove(column_index);
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions storages/memory-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ mod transaction;
use {
async_trait::async_trait,
gluesql_core::{
data::{Key, Row, Schema},
data::{Key, Schema},
result::{MutResult, Result},
store::{RowIter, Store, StoreMut},
store::{Row, RowIter, Store, StoreMut},
},
indexmap::IndexMap,
serde::{Deserialize, Serialize},
Expand Down
4 changes: 2 additions & 2 deletions storages/shared-memory-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ mod transaction;
use {
async_trait::async_trait,
gluesql_core::{
data::{Key, Row, Schema},
data::{Key, Schema},
result::{MutResult, Result},
store::{RowIter, Store, StoreMut},
store::{Row, RowIter, Store, StoreMut},
},
memory_storage::MemoryStorage,
std::sync::Arc,
Expand Down
11 changes: 5 additions & 6 deletions storages/sled-storage/src/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use {
async_trait::async_trait,
gluesql_core::{
ast::ColumnDef,
data::{schema::Schema, Row, Value},
data::{schema::Schema, Value},
executor::evaluate_stateless,
result::{MutResult, Result, TrySelf},
store::{AlterTable, AlterTableError},
store::{AlterTable, AlterTableError, Row},
},
sled::transaction::ConflictableTransactionError,
std::{iter::once, str},
Expand Down Expand Up @@ -301,7 +301,7 @@ impl AlterTable for SledStorage {
continue;
}
};
let row = Row(row.0.into_iter().chain(once(value.clone())).collect());
let row = row.into_iter().chain(once(value.clone())).collect();

let (snapshot, _) = snapshot.update(txid, row);
let snapshot = bincode::serialize(&snapshot)
Expand Down Expand Up @@ -418,12 +418,11 @@ impl AlterTable for SledStorage {
continue;
}
};
let row = Row(row
.0
let row = row
.into_iter()
.enumerate()
.filter_map(|(i, v)| (i != column_index).then_some(v))
.collect());
.collect();

let (snapshot, _) = snapshot.update(txid, row);
let snapshot = bincode::serialize(&snapshot)
Expand Down
5 changes: 1 addition & 4 deletions storages/sled-storage/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use {
lock::{get_txdata_key, Lock, TxData},
SledStorage, Snapshot,
},
gluesql_core::{
data::{Row, Schema},
result::Result,
},
gluesql_core::{data::Schema, result::Result, store::Row},
std::time::{SystemTime, UNIX_EPOCH},
};

Expand Down
4 changes: 2 additions & 2 deletions storages/sled-storage/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use {
async_trait::async_trait,
gluesql_core::{
ast::IndexOperator,
data::{Key, Row},
data::Key,
prelude::Value,
result::{Error, Result},
store::{Index, IndexError, RowIter},
store::{Index, IndexError, Row, RowIter},
},
iter_enum::{DoubleEndedIterator, Iterator},
sled::IVec,
Expand Down

0 comments on commit bc68a53

Please sign in to comment.