Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Optimize `RowCursor` by reusing buffer capacity where possible ([#340])
* All `Query::fetch*` methods will always use POST instead of GET. It is now allowed to change `readonly` value via
`Query::with_option`. ([#342])
* In case of a schema mismatch, the client now emits `clickhouse::error::Error::SchemaMismatch` instead of panicking.
([#346])

[#283]: https://github.com/ClickHouse/clickhouse-rs/pull/283
[#340]: https://github.com/ClickHouse/clickhouse-rs/pull/340
[#342]: https://github.com/ClickHouse/clickhouse-rs/pull/342
[#346]: https://github.com/ClickHouse/clickhouse-rs/pull/346

## [0.14.0] - 2025-10-08

Expand Down
3 changes: 2 additions & 1 deletion src/cursors/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl<T> RowCursor<T> {
match parse_rbwnat_columns_header(&mut slice) {
Ok(columns) if !columns.is_empty() => {
self.bytes.set_remaining(slice.len());
self.row_metadata = Some(RowMetadata::new_for_cursor::<T>(columns));
let row_metadata = RowMetadata::new_for_cursor::<T>(columns)?;
self.row_metadata = Some(row_metadata);
return Poll::Ready(Ok(()));
}
Ok(_) => {
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum Error {
TimedOut,
#[error("error while parsing columns header from the response: {0}")]
InvalidColumnsHeader(#[source] BoxedError),
#[error("schema mismatch: {0}")]
SchemaMismatch(String),
#[error("unsupported: {0}")]
Unsupported(String),
#[error("{0}")]
Expand Down
12 changes: 5 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,8 @@ impl Client {
pub async fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
if self.get_validation() {
let metadata = self.get_insert_metadata(table).await?;
return Ok(insert::Insert::new(
self,
table,
Some(metadata.to_row::<T>()),
));
let row = metadata.to_row::<T>()?;
return Ok(insert::Insert::new(self, table, Some(row)));
}
Ok(insert::Insert::new(self, table, None))
}
Expand Down Expand Up @@ -771,7 +768,8 @@ mod client_tests {

#[test]
fn get_row_metadata() {
let metadata = RowMetadata::new_for_cursor::<SystemRolesRow>(SystemRolesRow::columns());
let metadata =
RowMetadata::new_for_cursor::<SystemRolesRow>(SystemRolesRow::columns()).unwrap();
assert_eq!(metadata.columns, SystemRolesRow::columns());
assert_eq!(metadata.access_type, AccessType::WithSeqAccess);

Expand All @@ -781,7 +779,7 @@ mod client_tests {
Column::new("storage".to_string(), DataTypeNode::String),
Column::new("name".to_string(), DataTypeNode::String),
];
let metadata = RowMetadata::new_for_cursor::<SystemRolesRow>(columns.clone());
let metadata = RowMetadata::new_for_cursor::<SystemRolesRow>(columns.clone()).unwrap();
assert_eq!(metadata.columns, columns);
assert_eq!(
metadata.access_type,
Expand Down
59 changes: 31 additions & 28 deletions src/row_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::Row;
use crate::error::Error;
use crate::error::Result;
use crate::row::RowKind;
use clickhouse_types::Column;
use std::collections::HashMap;
Expand Down Expand Up @@ -50,48 +51,48 @@ pub(crate) enum ColumnDefaultKind {
}

impl RowMetadata {
pub(crate) fn new_for_cursor<T: Row>(columns: Vec<Column>) -> Self {
pub(crate) fn new_for_cursor<T: Row>(columns: Vec<Column>) -> Result<Self> {
let access_type = match T::KIND {
RowKind::Primitive => {
if columns.len() != 1 {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing a primitive row: \
expected only 1 column in the database schema, \
but got {} instead.\n#### All schema columns:\n{}",
columns.len(),
join_panic_schema_hint(&columns),
);
)));
}
AccessType::WithSeqAccess // ignored
}
RowKind::Tuple => {
if T::COLUMN_COUNT != columns.len() {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing a tuple row: database schema has {} columns, \
but the tuple definition has {} fields in total.\
\n#### All schema columns:\n{}",
columns.len(),
T::COLUMN_COUNT,
join_panic_schema_hint(&columns),
);
)));
}
AccessType::WithSeqAccess // ignored
}
RowKind::Vec => {
if columns.len() != 1 {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing a row defined as a vector: \
expected only 1 column in the database schema, \
but got {} instead.\n#### All schema columns:\n{}",
columns.len(),
join_panic_schema_hint(&columns),
);
)));
}
AccessType::WithSeqAccess // ignored
}
RowKind::Struct => {
if columns.len() != T::COLUMN_NAMES.len() {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has {} columns, \
but the struct definition has {} fields.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
Expand All @@ -100,7 +101,7 @@ impl RowMetadata {
T::COLUMN_NAMES.len(),
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&columns),
);
)));
}
let mut mapping = Vec::with_capacity(T::COLUMN_NAMES.len());
let mut expected_index = 0;
Expand All @@ -114,14 +115,14 @@ impl RowMetadata {
expected_index += 1;
mapping.push(index);
} else {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has a column {col} \
that was not found in the struct definition.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&columns),
);
)));
}
}
if should_use_map {
Expand All @@ -131,10 +132,10 @@ impl RowMetadata {
}
}
};
Self {
Ok(Self {
columns,
access_type,
}
})
}

