From 48eb1b68d4a5a6e223e007a6e8fb3a3f70ee9fe3 Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 13 Mar 2023 17:39:28 +0800 Subject: [PATCH 1/2] fix(query): fix insert nested data types with expr --- .../src/cursor_ext/cursor_read_number_ext.rs | 6 ++ .../formats/src/field_decoder/fast_values.rs | 66 +++++++++++++++---- src/query/formats/src/field_decoder/tsv.rs | 1 - src/query/formats/src/field_decoder/values.rs | 1 - .../src/interpreters/interpreter_insert.rs | 2 + .../base/03_common/03_0023_insert_into_array | 10 +-- .../base/03_common/03_0026_insert_into_tuple | 2 +- .../base/03_common/03_0037_insert_into_map | 10 +-- 8 files changed, 75 insertions(+), 23 deletions(-) diff --git a/src/common/io/src/cursor_ext/cursor_read_number_ext.rs b/src/common/io/src/cursor_ext/cursor_read_number_ext.rs index d3c0b5ec7981c..f5e033daf5062 100644 --- a/src/common/io/src/cursor_ext/cursor_read_number_ext.rs +++ b/src/common/io/src/cursor_ext/cursor_read_number_ext.rs @@ -103,6 +103,9 @@ where B: AsRef<[u8]> fn read_int_text(&mut self) -> Result { let buf = self.remaining_slice(); let (n_in, n_out) = collect_number(buf); + if n_in == 0 { + return Err(ErrorCode::BadBytes("number is not exist")); + } let n = read_num_text_exact(&buf[..n_out])?; self.consume(n_in); Ok(n) @@ -110,6 +113,9 @@ where B: AsRef<[u8]> fn read_float_text(&mut self) -> Result { let (n_in, n_out) = collect_number(self.remaining_slice()); + if n_in == 0 { + return Err(ErrorCode::BadBytes("number is not exist")); + } let n = read_num_text_exact(&self.remaining_slice()[..n_out])?; self.consume(n_in); Ok(n) diff --git a/src/query/formats/src/field_decoder/fast_values.rs b/src/query/formats/src/field_decoder/fast_values.rs index 4fa7304ed80d1..64c654e154b60 100644 --- a/src/query/formats/src/field_decoder/fast_values.rs +++ b/src/query/formats/src/field_decoder/fast_values.rs @@ -99,6 +99,12 @@ impl FastFieldDecoderValues { } } + fn pop_inner_values(&self, column: &mut ColumnBuilder, size: usize) { + for _ in 0..size { + let _ = column.pop(); + } + } + pub fn read_field>( &self, column: &mut ColumnBuilder, @@ -285,10 +291,16 @@ impl FastFieldDecoderValues { break; } if idx != 0 { - reader.must_ignore_byte(b',')?; + if let Err(err) = reader.must_ignore_byte(b',') { + self.pop_inner_values(&mut column.builder, idx); + return Err(err.into()); + } } let _ = reader.ignore_white_spaces(); - self.read_field(&mut column.builder, reader, positions)?; + if let Err(err) = self.read_field(&mut column.builder, reader, positions) { + self.pop_inner_values(&mut column.builder, idx); + return Err(err); + } } column.commit_row(); Ok(()) @@ -311,14 +323,23 @@ impl FastFieldDecoderValues { break; } if idx != 0 { - reader.must_ignore_byte(b',')?; + if let Err(err) = reader.must_ignore_byte(b',') { + self.pop_inner_values(&mut map_builder[KEY], idx); + self.pop_inner_values(&mut map_builder[VALUE], idx); + return Err(err.into()); + } } let _ = reader.ignore_white_spaces(); - self.read_field(&mut map_builder[KEY], reader, positions)?; + if let Err(err) = self.read_field(&mut map_builder[KEY], reader, positions) { + self.pop_inner_values(&mut map_builder[KEY], idx); + self.pop_inner_values(&mut map_builder[VALUE], idx); + return Err(err); + } // check duplicate map keys let key = map_builder[KEY].pop().unwrap(); if set.contains(&key) { - column.commit_row(); + self.pop_inner_values(&mut map_builder[KEY], idx); + self.pop_inner_values(&mut map_builder[VALUE], idx); return Err(ErrorCode::BadBytes( "map keys have to be unique".to_string(), )); @@ -326,9 +347,17 @@ impl FastFieldDecoderValues { set.insert(key.clone()); map_builder[KEY].push(key.as_ref()); let _ = reader.ignore_white_spaces(); - reader.must_ignore_byte(b':')?; + if let Err(err) = reader.must_ignore_byte(b':') { + self.pop_inner_values(&mut map_builder[KEY], idx + 1); + self.pop_inner_values(&mut map_builder[VALUE], idx); + return Err(err.into()); + } let _ = reader.ignore_white_spaces(); - self.read_field(&mut map_builder[VALUE], reader, positions)?; + if let Err(err) = self.read_field(&mut map_builder[VALUE], reader, positions) { + self.pop_inner_values(&mut map_builder[KEY], idx + 1); + self.pop_inner_values(&mut map_builder[VALUE], idx); + return Err(err); + } } column.commit_row(); Ok(()) @@ -341,15 +370,30 @@ impl FastFieldDecoderValues { positions: &mut VecDeque, ) -> Result<()> { reader.must_ignore_byte(b'(')?; - for (idx, field) in fields.iter_mut().enumerate() { + for idx in 0..fields.len() { let _ = reader.ignore_white_spaces(); if idx != 0 { - reader.must_ignore_byte(b',')?; + if let Err(err) = reader.must_ignore_byte(b',') { + for field in fields.iter_mut().take(idx) { + self.pop_inner_values(field, 1); + } + return Err(err.into()); + } } let _ = reader.ignore_white_spaces(); - self.read_field(field, reader, positions)?; + if let Err(err) = self.read_field(&mut fields[idx], reader, positions) { + for field in fields.iter_mut().take(idx) { + self.pop_inner_values(field, 1); + } + return Err(err); + } + } + if let Err(err) = reader.must_ignore_byte(b')') { + for field in fields.iter_mut() { + self.pop_inner_values(field, 1); + } + return Err(err.into()); } - reader.must_ignore_byte(b')')?; Ok(()) } diff --git a/src/query/formats/src/field_decoder/tsv.rs b/src/query/formats/src/field_decoder/tsv.rs index a964b68d191e7..8bae475fa1a58 100644 --- a/src/query/formats/src/field_decoder/tsv.rs +++ b/src/query/formats/src/field_decoder/tsv.rs @@ -143,7 +143,6 @@ impl FieldDecoderRowBased for FieldDecoderTSV { // check duplicate map keys let key = map_builder[KEY].pop().unwrap(); if set.contains(&key) { - column.commit_row(); return Err(ErrorCode::BadBytes( "map keys have to be unique".to_string(), )); diff --git a/src/query/formats/src/field_decoder/values.rs b/src/query/formats/src/field_decoder/values.rs index b98a8edfed4a1..98c2ec8397279 100644 --- a/src/query/formats/src/field_decoder/values.rs +++ b/src/query/formats/src/field_decoder/values.rs @@ -173,7 +173,6 @@ impl FieldDecoderRowBased for FieldDecoderValues { // check duplicate map keys let key = map_builder[KEY].pop().unwrap(); if set.contains(&key) { - column.commit_row(); return Err(ErrorCode::BadBytes( "map keys have to be unique".to_string(), )); diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index 7349e43403b5e..5a8855d490ce5 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -670,6 +670,8 @@ impl ValueSource { for col in columns.iter_mut().take(pop_count) { col.pop(); } + // rollback to start position of the row + reader.rollback(start_pos_of_row + 1); skip_to_next_row(reader, 1)?; let end_pos_of_row = reader.position(); diff --git a/tests/sqllogictests/suites/base/03_common/03_0023_insert_into_array b/tests/sqllogictests/suites/base/03_common/03_0023_insert_into_array index 13e03125a24df..228127ac19532 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0023_insert_into_array +++ b/tests/sqllogictests/suites/base/03_common/03_0023_insert_into_array @@ -324,21 +324,21 @@ statement ok CREATE TABLE t15(id Int, arr Array(String)) Engine = Fuse statement ok -INSERT INTO t15 (id, arr) VALUES(1, ['aa', 'bb']), (2, ['cc', 'dd']), (3, ['ee', 'ff']) +INSERT INTO t15 (id, arr) VALUES(1, ['aa', 'bb']), (2, ['cc', 'dd']), (3, [12, 34]) query IT select * from t15 ---- 1 ['aa','bb'] 2 ['cc','dd'] -3 ['ee','ff'] +3 ['12','34'] query TT select arr[1], arr[2] from t15 ---- aa bb cc dd -ee ff +12 34 statement ok drop table if exists t16 @@ -347,19 +347,21 @@ statement ok CREATE TABLE t16(id Int, arr Array(Int64)) Engine = Fuse statement ok -INSERT INTO t16 (id, arr) VALUES(1, [1,2,3,4]), (2, [5,6,7,8]) +INSERT INTO t16 (id, arr) VALUES(1, [1,2,3,4]), (2, [5,6,7,8]), (3, ['9','10','11']) query IT select * from t16 ---- 1 [1,2,3,4] 2 [5,6,7,8] +3 [9,10,11] query II select arr[1], arr[2] from t16 ---- 1 2 5 6 +9 10 query II select arr[1], arr[2] from t16 where arr[2] = 6 order by arr[3] desc diff --git a/tests/sqllogictests/suites/base/03_common/03_0026_insert_into_tuple b/tests/sqllogictests/suites/base/03_common/03_0026_insert_into_tuple index 93ffc11a4ff61..372d78a7fa6a5 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0026_insert_into_tuple +++ b/tests/sqllogictests/suites/base/03_common/03_0026_insert_into_tuple @@ -47,7 +47,7 @@ statement ok CREATE TABLE IF NOT EXISTS t3(id Int, t Tuple(a Tuple(m Int64, n Int64), b Tuple(x Int64, y Int64))) Engine = Fuse statement ok -INSERT INTO t3 (id, t) VALUES(1, ((10, 11), (20, 21))), (2, ((30, 31), (40, 41))) +INSERT INTO t3 (id, t) VALUES(1, ((10, 11), (20, 21))), (2, (('30', '31'), (40, 41))) query IT select * from t3 diff --git a/tests/sqllogictests/suites/base/03_common/03_0037_insert_into_map b/tests/sqllogictests/suites/base/03_common/03_0037_insert_into_map index fb4f1826411e7..eda175f78b93f 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0037_insert_into_map +++ b/tests/sqllogictests/suites/base/03_common/03_0037_insert_into_map @@ -11,26 +11,26 @@ statement ok CREATE TABLE IF NOT EXISTS t1(id Int, m Map(Int64, String)) Engine = Fuse statement ok -INSERT INTO t1 (id, m) VALUES(1, {100:'abc',200:'def'}),(2, {300:'mn'}),(3, {}); +INSERT INTO t1 (id, m) VALUES(1, {100:'abc',200:'def'}),(2, {'300':123}),(3, {}); query IT select * from t1 ---- 1 {100:'abc',200:'def'} -2 {300:'mn'} +2 {300:'123'} 3 {} query TTTT select m[100], m[200], m[300], m[400] from t1 ---- abc def NULL NULL -NULL NULL mn NULL +NULL NULL 123 NULL NULL NULL NULL NULL query IT -select * from t1 where m[300] = 'mn' +select * from t1 where m[300] = '123' ---- -2 {300:'mn'} +2 {300:'123'} statement error 1001 INSERT INTO t1 (id, m) VALUES(1, {100:'k1',100:'k2'}) From 9aaaa2da5e1464d4871d4c1078b0e5c17c4db869 Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 13 Mar 2023 23:10:54 +0800 Subject: [PATCH 2/2] fix tests --- .../suites/1_stateful/00_copy/00_0002_copy_from_fs_on_error.sh | 2 +- .../1_stateful/05_formats/05_02_csv/05_02_02_csv_spaces.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_on_error.sh b/tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_on_error.sh index bfd7449aceea1..7a3c0a1340632 100755 --- a/tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_on_error.sh +++ b/tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_on_error.sh @@ -51,7 +51,7 @@ echo "truncate table wrong_csv" | $MYSQL_CLIENT_CONNECT WRONG_CSV="COPY INTO wrong_csv FROM 'fs://${DATADIR}/wrong_sample.csv' FILE_FORMAT = (type = CSV field_delimiter = ',' record_delimiter = '\n' skip_header = 0) ON_ERROR=abort_2" -echo "$WRONG_CSV" | $MYSQL_CLIENT_CONNECT 2>&1 | grep -c "bad field end" +echo "$WRONG_CSV" | $MYSQL_CLIENT_CONNECT 2>&1 | grep -c "fail to decode column" echo "select count(1) from wrong_csv" | $MYSQL_CLIENT_CONNECT echo "truncate table wrong_csv" | $MYSQL_CLIENT_CONNECT diff --git a/tests/suites/1_stateful/05_formats/05_02_csv/05_02_02_csv_spaces.sh b/tests/suites/1_stateful/05_formats/05_02_csv/05_02_02_csv_spaces.sh index ee8e56a7fc4ff..2d5106bc03365 100755 --- a/tests/suites/1_stateful/05_formats/05_02_csv/05_02_02_csv_spaces.sh +++ b/tests/suites/1_stateful/05_formats/05_02_csv/05_02_02_csv_spaces.sh @@ -40,7 +40,7 @@ EOF curl -H "insert_sql:insert into test_csv file_format = (type = CSV)" -F "upload=@/tmp/whitespace.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1 echo "select * from test_csv" | $MYSQL_CLIENT_CONNECT -curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "bad field" +curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "fail to decode column" curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number2.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "bad field" echo "drop table if exists test_csv" | $MYSQL_CLIENT_CONNECT