18 changes: 0 additions & 18 deletions src/query/config/src/config.rs
@@ -1215,18 +1215,6 @@ pub struct QueryConfig {
#[clap(skip)]
pub jwt_key_files: Vec<String>,

/// The maximum memory size of the buffered data collected per insert before being inserted.
#[clap(long, default_value = "10000")]
pub async_insert_max_data_size: u64,

/// The maximum timeout in milliseconds since the first insert before inserting collected data.
#[clap(long, default_value = "200")]
pub async_insert_busy_timeout: u64,

/// The maximum timeout in milliseconds since the last insert before inserting collected data.
#[clap(long, default_value = "0")]
pub async_insert_stale_timeout: u64,

#[clap(long, default_value = "auto")]
pub default_storage_format: String,

@@ -1349,9 +1337,6 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
management_mode: self.management_mode,
jwt_key_file: self.jwt_key_file,
jwt_key_files: self.jwt_key_files,
async_insert_max_data_size: self.async_insert_max_data_size,
async_insert_busy_timeout: self.async_insert_busy_timeout,
async_insert_stale_timeout: self.async_insert_stale_timeout,
default_storage_format: self.default_storage_format,
default_compression: self.default_compression,
idm: InnerIDMConfig {
@@ -1409,9 +1394,6 @@ impl From<InnerQueryConfig> for QueryConfig {
management_mode: inner.management_mode,
jwt_key_file: inner.jwt_key_file,
jwt_key_files: inner.jwt_key_files,
async_insert_max_data_size: inner.async_insert_max_data_size,
async_insert_busy_timeout: inner.async_insert_busy_timeout,
async_insert_stale_timeout: inner.async_insert_stale_timeout,
default_storage_format: inner.default_storage_format,
default_compression: inner.default_compression,

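The three `async_insert_*` fields above are ordinary clap-derived options, so deleting them also removes the corresponding command-line flags and their defaults. As a minimal sketch (the struct and binary name below are illustrative stand-ins, not Databend's actual `QueryConfig`), this is how such a field surfaces on the CLI:

```rust
use clap::Parser;

/// Stand-in for a clap-derived config struct like the one edited above.
#[derive(Parser, Debug)]
struct DemoConfig {
    /// The maximum timeout in milliseconds since the first insert
    /// before inserting collected data.
    #[clap(long, default_value = "200")]
    async_insert_busy_timeout: u64,
}

fn main() {
    // While the field exists, `--async-insert-busy-timeout 500` parses;
    // once the field is deleted, clap rejects the flag as unknown.
    let cfg = DemoConfig::parse_from(["demo", "--async-insert-busy-timeout", "500"]);
    assert_eq!(cfg.async_insert_busy_timeout, 500);
}
```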
6 changes: 0 additions & 6 deletions src/query/config/src/inner.rs
@@ -155,9 +155,6 @@ pub struct QueryConfig {
pub management_mode: bool,
pub jwt_key_file: String,
pub jwt_key_files: Vec<String>,
pub async_insert_max_data_size: u64,
pub async_insert_busy_timeout: u64,
pub async_insert_stale_timeout: u64,
pub default_storage_format: String,
pub default_compression: String,
pub idm: IDMConfig,
@@ -204,9 +201,6 @@ impl Default for QueryConfig {
management_mode: false,
jwt_key_file: "".to_string(),
jwt_key_files: Vec::new(),
async_insert_max_data_size: 10000,
async_insert_busy_timeout: 200,
async_insert_stale_timeout: 0,
default_storage_format: "auto".to_string(),
default_compression: "auto".to_string(),
idm: IDMConfig::default(),
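`config.rs` and `inner.rs` must change in lockstep because the config lives in two layers: a clap-facing outer `QueryConfig` and a plain inner struct, bridged by the `TryInto`/`From` impls edited above. A condensed sketch of the pattern, assuming stand-in types with a single surviving field:

```rust
/// Stand-in for the clap-facing outer config.
#[derive(Debug)]
struct OuterConfig {
    jwt_key_file: String,
}

/// Stand-in for the inner config used by the rest of the engine.
#[derive(Debug)]
struct InnerConfig {
    jwt_key_file: String,
}

// Mirrors `impl TryInto<InnerQueryConfig> for QueryConfig` in the diff;
// the conversion is fallible because real configs validate values here.
impl TryInto<InnerConfig> for OuterConfig {
    type Error = String;

    fn try_into(self) -> Result<InnerConfig, Self::Error> {
        Ok(InnerConfig {
            jwt_key_file: self.jwt_key_file,
        })
    }
}

impl From<InnerConfig> for OuterConfig {
    fn from(inner: InnerConfig) -> Self {
        OuterConfig {
            jwt_key_file: inner.jwt_key_file,
        }
    }
}
```

Removing a setting therefore touches four places — both struct definitions and both conversions — which is exactly the set of hunks above.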
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_factory.rs
@@ -226,7 +226,7 @@ impl InterpreterFactory {
*alter_user.clone(),
)?)),

Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone(), false),
Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()),

Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),

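With the async-insert queue gone, `InsertInterpreter::try_create` no longer takes the boolean that selected the ingestion path. A toy sketch of the simplified dispatch, with all types as stand-ins for the real `Plan` and interpreter types:

```rust
struct InsertPlan;
struct InsertInterpreter;

impl InsertInterpreter {
    // Was `try_create(ctx, plan, async_insert: bool)` before this PR.
    fn try_create(_plan: InsertPlan) -> Result<InsertInterpreter, String> {
        Ok(InsertInterpreter)
    }
}

enum Plan {
    Insert(Box<InsertPlan>),
}

fn get_interpreter(plan: Plan) -> Result<InsertInterpreter, String> {
    match plan {
        // The call site no longer carries a trailing `false`.
        Plan::Insert(insert) => InsertInterpreter::try_create(*insert),
    }
}

fn main() {
    assert!(get_interpreter(Plan::Insert(Box::new(InsertPlan))).is_ok());
}
```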
243 changes: 113 additions & 130 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -97,20 +97,14 @@ pub struct InsertInterpreter {
ctx: Arc<QueryContext>,
plan: Insert,
source_pipe_builder: Mutex<Option<SourcePipeBuilder>>,
async_insert: bool,
}

impl InsertInterpreter {
pub fn try_create(
ctx: Arc<QueryContext>,
plan: Insert,
async_insert: bool,
) -> Result<InterpreterPtr> {
pub fn try_create(ctx: Arc<QueryContext>, plan: Insert) -> Result<InterpreterPtr> {
Ok(Arc::new(InsertInterpreter {
ctx,
plan,
source_pipe_builder: Mutex::new(None),
async_insert,
}))
}

@@ -360,132 +354,121 @@ impl Interpreter for InsertInterpreter {

let mut build_res = PipelineBuildResult::create();

if self.async_insert {
build_res.main_pipeline.add_pipe(
((*self.source_pipe_builder.lock()).clone())
.ok_or_else(|| ErrorCode::EmptyData("empty source pipe builder"))?
.finalize(),
);
} else {
match &self.plan.source {
InsertInputSource::Values(data) => {
let settings = self.ctx.get_settings();

build_res.main_pipeline.add_source(
|output| {
let name_resolution_ctx =
NameResolutionContext::try_from(settings.as_ref())?;
let inner = ValueSource::new(
data.to_string(),
self.ctx.clone(),
name_resolution_ctx,
plan.schema(),
);
AsyncSourcer::create(self.ctx.clone(), output, inner)
},
1,
)?;
}
InsertInputSource::StreamingWithFormat(_, _, input_context) => {
let input_context = input_context.as_ref().expect("must success").clone();
input_context
.format
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
}
InsertInputSource::StreamingWithFileFormat(_, _, input_context) => {
let input_context = input_context.as_ref().expect("must success").clone();
input_context
.format
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
}
InsertInputSource::Stage(opts) => {
tracing::info!("insert: from stage with options {:?}", opts);
self.build_insert_from_stage_pipeline(
table.clone(),
opts.clone(),
&mut build_res.main_pipeline,
)
.await?;
return Ok(build_res);
}
InsertInputSource::SelectPlan(plan) => {
let table1 = table.clone();
let (mut select_plan, select_column_bindings) = match plan.as_ref() {
Plan::Query {
s_expr,
metadata,
bind_context,
..
} => {
let mut builder1 =
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone());
(builder1.build(s_expr).await?, bind_context.columns.clone())
}
_ => unreachable!(),
};

let catalog = self.plan.catalog.clone();

let insert_select_plan = match select_plan {
PhysicalPlan::Exchange(ref mut exchange) => {
// insert can be dispatched to different nodes
let input = exchange.input.clone();
exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(
Box::new(DistributedInsertSelect {
input,
catalog,
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
cast_needed: self.check_schema_cast(plan)?,
}),
));
select_plan
}
other_plan => {
// insert should wait until all nodes finished
PhysicalPlan::DistributedInsertSelect(Box::new(
DistributedInsertSelect {
input: Box::new(other_plan),
catalog,
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
cast_needed: self.check_schema_cast(plan)?,
},
))
}
};

let mut build_res =
build_query_pipeline(&self.ctx, &[], &insert_select_plan, false, false)
.await?;
match &self.plan.source {
InsertInputSource::Values(data) => {
let settings = self.ctx.get_settings();

let ctx = self.ctx.clone();
let overwrite = self.plan.overwrite;
build_res.main_pipeline.set_on_finished(move |may_error| {
// capture out variable
let overwrite = overwrite;
let ctx = ctx.clone();
let table = table.clone();

if may_error.is_none() {
let append_entries = ctx.consume_precommit_blocks();
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
return GlobalIORuntime::instance().block_on(async move {
table.commit_insertion(ctx, append_entries, overwrite).await
});
}
build_res.main_pipeline.add_source(
|output| {
let name_resolution_ctx =
NameResolutionContext::try_from(settings.as_ref())?;
let inner = ValueSource::new(
data.to_string(),
self.ctx.clone(),
name_resolution_ctx,
plan.schema(),
);
AsyncSourcer::create(self.ctx.clone(), output, inner)
},
1,
)?;
}
InsertInputSource::StreamingWithFormat(_, _, input_context) => {
let input_context = input_context.as_ref().expect("must success").clone();
input_context
.format
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
}
InsertInputSource::StreamingWithFileFormat(_, _, input_context) => {
let input_context = input_context.as_ref().expect("must success").clone();
input_context
.format
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
}
InsertInputSource::Stage(opts) => {
tracing::info!("insert: from stage with options {:?}", opts);
self.build_insert_from_stage_pipeline(
table.clone(),
opts.clone(),
&mut build_res.main_pipeline,
)
.await?;
return Ok(build_res);
}
InsertInputSource::SelectPlan(plan) => {
let table1 = table.clone();
let (mut select_plan, select_column_bindings) = match plan.as_ref() {
Plan::Query {
s_expr,
metadata,
bind_context,
..
} => {
let mut builder1 =
PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone());
(builder1.build(s_expr).await?, bind_context.columns.clone())
}
_ => unreachable!(),
};

let catalog = self.plan.catalog.clone();

let insert_select_plan = match select_plan {
PhysicalPlan::Exchange(ref mut exchange) => {
// insert can be dispatched to different nodes
let input = exchange.input.clone();
exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new(
DistributedInsertSelect {
input,
catalog,
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
cast_needed: self.check_schema_cast(plan)?,
},
)));
select_plan
}
other_plan => {
// insert should wait until all nodes finished
PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect {
input: Box::new(other_plan),
catalog,
table_info: table1.get_table_info().clone(),
select_schema: plan.schema(),
select_column_bindings,
insert_schema: self.plan.schema(),
cast_needed: self.check_schema_cast(plan)?,
}))
}
};

let mut build_res =
build_query_pipeline(&self.ctx, &[], &insert_select_plan, false, false).await?;

let ctx = self.ctx.clone();
let overwrite = self.plan.overwrite;
build_res.main_pipeline.set_on_finished(move |may_error| {
// capture out variable
let overwrite = overwrite;
let ctx = ctx.clone();
let table = table.clone();

if may_error.is_none() {
let append_entries = ctx.consume_precommit_blocks();
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
return GlobalIORuntime::instance().block_on(async move {
table.commit_insertion(ctx, append_entries, overwrite).await
});
}

Err(may_error.as_ref().unwrap().clone())
});
Err(may_error.as_ref().unwrap().clone())
});

return Ok(build_res);
}
};
}
return Ok(build_res);
}
};

let append_mode = match &self.plan.source {
InsertInputSource::StreamingWithFormat(..)
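The hunk above flattens the old `if self.async_insert { … } else { match … }` into a single `match` over the insert source, while keeping the commit-on-finish hook for `INSERT … SELECT`. A std-only sketch of that hook's control flow, with made-up types (Databend actually runs the commit on `GlobalIORuntime` so it is not dropped together with the query executor):

```rust
/// Stand-in for a pipeline that reports an optional error string on finish.
struct Pipeline {
    on_finished: Option<Box<dyn FnOnce(&Option<String>) -> Result<(), String>>>,
}

impl Pipeline {
    fn set_on_finished<F>(&mut self, f: F)
    where
        F: FnOnce(&Option<String>) -> Result<(), String> + 'static,
    {
        self.on_finished = Some(Box::new(f));
    }

    fn finish(mut self, may_error: Option<String>) -> Result<(), String> {
        match self.on_finished.take() {
            Some(hook) => hook(&may_error),
            None => may_error.map_or(Ok(()), Err),
        }
    }
}

/// Stand-in for `table.commit_insertion(ctx, append_entries, overwrite)`.
fn commit_insertion() -> Result<(), String> {
    Ok(())
}

fn main() -> Result<(), String> {
    let mut pipeline = Pipeline { on_finished: None };
    pipeline.set_on_finished(|may_error| {
        // Commit only when the pipeline finished without error;
        // otherwise propagate the error unchanged, as the diff does.
        if may_error.is_none() {
            return commit_insertion();
        }
        Err(may_error.as_ref().unwrap().clone())
    });
    pipeline.finish(None)
}
```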
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_table_create.rs
@@ -144,7 +144,7 @@ impl CreateTableInterpreter {
source: InsertInputSource::SelectPlan(select_plan),
};

InsertInterpreter::try_create(self.ctx.clone(), insert_plan, false)?
InsertInterpreter::try_create(self.ctx.clone(), insert_plan)?
.execute2()
.await
}
1 change: 0 additions & 1 deletion src/query/service/src/interpreters/mod.rs
@@ -13,7 +13,6 @@
// limitations under the License.

mod access;
// mod async_insert_queue_v2;
mod common;
mod interpreter;
mod interpreter_call;
3 changes: 0 additions & 3 deletions src/query/service/tests/it/configs.rs
@@ -707,9 +707,6 @@ wait_timeout_mills = 5000
max_query_log_size = 10000
management_mode = false
jwt_key_file = ""
async_insert_max_data_size = 10000
async_insert_busy_timeout = 200
async_insert_stale_timeout = 0
users = []
share_endpoint_address = ""

@@ -39,9 +39,6 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| "query" | "api_tls_server_cert" | "" | "" |
| "query" | "api_tls_server_key" | "" | "" |
| "query" | "api_tls_server_root_ca_cert" | "" | "" |
| "query" | "async_insert_busy_timeout" | "200" | "" |
| "query" | "async_insert_max_data_size" | "10000" | "" |
| "query" | "async_insert_stale_timeout" | "0" | "" |
| "query" | "clickhouse_handler_host" | "127.0.0.1" | "" |
| "query" | "clickhouse_handler_port" | "9000" | "" |
| "query" | "clickhouse_http_handler_host" | "127.0.0.1" | "" |
21 changes: 0 additions & 21 deletions src/query/service/tests/it/tests/config.rs
@@ -69,27 +69,6 @@ impl ConfigBuilder {
self
}

/// TODO: remove this settings in the future?
#[allow(dead_code)]
pub fn async_insert_busy_timeout(mut self, value: u64) -> ConfigBuilder {
self.conf.query.async_insert_busy_timeout = value;
self
}

/// TODO: remove this settings in the future?
#[allow(dead_code)]
pub fn async_insert_max_data_size(mut self, value: u64) -> ConfigBuilder {
self.conf.query.async_insert_max_data_size = value;
self
}

/// TODO: remove this settings in the future?
#[allow(dead_code)]
pub fn async_insert_stale_timeout(mut self, value: u64) -> ConfigBuilder {
self.conf.query.async_insert_stale_timeout = value;
self
}

pub fn http_handler_result_timeout(mut self, value: impl Into<u64>) -> ConfigBuilder {
self.conf.query.http_handler_result_timeout_secs = value.into();
self
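For context, the removed helpers were unused (`#[allow(dead_code)]`) setters on the test-only `ConfigBuilder`; the surviving ones follow the same fluent pattern, sketched here with stand-in types:

```rust
#[derive(Default)]
struct QueryConfig {
    http_handler_result_timeout_secs: u64,
}

#[derive(Default)]
struct Config {
    query: QueryConfig,
}

#[derive(Default)]
struct ConfigBuilder {
    conf: Config,
}

impl ConfigBuilder {
    // Each setter mutates one field and returns the builder by value.
    fn http_handler_result_timeout(mut self, value: impl Into<u64>) -> ConfigBuilder {
        self.conf.query.http_handler_result_timeout_secs = value.into();
        self
    }

    fn build(self) -> Config {
        self.conf
    }
}

fn main() {
    let conf = ConfigBuilder::default()
        .http_handler_result_timeout(30u64)
        .build();
    assert_eq!(conf.query.http_handler_result_timeout_secs, 30);
}
```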