Skip to content

Commit

Permalink
feat: create tables in batch on prom write (#3246)
Browse files Browse the repository at this point in the history
* feat: create tables in batch on prom write

* feat: add logic table ids to log

* fix: miss tabble ids in response
  • Loading branch information
fengjiachun committed Jan 26, 2024
1 parent 7da8f22 commit 3201aea
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 16 deletions.
6 changes: 4 additions & 2 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ impl CreateLogicalTablesProcedure {
manager.create_logic_tables_metadata(tables_data).await?;
}

info!("Created {num_tables} tables metadata for physical table {physical_table_id}");
let table_ids = self.creator.data.real_table_ids();

Ok(Status::done_with_output(self.creator.data.real_table_ids()))
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");

Ok(Status::done_with_output(table_ids))
}

fn create_region_request_builder(
Expand Down
7 changes: 6 additions & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {

fn try_from(resp: PbSubmitDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.map(|t| t.id);
let table_ids = resp.table_ids.iter().map(|t| t.id).collect();
let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
Ok(Self {
key: resp.key,
table_id,
Expand All @@ -219,6 +219,11 @@ impl From<SubmitDdlTaskResponse> for PbSubmitDdlTaskResponse {
table_id: val
.table_id
.map(|table_id| api::v1::meta::TableId { id: table_id }),
table_ids: val
.table_ids
.into_iter()
.map(|id| api::v1::meta::TableId { id })
.collect(),
..Default::default()
}
}
Expand Down
92 changes: 79 additions & 13 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,36 @@ impl Inserter {
on_physical_table: Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
// TODO(jeremy): create and alter in batch? (from `handle_metric_row_inserts`)
let mut create_tables = vec![];
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
match table {
Some(table) => {
// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
validate_request_with_table(req, &table)?;
self.alter_table_on_demand(req, table, ctx, statement_executor)
.await?
}
None => {
self.create_table(req, ctx, &on_physical_table, statement_executor)
.await?
create_tables.push(req);
}
}
}
if !create_tables.is_empty() {
if let Some(on_physical_table) = on_physical_table {
// Creates logical tables in batch.
self.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
}
}
}
Expand Down Expand Up @@ -403,7 +419,6 @@ impl Inserter {
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
on_physical_table: &Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
let table_ref =
Expand All @@ -412,15 +427,7 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;

if let Some(physical_table) = on_physical_table {
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr.table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.clone(),
);
}

info!("Table `{table_ref}` does not exist, try creating table",);
info!("Table `{table_ref}` does not exist, try creating table");

// TODO(weny): multiple regions table.
let res = statement_executor
Expand All @@ -444,6 +451,65 @@ impl Inserter {
}
}
}

async fn create_logical_tables(
&self,
create_tables: Vec<&RowInsertRequest>,
ctx: &QueryContextRef,
physical_table: &str,
statement_executor: &StatementExecutor,
) -> Result<()> {
let create_table_exprs = create_tables
.iter()
.map(|req| {
let table_ref = TableReference::full(
ctx.current_catalog(),
ctx.current_schema(),
&req.table_name,
);

info!("Logical table `{table_ref}` does not exist, try creating table");

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr.table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.to_string(),
);

Ok(create_table_expr)
})
.collect::<Result<Vec<_>>>()?;

let res = statement_executor
.create_logical_tables(&create_table_exprs)
.await;

match res {
Ok(_) => {
info!("Successfully created logical tables");
Ok(())
}
Err(err) => {
let failed_tables = create_table_exprs
.into_iter()
.map(|expr| {
format!(
"{}.{}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
)
})
.collect::<Vec<_>>();
error!(
"Failed to create logical tables {:?}: {}",
failed_tables, err
);
Err(err)
}
}
}
}

fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
Expand Down

0 comments on commit 3201aea

Please sign in to comment.