/// Returns the index of the column in the database schema
Expand All @@ -144,17 +145,19 @@ impl RowMetadata {
/// since we write the header with the field order defined in the struct,
/// and ClickHouse server figures out the rest on its own.
#[inline]
pub(crate) fn get_schema_index(&self, struct_idx: usize) -> usize {
pub(crate) fn get_schema_index(&self, struct_idx: usize) -> Result<usize> {
match &self.access_type {
AccessType::WithMapAccess(mapping) => {
if struct_idx < mapping.len() {
mapping[struct_idx]
Ok(mapping[struct_idx])
} else {
// unreachable
panic!("Struct has more fields than columns in the database schema")
Err(Error::SchemaMismatch(
"Struct has more fields than columns in the database schema".to_string(),
))
}
}
AccessType::WithSeqAccess => struct_idx, // should be unreachable
AccessType::WithSeqAccess => Ok(struct_idx), // should be unreachable
}
}

Expand Down Expand Up @@ -211,14 +214,14 @@ impl Display for ColumnDefaultKind {
}

impl InsertMetadata {
pub(crate) fn to_row<T: Row>(&self) -> RowMetadata {
pub(crate) fn to_row<T: Row>(&self) -> Result<RowMetadata> {
if T::KIND != RowKind::Struct {
panic!(
return Err(Error::SchemaMismatch(format!(
"SerializerRowMetadata can only be created for structs, \
but got {:?} instead.\n#### All schema columns:\n{}",
T::KIND,
join_panic_schema_hint(&self.row_metadata.columns),
);
)));
}

let mut result_columns: Vec<Column> = Vec::with_capacity(T::COLUMN_COUNT);
Expand All @@ -228,11 +231,11 @@ impl InsertMetadata {
match self.column_lookup.get(*struct_column_name) {
Some(&col) => {
if self.column_default_kinds[col].is_immutable() {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: column {struct_column_name} is immutable (declared as `{}`)",
T::NAME,
self.column_default_kinds[col],
);
)));
}

// TODO: what should happen if a column is mentioned multiple times?
Expand All @@ -241,13 +244,13 @@ impl InsertMetadata {
result_columns.push(self.row_metadata.columns[col].clone())
}
None => {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has no column named {struct_column_name}.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&self.row_metadata.columns),
);
)));
}
}
}
Expand All @@ -263,19 +266,19 @@ impl InsertMetadata {
let missing_columns_hint = join_panic_schema_hint(missing_columns);

if !missing_columns_hint.is_empty() {
panic!(
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: the following non-default columns are missing:\n{missing_columns_hint}\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&self.row_metadata.columns),
)
)));
}

RowMetadata {
Ok(RowMetadata {
columns: result_columns,
access_type: AccessType::WithSeqAccess, // ignored
}
})
}
}

Expand Down
Loading