Skip to content

Commit

Permalink
Store: implementations for ITable read_plan and read
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jun 2, 2021
1 parent e4e4331 commit a5c42b2
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 24 deletions.
22 changes: 10 additions & 12 deletions fusestore/store/src/api/rpc/flight_service.rs
Expand Up @@ -29,9 +29,6 @@ use common_flights::StoreDoActionResult;
use common_flights::StoreDoGet;
use futures::Stream;
use futures::StreamExt;
#[allow(unused_imports)]
use log::error;
#[allow(unused_imports)]
use log::info;
use prost::Message;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -155,7 +152,13 @@ impl FlightService for StoreFlightImpl {
// Action.
let action: StoreDoGet = request.try_into()?;
match action {
StoreDoGet::Read(_) => Err(Status::internal("Store read unimplemented")),
StoreDoGet::Read(act) => {
let stream =
self.action_handler.read_partition(act).await.map_err(|e| {
Status::internal(format!("read failure: {}", e.to_string()))
})?;
Ok(Response::new(Box::pin(stream)))
}
StoreDoGet::Pull(pull) => {
let key = pull.key;

Expand All @@ -178,29 +181,23 @@ impl FlightService for StoreFlightImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
info!("calling me!");

let _claim = self.check_token(&request.metadata())?;
let meta = request.metadata();

info!("reading meta data");
let (db_name, tbl_name) =
common_flights::get_do_put_meta(meta).map_err(|e| Status::internal(e.to_string()))?;
info!("meta data {}-{}", db_name, tbl_name);

info!("calling handler");
let append_res = self
.action_handler
.do_put(db_name, tbl_name, request.into_inner())
.await
.map_err(|e| Status::internal(e.to_string()))?;
info!("handler called");

let bytes = serde_json::to_vec(&append_res).unwrap();
let bytes = serde_json::to_vec(&append_res).map_err(|e| Status::internal(e.to_string()))?;
let put_res = PutResult {
app_metadata: bytes,
};
info!("got result {:?}", append_res);

Ok(Response::new(Box::pin(futures::stream::once(async {
Ok(put_res)
}))))
Expand Down Expand Up @@ -237,6 +234,7 @@ impl FlightService for StoreFlightImpl {
unimplemented!()
}
}

impl StoreFlightImpl {
fn once_stream_resp(
&self,
Expand Down
88 changes: 88 additions & 0 deletions fusestore/store/src/api/rpc/flight_service_test.rs
Expand Up @@ -4,8 +4,10 @@

use common_arrow::arrow::array::ArrayRef;
use common_datablocks::DataBlock;
use common_datavalues::DataColumnarValue;
use common_flights::GetTableActionResult;
use common_flights::StoreClient;
use common_planners::ScanPlan;
use log::info;
use pretty_assertions::assert_eq;
use test_env_log::test;
Expand Down Expand Up @@ -245,3 +247,89 @@ async fn test_do_append() -> anyhow::Result<()> {
});
Ok(())
}
/// End-to-end check against a freshly started store server: create a database
/// and a table, append two identical blocks, verify the append summary, then
/// issue a partition scan for the table and print whatever comes back.
#[test(tokio::test)]
async fn test_scan_partition() -> anyhow::Result<()> {
    use std::sync::Arc;

    use common_arrow::arrow::datatypes::DataType;
    use common_datavalues::DataField;
    use common_datavalues::DataSchema;
    use common_datavalues::Int64Array;
    use common_datavalues::StringArray;
    use common_flights::StoreClient;
    use common_planners::CreateDatabasePlan;
    use common_planners::CreateTablePlan;
    use common_planners::DatabaseEngineType;
    use common_planners::TableEngineType;

    let addr = crate::tests::start_store_server().await?;

    let db_name = "test_db";
    let tbl_name = "test_tbl";
    let schema = Arc::new(DataSchema::new(vec![
        DataField::new("col_i", DataType::Int64, false),
        DataField::new("col_s", DataType::Utf8, false),
    ]));

    // One block of three rows, sent twice.
    let int_col: ArrayRef = Arc::new(Int64Array::from(vec![0, 1, 2]));
    let str_col: ArrayRef = Arc::new(StringArray::from(vec!["str1", "str2", "str3"]));

    // Must be computed before the columns are moved into the block.
    let expected_rows = int_col.data().len() * 2;
    let expected_cols = 2;

    let block = DataBlock::create(schema.clone(), vec![
        DataColumnarValue::Array(int_col),
        DataColumnarValue::Array(str_col),
    ]);
    let batches = vec![block.clone(), block];
    let num_batch = batches.len();
    let block_stream = futures::stream::iter(batches);

    let mut client = StoreClient::try_create(addr.as_str(), "root", "xxx").await?;

    // Prepare the database and the table the data is appended to.
    let db_plan = CreateDatabasePlan {
        if_not_exists: false,
        db: db_name.to_string(),
        engine: DatabaseEngineType::Local,
        options: Default::default(),
    };
    client.create_database(db_plan).await?;

    let tbl_plan = CreateTablePlan {
        if_not_exists: false,
        db: db_name.to_string(),
        table: tbl_name.to_string(),
        schema: schema.clone(),
        options: maplit::hashmap! {"opt‐1".into() => "val-1".into()},
        engine: TableEngineType::Parquet,
    };
    client.create_table(tbl_plan).await?;

    // Append both batches and verify what the store reports back.
    let append_res = client
        .append_data(
            db_name.to_string(),
            tbl_name.to_string(),
            schema,
            Box::pin(block_stream),
        )
        .await?;
    log::info!("append res is {:?}", append_res);

    assert_eq!(append_res.summary.rows, expected_rows);
    assert_eq!(append_res.parts.len(), num_batch);
    for part in append_res.parts.iter() {
        assert_eq!(part.rows, expected_rows / num_batch);
        assert_eq!(part.cols, expected_cols);
    }

    // Scan the table that was just written.
    let scan_plan = ScanPlan {
        schema_name: tbl_name.to_string(),
        ..ScanPlan::empty()
    };
    let scan_res = client
        .scan_partition(db_name.to_string(), tbl_name.to_string(), &scan_plan)
        .await;
    // TODO: add assertions on the scan result and de-duplicate the setup code
    // shared with the other tests.
    println!("scan res is {:?}", scan_res);

    Ok(())
}
62 changes: 62 additions & 0 deletions fusestore/store/src/engine/mem_engine.rs
Expand Up @@ -5,6 +5,10 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use common_flights::AppendResult;
use common_flights::DataPartInfo;
use common_planners::Partition;
use common_planners::Statistics;
use tonic::Status;

use crate::protobuf::CmdCreateDatabase;
Expand All @@ -15,6 +19,7 @@ use crate::protobuf::Table;
/// MemEngine is a prototype storage that is primarily used for testing purposes.
pub struct MemEngine {
/// Databases, keyed by database name.
pub dbs: HashMap<String, Db>,
/// Data parts recorded per table: the outer key is the database name, the
/// inner key is the table name.
pub tbl_parts: HashMap<String, HashMap<String, Vec<DataPartInfo>>>,
/// Counter read and then incremented by `create_id`.
pub next_id: i64,
/// Version counter, starts at 0 — presumably the next version to hand out; TODO confirm.
pub next_ver: i64,
}
Expand All @@ -24,6 +29,7 @@ impl MemEngine {
pub fn create() -> Arc<Mutex<MemEngine>> {
let e = MemEngine {
dbs: HashMap::new(),
tbl_parts: HashMap::new(),
next_id: 0,
next_ver: 0,
};
Expand Down Expand Up @@ -59,6 +65,7 @@ impl MemEngine {
}

pub fn drop_database(&mut self, db_name: &str, if_exists: bool) -> Result<(), Status> {
self.remove_db_data_parts(db_name);
let entry = self.dbs.remove_entry(db_name);
match (entry, if_exists) {
(_, true) => Ok(()),
Expand Down Expand Up @@ -122,6 +129,7 @@ impl MemEngine {
tbl_name: &str,
if_exists: bool,
) -> Result<(), Status> {
self.remove_table_data_parts(db_name, tbl_name);
let r = self.dbs.get_mut(db_name).map(|db| {
let name2id_removed = db.table_name_to_id.remove_entry(tbl_name);
let id_removed = name2id_removed
Expand Down Expand Up @@ -158,6 +166,60 @@ impl MemEngine {
Ok(table.clone())
}

/// Returns a copy of the data parts recorded for `table_name` in `db_name`,
/// or `None` when either the database or the table has no recorded parts.
pub fn get_data_parts(&self, db_name: &str, table_name: &str) -> Option<Vec<DataPartInfo>> {
    let tables = self.tbl_parts.get(db_name)?;
    tables.get(table_name).cloned()
}

/// Records the parts listed in `append_res` as belonging to table
/// `table_name` of database `db_name`.
///
/// Missing database and table entries are created on demand (even when
/// `append_res` carries no parts), and new parts are appended after the parts
/// already recorded for the table.
pub fn append_data_parts(
    &mut self,
    db_name: &str,
    table_name: &str,
    append_res: &AppendResult,
) {
    // Translate each appended part into the bookkeeping record kept per table.
    let new_parts = append_res.parts.iter().map(|part| DataPartInfo {
        partition: Partition {
            name: part.location.clone(),
            version: 0,
        },
        stats: Statistics {
            read_bytes: part.disk_bytes,
            read_rows: part.rows,
        },
    });

    self.tbl_parts
        .entry(db_name.to_string())
        .or_default()
        .entry(table_name.to_string())
        .or_default()
        .extend(new_parts);
}

/// Removes the data parts recorded for a single table.
///
/// Only the entry for `table_name` inside `db_name` is dropped; the parts of
/// every other table in the same database are kept. Unknown database or table
/// names are silently ignored.
pub fn remove_table_data_parts(&mut self, db_name: &str, table_name: &str) {
    // Mutate the per-database map in place. Taking it out of `tbl_parts` with
    // `remove(db_name)` would also discard the parts of all sibling tables.
    if let Some(tables) = self.tbl_parts.get_mut(db_name) {
        tables.remove(table_name);
    }
}

/// Forgets the data parts recorded for every table of `db_name`.
/// Does nothing when the database has no recorded parts.
pub fn remove_db_data_parts(&mut self, db_name: &str) {
    // The removed per-table map, if any, is intentionally discarded.
    let _ = self.tbl_parts.remove(db_name);
}
pub fn create_id(&mut self) -> i64 {
let id = self.next_id;
self.next_id += 1;
Expand Down

0 comments on commit a5c42b2

Please sign in to comment.