Merge pull request #7535 from zhyass/feature_fix
feat(storage): add clustering_history system table
mergify[bot] committed Sep 9, 2022
2 parents dbaf2e5 + ea3f0a8 commit fa49f1c
Showing 11 changed files with 350 additions and 45 deletions.
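
In short: when a table recluster completes, the recluster interpreter now appends one row to a new system.clustering_history system table, recording the operation's start and end time (epoch microseconds), the database and table names, and the bytes and rows reported by the scan-progress counter. The remaining hunks register the table in the system database, drop a batch of redundant numeric casts in the query-log writer, and rename a mislabeled test.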
4 changes: 4 additions & 0 deletions src/query/service/src/databases/system/system_database.rs
@@ -52,6 +52,10 @@ impl SystemDatabase {
                 sys_db_meta.next_table_id(),
                 config.query.max_query_log_size as i32,
             )),
+            Arc::new(system::ClusteringHistoryTable::create(
+                sys_db_meta.next_table_id(),
+                config.query.max_query_log_size as i32,
+            )),
             system::EnginesTable::create(sys_db_meta.next_table_id()),
             system::RolesTable::create(sys_db_meta.next_table_id()),
             system::StagesTable::create(sys_db_meta.next_table_id()),
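
The new table is constructed with config.query.max_query_log_size, the same knob that bounds system.query_log, so clustering_history is presumably an in-memory log table capped at the same row count. (The storage-side ClusteringHistoryTable definition is among the 11 changed files but is not shown in this excerpt.)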
79 changes: 79 additions & 0 deletions src/query/service/src/interpreters/interpreter_clustering_history.rs
@@ -0,0 +1,79 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::SystemTime;
+use std::time::UNIX_EPOCH;
+
+use common_catalog::catalog::CATALOG_DEFAULT;
+use common_datablocks::DataBlock;
+use common_datavalues::prelude::Series;
+use common_datavalues::prelude::SeriesFrom;
+use common_exception::Result;
+
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+use crate::storages::system::ClusteringHistoryTable;
+
+pub struct InterpreterClusteringHistory {
+    ctx: Arc<QueryContext>,
+}
+
+impl InterpreterClusteringHistory {
+    pub fn create(ctx: Arc<QueryContext>) -> Self {
+        InterpreterClusteringHistory { ctx }
+    }
+
+    pub async fn write_log(
+        &self,
+        start: SystemTime,
+        db_name: &str,
+        table_name: &str,
+    ) -> Result<()> {
+        let start_time = start
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_micros() as i64;
+        let end_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_micros() as i64;
+        let reclustered_bytes = self.ctx.get_scan_progress_value().bytes as u64;
+        let reclustered_rows = self.ctx.get_scan_progress_value().rows as u64;
+
+        let table = self
+            .ctx
+            .get_table(CATALOG_DEFAULT, "system", "clustering_history")
+            .await?;
+        let schema = table.get_table_info().meta.schema.clone();
+
+        let block = DataBlock::create(schema.clone(), vec![
+            Series::from_data(vec![start_time]),
+            Series::from_data(vec![end_time]),
+            Series::from_data(vec![db_name]),
+            Series::from_data(vec![table_name]),
+            Series::from_data(vec![reclustered_bytes]),
+            Series::from_data(vec![reclustered_rows]),
+        ]);
+        let blocks = vec![Ok(block)];
+        let input_stream = futures::stream::iter::<Vec<Result<DataBlock>>>(blocks);
+
+        let clustering_history_table: &ClusteringHistoryTable =
+            table.as_any().downcast_ref().unwrap();
+        clustering_history_table
+            .append_data(self.ctx.clone(), Box::pin(input_stream))
+            .await?;
+        Ok(())
+    }
+}
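
Two details of write_log are worth calling out. First, timestamps are stored as epoch microseconds via duration_since(UNIX_EPOCH), which panics (expect("Time went backwards")) if the system clock sits before the Unix epoch; second, get_scan_progress_value() is called twice where a single read would do. Below is a standalone sketch of the same timestamp conversion with a non-panicking fallback — the helper name is illustrative, not part of this PR:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Same conversion as write_log, but clamping to zero instead of panicking
// when the clock predates the Unix epoch. (Hypothetical helper, not in the PR.)
fn to_epoch_micros(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_micros() as i64)
        .unwrap_or(0)
}

fn main() {
    let start = SystemTime::now();
    let end = SystemTime::now();
    assert!(to_epoch_micros(end) >= to_epoch_micros(start));
    println!("start = {} µs since the Unix epoch", to_epoch_micros(start));
}
```

Note also the as_any().downcast_ref().unwrap() at the end: it is safe only because the default catalog is known to return a ClusteringHistoryTable for "system"."clustering_history"; any other table type would panic there.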
30 changes: 15 additions & 15 deletions src/query/service/src/interpreters/interpreter_query_log.rs
@@ -185,29 +185,29 @@ impl InterpreterQueryLog {
             Series::from_data(vec![event.query_id.as_str()]),
             Series::from_data(vec![event.query_kind.as_str()]),
             Series::from_data(vec![event.query_text.as_str()]),
-            Series::from_data(vec![event.event_date as i32]),
-            Series::from_data(vec![event.event_time as i64]),
+            Series::from_data(vec![event.event_date]),
+            Series::from_data(vec![event.event_time]),
             // Schema.
             Series::from_data(vec![event.current_database.as_str()]),
             Series::from_data(vec![event.databases.as_str()]),
             Series::from_data(vec![event.tables.as_str()]),
             Series::from_data(vec![event.columns.as_str()]),
             Series::from_data(vec![event.projections.as_str()]),
             // Stats.
-            Series::from_data(vec![event.written_rows as u64]),
-            Series::from_data(vec![event.written_bytes as u64]),
-            Series::from_data(vec![event.written_io_bytes as u64]),
-            Series::from_data(vec![event.written_io_bytes_cost_ms as u64]),
-            Series::from_data(vec![event.scan_rows as u64]),
-            Series::from_data(vec![event.scan_bytes as u64]),
-            Series::from_data(vec![event.scan_io_bytes as u64]),
-            Series::from_data(vec![event.scan_io_bytes_cost_ms as u64]),
-            Series::from_data(vec![event.scan_partitions as u64]),
-            Series::from_data(vec![event.total_partitions as u64]),
-            Series::from_data(vec![event.result_rows as u64]),
-            Series::from_data(vec![event.result_bytes as u64]),
+            Series::from_data(vec![event.written_rows]),
+            Series::from_data(vec![event.written_bytes]),
+            Series::from_data(vec![event.written_io_bytes]),
+            Series::from_data(vec![event.written_io_bytes_cost_ms]),
+            Series::from_data(vec![event.scan_rows]),
+            Series::from_data(vec![event.scan_bytes]),
+            Series::from_data(vec![event.scan_io_bytes]),
+            Series::from_data(vec![event.scan_io_bytes_cost_ms]),
+            Series::from_data(vec![event.scan_partitions]),
+            Series::from_data(vec![event.total_partitions]),
+            Series::from_data(vec![event.result_rows]),
+            Series::from_data(vec![event.result_bytes]),
             Series::from_data(vec![event.cpu_usage]),
-            Series::from_data(vec![event.memory_usage as u64]),
+            Series::from_data(vec![event.memory_usage]),
             // Client.
             Series::from_data(vec![event.client_info.as_str()]),
             Series::from_data(vec![event.client_address.as_str()]),
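
All fifteen removed casts in this hunk are redundant: after this change the event fields carry exactly the types that Series::from_data expects, so the as conversions add nothing. Dropping them is more than cosmetic, because as converts silently. A self-contained illustration (variable names invented for the example):

```rust
fn main() {
    // A field that already has the column's type: the old `written_rows as u64`
    // produced exactly the same value as plain `written_rows`.
    let written_rows: u64 = 42;
    let no_cast = written_rows; // previously written as `written_rows as u64`
    assert_eq!(no_cast, 42);

    // Why removing redundant `as` casts matters: if a field's type ever widens,
    // `as` truncates silently instead of surfacing a compile error.
    let wide: u128 = u128::from(u64::MAX) + 1;
    assert_eq!(wide as u64, 0); // wraps around with no warning
}
```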
@@ -13,13 +13,15 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use common_exception::Result;
 use common_planners::ReclusterTablePlan;
 use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
 
 use crate::interpreters::Interpreter;
+use crate::interpreters::InterpreterClusteringHistory;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::Pipeline;
@@ -48,6 +50,7 @@ impl Interpreter for ReclusterTableInterpreter {
         let ctx = self.ctx.clone();
         let settings = ctx.get_settings();
         let tenant = ctx.get_tenant();
+        let start = SystemTime::now();
         loop {
             let table = self
                 .ctx
@@ -92,6 +95,10 @@ impl Interpreter for ReclusterTableInterpreter {
             }
         }
 
+        InterpreterClusteringHistory::create(ctx.clone())
+            .write_log(start, &plan.database, &plan.table)
+            .await?;
+
         Ok(Box::pin(DataBlockStream::create(
             self.plan.schema(),
             None,
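
Note the trailing ? on write_log: if appending the history row fails, the whole recluster statement reports an error even though the data movement in the loop above has already completed. Also, start is captured once before the loop, so a multi-pass recluster produces a single history row covering all passes, not one row per pass.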
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
@@ -20,6 +20,7 @@ mod interpreter;
 mod interpreter_call;
 mod interpreter_cluster_key_alter;
 mod interpreter_cluster_key_drop;
+mod interpreter_clustering_history;
 mod interpreter_common;
 mod interpreter_copy_v2;
 mod interpreter_database_create;
@@ -106,6 +107,7 @@ pub use interpreter::InterpreterPtr;
 pub use interpreter_call::CallInterpreter;
 pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
 pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
+pub use interpreter_clustering_history::InterpreterClusteringHistory;
 pub use interpreter_common::append2table;
 pub use interpreter_common::commit2table;
 pub use interpreter_common::list_files_from_dal;
@@ -115,5 +115,22 @@ async fn test_alter_recluster_interpreter() -> Result<()> {
         common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
     }
 
+    // clustering_history.
+    {
+        let query = "select count(*) from system.clustering_history";
+        let (plan, _, _) = planner.plan_sql(query).await?;
+        let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
+        let stream = executor.execute().await?;
+        let result = stream.try_collect::<Vec<_>>().await?;
+        let expected = vec![
+            "+----------+",
+            "| count(*) |",
+            "+----------+",
+            "| 1        |",
+            "+----------+",
+        ];
+        common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
+    }
+
     Ok(())
 }
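
The expected count of 1 holds because, by this point in test_alter_recluster_interpreter, exactly one recluster has run (the earlier steps of the test are collapsed in this view), and each completed recluster appends exactly one history row.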
@@ -20,7 +20,7 @@ use databend_query::storages::ToReadDataSourcePlan;
 use futures::TryStreamExt;
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-async fn test_tables_table() -> Result<()> {
+async fn test_databases_table() -> Result<()> {
     let (_guard, ctx) = crate::tests::create_query_context().await?;
     let table = DatabasesTable::create(1);
     let source_plan = table.read_plan(ctx.clone(), None).await?;
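
This last hunk is a drive-by fix: the test exercises DatabasesTable, but its name had evidently been copied from the tables-table test, so it is renamed from test_tables_table to test_databases_table.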
