6 changes: 6 additions & 0 deletions src/common/io/src/cursor_ext/cursor_read_number_ext.rs

@@ -103,13 +103,19 @@ where B: AsRef<[u8]>
     fn read_int_text<T: FromLexical>(&mut self) -> Result<T> {
         let buf = self.remaining_slice();
         let (n_in, n_out) = collect_number(buf);
+        if n_in == 0 {
+            return Err(ErrorCode::BadBytes("number is not exist"));
+        }
         let n = read_num_text_exact(&buf[..n_out])?;
         self.consume(n_in);
         Ok(n)
     }

     fn read_float_text<T: FromLexical>(&mut self) -> Result<T> {
         let (n_in, n_out) = collect_number(self.remaining_slice());
+        if n_in == 0 {
+            return Err(ErrorCode::BadBytes("number is not exist"));
+        }
         let n = read_num_text_exact(&self.remaining_slice()[..n_out])?;
         self.consume(n_in);
         Ok(n)
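The guard above rejects input where collect_number matched nothing, so a zero-length slice never reaches the lexical parser. A minimal sketch of the same pattern, using a stand-in digit scanner rather than the real collect_number:

    // Parse a leading integer from a byte buffer, failing early when no
    // digits are present (mirrors the n_in == 0 guard in the diff).
    fn parse_leading_int(buf: &[u8]) -> Result<i64, String> {
        let n = buf.iter().take_while(|b| b.is_ascii_digit()).count();
        if n == 0 {
            return Err("number is not exist".to_string());
        }
        std::str::from_utf8(&buf[..n])
            .map_err(|e| e.to_string())?
            .parse::<i64>()
            .map_err(|e| e.to_string())
    }
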
66 changes: 55 additions & 11 deletions src/query/formats/src/field_decoder/fast_values.rs

@@ -99,6 +99,12 @@ impl FastFieldDecoderValues {
         }
     }

+    fn pop_inner_values(&self, column: &mut ColumnBuilder, size: usize) {
+        for _ in 0..size {
+            let _ = column.pop();
+        }
+    }
+
     pub fn read_field<R: AsRef<[u8]>>(
         &self,
         column: &mut ColumnBuilder,
@@ -285,10 +291,16 @@ impl FastFieldDecoderValues {
                 break;
             }
             if idx != 0 {
-                reader.must_ignore_byte(b',')?;
+                if let Err(err) = reader.must_ignore_byte(b',') {
+                    self.pop_inner_values(&mut column.builder, idx);
+                    return Err(err.into());
+                }
             }
             let _ = reader.ignore_white_spaces();
-            self.read_field(&mut column.builder, reader, positions)?;
+            if let Err(err) = self.read_field(&mut column.builder, reader, positions) {
+                self.pop_inner_values(&mut column.builder, idx);
+                return Err(err);
+            }
         }
         column.commit_row();
         Ok(())
@@ -311,24 +323,41 @@ impl FastFieldDecoderValues {
                 break;
             }
             if idx != 0 {
-                reader.must_ignore_byte(b',')?;
+                if let Err(err) = reader.must_ignore_byte(b',') {
+                    self.pop_inner_values(&mut map_builder[KEY], idx);
+                    self.pop_inner_values(&mut map_builder[VALUE], idx);
+                    return Err(err.into());
+                }
             }
             let _ = reader.ignore_white_spaces();
-            self.read_field(&mut map_builder[KEY], reader, positions)?;
+            if let Err(err) = self.read_field(&mut map_builder[KEY], reader, positions) {
+                self.pop_inner_values(&mut map_builder[KEY], idx);
+                self.pop_inner_values(&mut map_builder[VALUE], idx);
+                return Err(err);
+            }
             // check duplicate map keys
             let key = map_builder[KEY].pop().unwrap();
             if set.contains(&key) {
-                column.commit_row();
+                self.pop_inner_values(&mut map_builder[KEY], idx);
+                self.pop_inner_values(&mut map_builder[VALUE], idx);
                 return Err(ErrorCode::BadBytes(
                     "map keys have to be unique".to_string(),
                 ));
             }
             set.insert(key.clone());
             map_builder[KEY].push(key.as_ref());
             let _ = reader.ignore_white_spaces();
-            reader.must_ignore_byte(b':')?;
+            if let Err(err) = reader.must_ignore_byte(b':') {
+                self.pop_inner_values(&mut map_builder[KEY], idx + 1);
+                self.pop_inner_values(&mut map_builder[VALUE], idx);
+                return Err(err.into());
+            }
             let _ = reader.ignore_white_spaces();
-            self.read_field(&mut map_builder[VALUE], reader, positions)?;
+            if let Err(err) = self.read_field(&mut map_builder[VALUE], reader, positions) {
+                self.pop_inner_values(&mut map_builder[KEY], idx + 1);
+                self.pop_inner_values(&mut map_builder[VALUE], idx);
+                return Err(err);
+            }
         }
         column.commit_row();
         Ok(())
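Note the asymmetry in the rollback counts above: by the time the ':' separator or the value decode can fail, the current key has already been popped for the duplicate check and pushed back, so idx + 1 entries must come off the key builder while only idx come off the value builder.
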
@@ -341,15 +370,30 @@ impl FastFieldDecoderValues {
         positions: &mut VecDeque<usize>,
     ) -> Result<()> {
         reader.must_ignore_byte(b'(')?;
-        for (idx, field) in fields.iter_mut().enumerate() {
+        for idx in 0..fields.len() {
             let _ = reader.ignore_white_spaces();
             if idx != 0 {
-                reader.must_ignore_byte(b',')?;
+                if let Err(err) = reader.must_ignore_byte(b',') {
+                    for field in fields.iter_mut().take(idx) {
+                        self.pop_inner_values(field, 1);
+                    }
+                    return Err(err.into());
+                }
             }
             let _ = reader.ignore_white_spaces();
-            self.read_field(field, reader, positions)?;
+            if let Err(err) = self.read_field(&mut fields[idx], reader, positions) {
+                for field in fields.iter_mut().take(idx) {
+                    self.pop_inner_values(field, 1);
+                }
+                return Err(err);
+            }
         }
-        reader.must_ignore_byte(b')')?;
+        if let Err(err) = reader.must_ignore_byte(b')') {
+            for field in fields.iter_mut() {
+                self.pop_inner_values(field, 1);
+            }
+            return Err(err.into());
+        }
         Ok(())
     }

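All three nested decoders share the same recovery shape: anything appended to the builder before the failure is popped, so the ColumnBuilder never retains fields of a row that failed mid-parse. A minimal sketch of that shape, with a plain Vec standing in for the builder:

    // Decode integer tokens into `builder`; on a parse error, pop the `idx`
    // values already appended so the builder is left exactly as it was.
    fn read_int_list(builder: &mut Vec<i64>, tokens: &[&str]) -> Result<(), String> {
        for (idx, tok) in tokens.iter().enumerate() {
            match tok.trim().parse::<i64>() {
                Ok(v) => builder.push(v),
                Err(e) => {
                    for _ in 0..idx {
                        builder.pop();
                    }
                    return Err(e.to_string());
                }
            }
        }
        Ok(())
    }

For example, read_int_list(&mut v, &["1", "2", "x"]) pushes 1 and 2, fails on "x", pops both, and returns the error, leaving v unchanged.
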
1 change: 0 additions & 1 deletion src/query/formats/src/field_decoder/tsv.rs

@@ -143,7 +143,6 @@ impl FieldDecoderRowBased for FieldDecoderTSV {
             // check duplicate map keys
             let key = map_builder[KEY].pop().unwrap();
             if set.contains(&key) {
-                column.commit_row();
                 return Err(ErrorCode::BadBytes(
                     "map keys have to be unique".to_string(),
                 ));
1 change: 0 additions & 1 deletion src/query/formats/src/field_decoder/values.rs

@@ -173,7 +173,6 @@ impl FieldDecoderRowBased for FieldDecoderValues {
            // check duplicate map keys
             let key = map_builder[KEY].pop().unwrap();
             if set.contains(&key) {
-                column.commit_row();
                 return Err(ErrorCode::BadBytes(
                     "map keys have to be unique".to_string(),
                 ));
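Same one-line fix in both row-based decoders: the duplicate-key path used to commit the partially built row before returning, so the bad row leaked into the column; now the error propagates without committing, consistent with the fast-path map decoder above.
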
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs

@@ -670,6 +670,8 @@ impl ValueSource {
             for col in columns.iter_mut().take(pop_count) {
                 col.pop();
             }
+            // rollback to start position of the row
+            reader.rollback(start_pos_of_row + 1);
             skip_to_next_row(reader, 1)?;
             let end_pos_of_row = reader.position();

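This is the piece that makes the rollbacks above pay off: when the fast decoder fails, the partially written columns are popped and the reader rewinds to the start of the row before skipping ahead, so the raw bytes of the failed row stay addressable between start_pos_of_row and end_pos_of_row. A sketch of the recovery step, assuming rollback(pos) seeks to an absolute offset (MyReader and its methods are stand-ins for the real cursor API):

    // Stand-in reader; the real code uses a Cursor with rollback() plus a
    // free function skip_to_next_row().
    struct MyReader { buf: Vec<u8>, pos: usize }

    impl MyReader {
        fn rollback(&mut self, pos: usize) { self.pos = pos; }
        fn position(&self) -> usize { self.pos }
        fn skip_to_next_row(&mut self) {
            // Advance to just past the next newline, or to end of input.
            while self.pos < self.buf.len() && self.buf[self.pos] != b'\n' {
                self.pos += 1;
            }
            self.pos = (self.pos + 1).min(self.buf.len());
        }
    }

    // On a decode error: drop the partially written fields, rewind to the
    // row start, then skip past the row. The returned span covers the raw
    // bytes of the failed row, which a slower, more tolerant parser can
    // presumably re-read.
    fn recover_row(
        columns: &mut [Vec<String>],
        pop_count: usize,
        reader: &mut MyReader,
        start_pos_of_row: usize,
    ) -> (usize, usize) {
        for col in columns.iter_mut().take(pop_count) {
            col.pop();
        }
        reader.rollback(start_pos_of_row);
        reader.skip_to_next_row();
        (start_pos_of_row, reader.position())
    }
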
@@ -324,21 +324,21 @@ statement ok
 CREATE TABLE t15(id Int, arr Array(String)) Engine = Fuse

 statement ok
-INSERT INTO t15 (id, arr) VALUES(1, ['aa', 'bb']), (2, ['cc', 'dd']), (3, ['ee', 'ff'])
+INSERT INTO t15 (id, arr) VALUES(1, ['aa', 'bb']), (2, ['cc', 'dd']), (3, [12, 34])

 query IT
 select * from t15
 ----
 1 ['aa','bb']
 2 ['cc','dd']
-3 ['ee','ff']
+3 ['12','34']

 query TT
 select arr[1], arr[2] from t15
 ----
 aa bb
 cc dd
-ee ff
+12 34

 statement ok
 drop table if exists t16
@@ -347,19 +347,21 @@ statement ok
 CREATE TABLE t16(id Int, arr Array(Int64)) Engine = Fuse

 statement ok
-INSERT INTO t16 (id, arr) VALUES(1, [1,2,3,4]), (2, [5,6,7,8])
+INSERT INTO t16 (id, arr) VALUES(1, [1,2,3,4]), (2, [5,6,7,8]), (3, ['9','10','11'])

 query IT
 select * from t16
 ----
 1 [1,2,3,4]
 2 [5,6,7,8]
+3 [9,10,11]

 query II
 select arr[1], arr[2] from t16
 ----
 1 2
 5 6
+9 10

 query II
 select arr[1], arr[2] from t16 where arr[2] = 6 order by arr[3] desc
@@ -47,7 +47,7 @@ statement ok
 CREATE TABLE IF NOT EXISTS t3(id Int, t Tuple(a Tuple(m Int64, n Int64), b Tuple(x Int64, y Int64))) Engine = Fuse

 statement ok
-INSERT INTO t3 (id, t) VALUES(1, ((10, 11), (20, 21))), (2, ((30, 31), (40, 41)))
+INSERT INTO t3 (id, t) VALUES(1, ((10, 11), (20, 21))), (2, (('30', '31'), (40, 41)))

 query IT
 select * from t3
@@ -11,26 +11,26 @@ statement ok
 CREATE TABLE IF NOT EXISTS t1(id Int, m Map(Int64, String)) Engine = Fuse

 statement ok
-INSERT INTO t1 (id, m) VALUES(1, {100:'abc',200:'def'}),(2, {300:'mn'}),(3, {});
+INSERT INTO t1 (id, m) VALUES(1, {100:'abc',200:'def'}),(2, {'300':123}),(3, {});

 query IT
 select * from t1
 ----
 1 {100:'abc',200:'def'}
-2 {300:'mn'}
+2 {300:'123'}
 3 {}

 query TTTT
 select m[100], m[200], m[300], m[400] from t1
 ----
 abc def NULL NULL
-NULL NULL mn NULL
+NULL NULL 123 NULL
 NULL NULL NULL NULL

 query IT
-select * from t1 where m[300] = 'mn'
+select * from t1 where m[300] = '123'
 ----
-2 {300:'mn'}
+2 {300:'123'}

 statement error 1001
 INSERT INTO t1 (id, m) VALUES(1, {100:'k1',100:'k2'})
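Taken together, the updated array, tuple, and map suites now feed composite literals whose element types do not match the column type ([12, 34] into Array(String), '30'/'31' into Tuple(Int64, ...), {'300':123} into Map(Int64, String)) and expect the cast values back, presumably exercising the fallback path that the rollback changes above make reachable.
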
@@ -51,7 +51,7 @@ echo "truncate table wrong_csv" | $MYSQL_CLIENT_CONNECT

 WRONG_CSV="COPY INTO wrong_csv FROM 'fs://${DATADIR}/wrong_sample.csv' FILE_FORMAT = (type = CSV field_delimiter = ',' record_delimiter = '\n' skip_header = 0) ON_ERROR=abort_2"

-echo "$WRONG_CSV" | $MYSQL_CLIENT_CONNECT 2>&1 | grep -c "bad field end"
+echo "$WRONG_CSV" | $MYSQL_CLIENT_CONNECT 2>&1 | grep -c "fail to decode column"
 echo "select count(1) from wrong_csv" | $MYSQL_CLIENT_CONNECT
 echo "truncate table wrong_csv" | $MYSQL_CLIENT_CONNECT
@@ -40,7 +40,7 @@ EOF
 curl -H "insert_sql:insert into test_csv file_format = (type = CSV)" -F "upload=@/tmp/whitespace.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
 echo "select * from test_csv" | $MYSQL_CLIENT_CONNECT

-curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "bad field"
+curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "fail to decode column"
 curl -s -H "insert_sql:insert into test_csv_number file_format = (type = CSV)" -F "upload=@/tmp/whitespace_number2.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "bad field"

 echo "drop table if exists test_csv" | $MYSQL_CLIENT_CONNECT