Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ async-trait = "0.1"
bytes = "1"
futures = "0.3"
opendal = { workspace = true }
parking_lot = "0.12.1"
regex = "1.6.0"
serde = { workspace = true }

Expand Down
36 changes: 8 additions & 28 deletions src/common/storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use opendal::raw::RpRead;
use opendal::raw::RpScan;
use opendal::raw::RpWrite;
use opendal::Result;
use parking_lot::RwLock;

/// StorageMetrics represents the metrics of storage (all bytes metrics are compressed size).
#[derive(Debug, Default)]
Expand All @@ -50,8 +49,6 @@ pub struct StorageMetrics {
partitions_scanned: AtomicU64,
/// Number of partitions, before pruning
partitions_total: AtomicU64,
/// Status of the operation.
status: Arc<RwLock<String>>,
}

impl StorageMetrics {
Expand All @@ -72,13 +69,6 @@ impl StorageMetrics {
partitions_total: AtomicU64::new(
vs.iter().map(|v| v.as_ref().get_partitions_total()).sum(),
),
// Get the last one status, mainly used for the single table operation.
status: Arc::new(RwLock::new(
vs.iter()
.map(|v| v.as_ref().get_status())
.collect::<Vec<String>>()
.join("|"),
)),
}
}

Expand Down Expand Up @@ -141,16 +131,6 @@ impl StorageMetrics {
pub fn get_partitions_total(&self) -> u64 {
self.partitions_total.load(Ordering::Relaxed)
}

pub fn set_status(&self, new: &str) {
let mut status = self.status.write();
*status = new.to_string();
}

pub fn get_status(&self) -> String {
let status = self.status.read();
status.clone()
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -210,6 +190,14 @@ impl<A: Accessor> LayeredAccessor for StorageMetricsAccessor<A> {
.map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner.scan(path, args).await
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner
.blocking_read(path, args)
Expand All @@ -222,14 +210,6 @@ impl<A: Accessor> LayeredAccessor for StorageMetricsAccessor<A> {
.map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner.scan(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait TableContext: Send + Sync {
fn get_write_progress_value(&self) -> ProgressValues;
fn get_result_progress(&self) -> Arc<Progress>;
fn get_result_progress_value(&self) -> ProgressValues;
fn get_status_info(&self) -> String;
fn set_status_info(&self, info: &str);

fn get_partition(&self) -> Option<PartInfoPtr>;
fn get_partitions(&self, num: usize) -> Vec<PartInfoPtr>;
Expand Down
75 changes: 62 additions & 13 deletions src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::sessions::TableContext;
use crate::sql::plans::CopyPlan;
use crate::sql::plans::Plan;

const MAX_QUERY_COPIED_FILES_NUM: usize = 50;
const MAX_QUERY_COPIED_FILES_NUM: usize = 1000;

pub struct CopyInterpreter {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl CopyInterpreter {
}

// Colored.
let mut results = vec![];
let mut results = Vec::with_capacity(files.len());
for mut file in files {
if let Some(copied_file) = copied_files.get(&file.path) {
match &copied_file.etag {
Expand Down Expand Up @@ -280,10 +280,32 @@ impl CopyInterpreter {
let start = Instant::now();
let ctx = self.ctx.clone();
let table_ctx: Arc<dyn TableContext> = ctx.clone();

// Status.
{
let status = "begin to list files";
ctx.set_status_info(status);
info!(status);
}

let mut stage_table_info = stage_table_info.clone();
let mut all_source_file_infos = StageTable::list_files(&stage_table_info).await?;

// Status.
{
let status = format!("end to list files: {}", all_source_file_infos.len());
ctx.set_status_info(&status);
info!(status);
}

if !force {
// Status.
{
let status = "begin to color copied files";
ctx.set_status_info(status);
info!(status);
}

all_source_file_infos = CopyInterpreter::color_copied_files(
&table_ctx,
catalog_name,
Expand All @@ -293,7 +315,12 @@ impl CopyInterpreter {
)
.await?;

// Need copied file info.
// Status.
{
let status = format!("end to color copied files: {}", all_source_file_infos.len());
ctx.set_status_info(&status);
info!(status);
}
}

let mut need_copied_file_infos = vec![];
Expand All @@ -315,15 +342,28 @@ impl CopyInterpreter {
return Ok(build_res);
}

stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
// Status.
{
let status = "begin to read stage table plan";
ctx.set_status_info(status);
info!(status);
}

stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
let stage_table = StageTable::try_create(stage_table_info.clone())?;
let read_source_plan = {
stage_table
.read_plan_with_catalog(ctx.clone(), catalog_name.to_string(), None)
.await?
};

// Status.
{
let status = "begin to read stage table data";
ctx.set_status_info(status);
info!(status);
}

let to_table = ctx
.get_table(catalog_name, database_name, table_name)
.await?;
Expand Down Expand Up @@ -429,17 +469,26 @@ impl CopyInterpreter {
&all_source_files,
)
.await;
info!(
"copy: try to purge files:{}, elapsed:{}",
all_source_files.len(),
purge_start.elapsed().as_secs()
);

// Status.
{
let status = format!(
"try to purge files:{}, elapsed:{}",
all_source_files.len(),
purge_start.elapsed().as_secs()
);
ctx.set_status_info(&status);
info!(status);
}
}

info!(
"copy: all copy finished, elapsed:{}",
start.elapsed().as_secs()
);
// Status.
{
let status =
format!("all copy finished, elapsed:{}", start.elapsed().as_secs());
ctx.set_status_info(&status);
info!(status);
}

Ok(())
});
Expand Down
17 changes: 1 addition & 16 deletions src/query/service/src/interpreters/interpreter_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::number::UInt64Type;
use common_expression::types::StringType;
Expand All @@ -25,7 +24,6 @@ use common_expression::FromOptData;
use common_sql::plans::ListPlan;
use common_storages_stage::list_file;
use common_storages_stage::StageTable;
use regex::Regex;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -57,20 +55,7 @@ impl Interpreter for ListInterpreter {
async fn execute2(&self) -> Result<PipelineBuildResult> {
let plan = &self.plan;
let op = StageTable::get_op(&plan.stage)?;
let mut files = list_file(&op, &plan.path).await?;

let files = if plan.pattern.is_empty() {
files
} else {
let regex = Regex::new(&plan.pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
&plan.pattern, e
))
})?;
files.retain(|v| regex.is_match(&v.path));
files
};
let files = list_file(&op, &plan.path, &plan.pattern).await?;

let names: Vec<Vec<u8>> = files
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_sql::plans::RemoveStagePlan;
use common_storages_fuse::io::Files;
use common_storages_stage::list_file;
use common_storages_stage::StageTable;
use regex::Regex;
use tracing::error;

use crate::interpreters::Interpreter;
Expand Down Expand Up @@ -50,21 +48,7 @@ impl Interpreter for RemoveUserStageInterpreter {
async fn execute2(&self) -> Result<PipelineBuildResult> {
let plan = self.plan.clone();
let op = StageTable::get_op(&self.plan.stage)?;
let mut stage_files = list_file(&op, &plan.path).await?;

let files = if plan.pattern.is_empty() {
stage_files
} else {
let regex = Regex::new(&plan.pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
&plan.pattern, e
))
})?;

stage_files.retain(|v| regex.is_match(&v.path));
stage_files
};
let files = list_file(&op, &plan.path, &plan.pattern).await?;

let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
let file_op = Files::create(table_ctx, op);
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ impl TableContext for QueryContext {
self.shared.result_progress.as_ref().get_values()
}

fn get_status_info(&self) -> String {
let status = self.shared.status.read();
status.clone()
}

fn set_status_info(&self, info: &str) {
let mut status = self.shared.status.write();
*status = info.to_string();
}

fn get_partition(&self) -> Option<PartInfoPtr> {
self.partition_queue.write().pop_front()
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub struct QueryContextShared {
/// partitions_sha for each table in the query. Not empty only when enabling query result cache.
pub(in crate::sessions) partitions_shas: Arc<RwLock<Vec<String>>>,
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
// Status info.
pub(in crate::sessions) status: Arc<RwLock<String>>,
}

impl QueryContextShared {
Expand Down Expand Up @@ -115,6 +117,7 @@ impl QueryContextShared {
on_error_map: Arc::new(RwLock::new(None)),
partitions_shas: Arc::new(RwLock::new(vec![])),
cacheable: Arc::new(AtomicBool::new(true)),
status: Arc::new(RwLock::new("null".to_string())),
}))
}

Expand Down
6 changes: 6 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_status_info(&self) -> String {
"".to_string()
}

fn set_status_info(&self, _info: &str) {}

fn get_partition(&self) -> Option<PartInfoPtr> {
todo!()
}
Expand Down
Loading