Skip to content

Commit

Permalink
Query: implements RemoteTable's read_plan & read
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jun 2, 2021
1 parent f577129 commit b55eacf
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 31 deletions.
7 changes: 5 additions & 2 deletions common/flights/src/lib.rs
Expand Up @@ -12,6 +12,7 @@ pub use store_do_action::CreateDatabaseAction;
pub use store_do_action::CreateDatabaseActionResult;
pub use store_do_action::CreateTableAction;
pub use store_do_action::CreateTableActionResult;
pub use store_do_action::DataPartInfo;
pub use store_do_action::DropDatabaseAction;
pub use store_do_action::DropDatabaseActionResult;
pub use store_do_action::DropTableAction;
Expand All @@ -20,10 +21,12 @@ pub use store_do_action::GetTableAction;
pub use store_do_action::GetTableActionResult;
pub use store_do_action::ReadPlanAction;
pub use store_do_action::ReadPlanActionResult;
pub use store_do_action::ScanPartitionAction;
pub use store_do_action::ScanPartitionResult;
pub use store_do_action::StoreDoAction;
pub use store_do_action::StoreDoActionResult;
pub use store_do_get::ReadAction;
pub use store_do_get::StoreDoGet;
// TODO refine these
pub use store_do_put::get_do_put_meta;
pub use store_do_put::set_do_put_meta;
pub use store_do_put::AppendResult;
Expand All @@ -34,7 +37,7 @@ mod flight_token;
mod store_client;
mod store_do_action;
mod store_do_get;
pub mod store_do_put;
mod store_do_put;

// ProtoBuf generated files.
#[allow(clippy::all)]
Expand Down
70 changes: 61 additions & 9 deletions common/flights/src/store_client.rs
Expand Up @@ -12,14 +12,19 @@ use common_arrow::arrow::record_batch::RecordBatch;
use common_arrow::arrow_flight::flight_service_client::FlightServiceClient;
use common_arrow::arrow_flight::utils::flight_data_from_arrow_batch;
use common_arrow::arrow_flight::utils::flight_data_from_arrow_schema;
use common_arrow::arrow_flight::utils::flight_data_to_arrow_batch;
use common_arrow::arrow_flight::Action;
use common_arrow::arrow_flight::BasicAuth;
use common_arrow::arrow_flight::HandshakeRequest;
use common_arrow::arrow_flight::Ticket;
use common_datablocks::DataBlock;
use common_exception::ErrorCodes;
use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
use common_planners::DropTablePlan;
use common_planners::ScanPlan;
use common_streams::SendableDataBlockStream;
use futures::stream;
use futures::SinkExt;
use futures::StreamExt;
Expand All @@ -37,6 +42,7 @@ use crate::store_do_action::DropDatabaseAction;
use crate::store_do_action::DropDatabaseActionResult;
use crate::store_do_action::StoreDoAction;
use crate::store_do_action::StoreDoActionResult;
use crate::store_do_get::ReadAction;
use crate::store_do_put;
use crate::store_do_put::AppendResult;
use crate::ConnectionFactory;
Expand All @@ -46,6 +52,9 @@ use crate::DropTableAction;
use crate::DropTableActionResult;
use crate::GetTableAction;
use crate::GetTableActionResult;
use crate::ScanPartitionAction;
use crate::ScanPartitionResult;
use crate::StoreDoGet;

pub type BlockStream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = DataBlock> + Sync + Send + 'static>>;
Expand Down Expand Up @@ -159,6 +168,43 @@ impl StoreClient {
anyhow::bail!("invalid response")
}

/// Sends a `ScanPartition` action to the store and returns its payload.
///
/// The given `scan_plan` is cloned and the clone's `schema_name` is
/// overwritten with `"<db_name>/<tbl_name>"` before it is sent; the caller's
/// plan is left untouched.
///
/// # Errors
///
/// Propagates any error from `do_action`, and fails with `"invalid response"`
/// when the store answers with a result variant other than `ScanPartition`.
pub async fn scan_partition(
    &mut self,
    db_name: String,
    tbl_name: String,
    scan_plan: &ScanPlan,
) -> anyhow::Result<ScanPartitionResult> {
    let mut qualified_plan = scan_plan.clone();
    qualified_plan.schema_name = format!("{}/{}", db_name, tbl_name);

    let request = StoreDoAction::ScanPartition(ScanPartitionAction {
        scan_plan: qualified_plan,
    });

    match self.do_action(&request).await? {
        StoreDoActionResult::ScanPartition(parts) => Ok(parts),
        _ => anyhow::bail!("invalid response"),
    }
}

