diff --git a/CHANGELOG.md b/CHANGELOG.md index 85118484..a709707f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - [Variant data type](https://clickhouse.com/docs/en/sql-reference/data-types/variant) support ([#170]). +### Fixed +- query/cursor: return `NotEnoughData` if a row is unparsed when the stream ends ([#185]). + [#170]: https://github.com/ClickHouse/clickhouse-rs/pull/170 +[#185]: https://github.com/ClickHouse/clickhouse-rs/pull/185 ## [0.13.1] - 2024-10-21 ### Added diff --git a/src/cursor.rs b/src/cursor.rs index a58603ca..b2074331 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -126,6 +126,11 @@ impl RowCursor { match self.raw.next().await? { Some(chunk) => self.bytes.extend(chunk), + None if self.bytes.remaining() > 0 => { + // If some data is left, we have an incomplete row in the buffer. + // This is usually a schema mismatch on the client side. + return Err(Error::NotEnoughData); + } None => return Ok(None), } } diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs index 7e27eeac..c7c41392 100644 --- a/src/rowbinary/de.rs +++ b/src/rowbinary/de.rs @@ -2,9 +2,8 @@ use std::{convert::TryFrom, mem, str}; use crate::error::{Error, Result}; use bytes::Buf; -use serde::de::{EnumAccess, VariantAccess}; use serde::{ - de::{DeserializeSeed, Deserializer, SeqAccess, Visitor}, + de::{DeserializeSeed, Deserializer, EnumAccess, SeqAccess, VariantAccess, Visitor}, Deserialize, }; diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs index 21d1bad3..e4894dc4 100644 --- a/tests/it/cursor_error.rs +++ b/tests/it/cursor_error.rs @@ -1,4 +1,6 @@ -use clickhouse::{Client, Compression}; +use serde::Deserialize; + +use clickhouse::{error::Error, Client, Compression, Row}; #[tokio::test] async fn deferred() { @@ -96,3 +98,40 @@ async fn deferred_lz4() { assert_ne!(i, 0); // we're interested only in errors during processing assert!(err.to_string().contains("TIMEOUT_EXCEEDED")); } + +// See #185. +#[tokio::test] +async fn invalid_schema() { + #[derive(Debug, Row, Deserialize)] + #[allow(dead_code)] + struct MyRow { + no: u32, + dec: Option, // valid schema: u64-based types + } + + let client = prepare_database!(); + + client + .query( + "CREATE TABLE test(no UInt32, dec Nullable(Decimal64(4))) + ENGINE = MergeTree + ORDER BY no", + ) + .execute() + .await + .unwrap(); + + client + .query("INSERT INTO test VALUES (1, 1.1), (2, 2.2), (3, 3.3)") + .execute() + .await + .unwrap(); + + let err = client + .query("SELECT ?fields FROM test") + .fetch_all::() + .await + .unwrap_err(); + + assert!(matches!(err, Error::NotEnoughData)); +}