/// Get partition.
pub async fn read_partition(
&mut self,
schema: SchemaRef,
read_action: &ReadAction,
) -> anyhow::Result<SendableDataBlockStream> {
let cmd = StoreDoGet::Read(read_action.clone());
let mut req = tonic::Request::<Ticket>::from(&cmd);
req.set_timeout(self.timeout);
let res = self.client.do_get(req).await?.into_inner();
let res_stream = res.map(move |item| {
item.map_err(|status| ErrorCodes::TokioError(status.to_string()))
.and_then(|item| {
flight_data_to_arrow_batch(&item, schema.clone(), &[]).map_err(ErrorCodes::from)
})
.and_then(DataBlock::try_from)
});
Ok(Box::pin(res_stream))
}

/// Handshake.
async fn handshake(
client: &mut FlightServiceClient<Channel>,
Expand Down Expand Up @@ -233,17 +279,23 @@ impl StoreClient {
tokio::spawn(async move {
while let Some(block) = block_stream.next().await {
info!("next data block");
if let Ok(batch) = RecordBatch::try_from(block) {
if let Err(_e) = tx
.send(flight_data_from_arrow_batch(&batch, &ipc_write_opt).1)
.await
{
log::info!("failed to send flight-data to downstream, breaking out");
match RecordBatch::try_from(block) {
Ok(batch) => {
if let Err(_e) = tx
.send(flight_data_from_arrow_batch(&batch, &ipc_write_opt).1)
.await
{
log::info!("failed to send flight-data to downstream, breaking out");
break;
}
}
Err(e) => {
log::info!(
"failed to convert DataBlock to RecordBatch , breaking out, {:?}",
e
);
break;
}
} else {
log::info!("failed to convert DataBlock to RecordBatch , breaking out");
break;
}
}
});
Expand Down
17 changes: 17 additions & 0 deletions common/flights/src/store_do_action.rs
Expand Up @@ -13,7 +13,9 @@ use common_planners::CreateDatabasePlan;
use common_planners::CreateTablePlan;
use common_planners::DropDatabasePlan;
use common_planners::DropTablePlan;
use common_planners::Partition;
use common_planners::ScanPlan;
use common_planners::Statistics;
use prost::Message;
use tonic::Request;

Expand Down Expand Up @@ -71,6 +73,19 @@ pub struct GetTableActionResult {
pub schema: DataSchemaRef,
}

/// Payload of the `StoreDoAction::ScanPartition` action: the scan plan the
/// store should evaluate.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct ScanPartitionAction {
// The plan to scan with; `StoreClient::scan_partition` sets its `schema_name`
// to "<db>/<table>" before sending.
pub scan_plan: ScanPlan,
}

/// One entry of a scan-partition result: a partition together with the
/// statistics reported for it.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct DataPartInfo {
// The partition this entry describes.
pub partition: Partition,
// Statistics for this partition (summed across entries by the query side).
pub stats: Statistics,
}

/// Result of a `ScanPartition` action: the list of partitions with their
/// statistics, or `None` when the store returns no list.
pub type ScanPartitionResult = Option<Vec<DataPartInfo>>;

// Action wrapper for do_action.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub enum StoreDoAction {
Expand All @@ -79,6 +94,7 @@ pub enum StoreDoAction {
DropDatabase(DropDatabaseAction),
CreateTable(CreateTableAction),
DropTable(DropTableAction),
ScanPartition(ScanPartitionAction),
GetTable(GetTableAction),
}

Expand All @@ -89,6 +105,7 @@ pub enum StoreDoActionResult {
DropDatabase(DropDatabaseActionResult),
CreateTable(CreateTableActionResult),
DropTable(DropTableActionResult),
ScanPartition(ScanPartitionResult),
GetTable(GetTableActionResult),
}

Expand Down
10 changes: 8 additions & 2 deletions common/flights/src/store_do_get.rs
Expand Up @@ -6,15 +6,21 @@
use std::convert::TryInto;

use common_arrow::arrow_flight::Ticket;
use common_planners::Partitions;
use common_planners::Partition;
use common_planners::PlanNode;
use common_planners::ScanPlan;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct ReadAction {
pub partition: Partitions,
pub partition: Partition,
pub push_down: PlanNode,
}

/// Action carrying a scan plan.
///
/// NOTE(review): this looks like it overlaps with `ScanPartitionAction` in
/// `store_do_action.rs` — confirm whether both types are needed.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct ScanPartitionsAction {
// NOTE(review): `can_plan` is presumably a typo for `scan_plan`. The field is
// public and serialized under this name, so confirm with callers before renaming.
pub can_plan: ScanPlan,
}

/// Pull a file. This is used to replicate data between store servers, which is only used internally.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct PullAction {
Expand Down
2 changes: 1 addition & 1 deletion common/planners/src/plan_partition.rs
Expand Up @@ -4,7 +4,7 @@

pub type Partitions = Vec<Partition>;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct Partition {
pub name: String,
pub version: u64,
Expand Down
2 changes: 1 addition & 1 deletion common/planners/src/plan_statistics.rs
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0.

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct Statistics {
/// Total rows of the query read.
pub read_rows: usize,
Expand Down
92 changes: 76 additions & 16 deletions fusequery/query/src/datasources/remote/remote_table.rs
Expand Up @@ -3,30 +3,34 @@
// SPDX-License-Identifier: Apache-2.0.

use std::any::Any;
use std::sync::mpsc::channel;
use std::sync::Arc;

use common_datavalues::DataSchemaRef;
use common_exception::ErrorCodes;
use common_exception::Result;
use common_flights::ScanPartitionResult;
use common_planners::InsertIntoPlan;
use common_planners::Partition;
use common_planners::ReadDataSourcePlan;
use common_planners::ScanPlan;
use common_planners::Statistics;
use common_planners::TableOptions;
use common_streams::SendableDataBlockStream;

use crate::datasources::remote::store_client_provider::StoreClientProvider;
use crate::datasources::remote::StoreClientProvider;
use crate::datasources::ITable;
use crate::sessions::FuseQueryContextRef;

#[allow(dead_code)]
pub struct RemoteTable {
pub(crate) db: String,
name: String,
schema: DataSchemaRef,
store_client_provider: StoreClientProvider,
pub(crate) name: String,
pub(crate) schema: DataSchemaRef,
pub(crate) store_client_provider: StoreClientProvider,
}

impl RemoteTable {
#[allow(dead_code)]
pub fn try_create(
db: String,
name: String,
Expand Down Expand Up @@ -68,23 +72,43 @@ impl ITable for RemoteTable {

fn read_plan(
&self,
_ctx: FuseQueryContextRef,
_scan: &ScanPlan,
ctx: FuseQueryContextRef,
scan: &ScanPlan,
_partitions: usize,
) -> Result<ReadDataSourcePlan> {
Result::Err(ErrorCodes::UnImplement(
"RemoteTable read_plan not yet implemented",
))
// Change this method to async at current stage might be harsh
let (tx, rx) = channel();
let cli_provider = self.store_client_provider.clone();
let db_name = self.db.clone();
let tbl_name = self.name.clone();
{
let scan = scan.clone();
ctx.execute_task(async move {
match cli_provider.try_get_client().await {
Ok(mut client) => {
let parts_info = client
.scan_partition(db_name, tbl_name, &scan)
.await
.map_err(ErrorCodes::from);
let _ = tx.send(parts_info);
}
Err(e) => {
let _ = tx.send(Err(e));
}
}
});
}

rx.recv()
.map_err(ErrorCodes::from_std_error)?
.map(|v| self.partitions_to_plan(v, scan.clone()))
}

async fn read(&self, _ctx: FuseQueryContextRef) -> Result<SendableDataBlockStream> {
Result::Err(ErrorCodes::UnImplement(
"RemoteTable read not yet implemented",
))
async fn read(&self, ctx: FuseQueryContextRef) -> Result<SendableDataBlockStream> {
self.do_read(ctx).await
}

async fn append_data(&self, _ctx: FuseQueryContextRef, plan: InsertIntoPlan) -> Result<()> {
// goes like this
let opt_stream = {
let mut inner = plan.input_stream.lock().unwrap();
(*inner).take()
Expand All @@ -94,7 +118,7 @@ impl ITable for RemoteTable {
let block_stream =
opt_stream.ok_or_else(|| ErrorCodes::EmptyData("input stream consumed"))?;
let mut client = self.store_client_provider.try_get_client().await?;
(client)
client
.append_data(
plan.db_name.clone(),
plan.tbl_name.clone(),
Expand All @@ -107,3 +131,39 @@ impl ITable for RemoteTable {
Ok(())
}
}

impl RemoteTable {
    /// Builds the `ReadDataSourcePlan` for this table from a scan-partition result.
    ///
    /// Every returned part contributes one `Partition` (keeping only its name;
    /// the version is always written as `0`) and its row/byte counts are added
    /// to the plan statistics. A `None` result yields a plan with no partitions
    /// and zeroed statistics. The plan is marked `remote` and carries an empty
    /// description.
    fn partitions_to_plan(
        &self,
        res: ScanPartitionResult,
        scan_plan: ScanPlan,
    ) -> ReadDataSourcePlan {
        let mut statistics = Statistics {
            read_rows: 0,
            read_bytes: 0,
        };
        let mut partitions = Vec::new();

        // `Option<Vec<_>>` flattens to zero items for `None`, or to each part for `Some`.
        for info in res.into_iter().flatten() {
            statistics.read_rows += info.stats.read_rows;
            statistics.read_bytes += info.stats.read_bytes;
            partitions.push(Partition {
                name: info.partition.name,
                version: 0,
            });
        }

        ReadDataSourcePlan {
            db: self.db.clone(),
            table: self.name.clone(),
            schema: self.schema.clone(),
            partitions,
            statistics,
            description: String::new(),
            scan_plan: Arc::new(scan_plan),
            remote: true,
        }
    }
}

0 comments on commit b55eacf

Please sign in to comment.