Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): fix input format CSV #6524

Merged
merged 8 commits into from Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 5 additions & 97 deletions common/datavalues/src/types/deserializations/string.rs
Expand Up @@ -111,106 +111,14 @@ impl TypeDeserializer for StringDeserializer {
reader: &mut NestedCheckpointReader<R>,
settings: &FormatSettings,
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;
if read_buffer.is_empty() {
self.buffer.clear();
reader.read_csv_string(&mut self.buffer, settings)?;
if self.buffer.is_empty() {
self.builder.append_default();
return Ok(());
}

let maybe_quote = read_buffer[0];
if maybe_quote == b'\'' || maybe_quote == b'"' {
self.buffer.clear();
reader.read_quoted_text(&mut self.buffer, maybe_quote)?;
self.builder.append_value(self.buffer.as_slice());
Ok(())
} else {
// Unquoted case. Look for field_delimiter or record_delimiter.
let mut field_delimiter = b',';

if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}
if maybe_quote == field_delimiter || maybe_quote == b'\r' || maybe_quote == b'\n' {
self.builder.append_default();
return Ok(());
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
|| settings.record_delimiter[0] == b'\n'
{
let mut index = 0;
let mut bytes = 0;

'outer1: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == b'\r'
|| read_buffer[index] == b'\n'
{
break 'outer1;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer1;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
} else {
let record_delimiter = settings.record_delimiter[0];

let mut index = 0;
let mut bytes = 0;

'outer2: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == record_delimiter
{
break 'outer2;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer2;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
}

Ok(())
self.builder.append_value(self.buffer.as_slice());
}
Ok(())
}

fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> {
Expand Down
39 changes: 21 additions & 18 deletions common/formats/src/format_csv.rs
Expand Up @@ -20,7 +20,6 @@ use common_datavalues::DataType;
use common_datavalues::TypeDeserializer;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::position2;
use common_io::prelude::position4;
use common_io::prelude::BufferReadExt;
use common_io::prelude::FormatSettings;
Expand All @@ -32,11 +31,13 @@ use crate::InputFormat;
use crate::InputState;

pub struct CsvInputState {
// quotes == 0u8 means not in quote
pub quotes: u8,
pub memory: Vec<u8>,
pub accepted_rows: usize,
pub accepted_bytes: usize,
pub need_more_data: bool,
// used to ignore \n in \r\n
pub ignore_if_first: Option<u8>,
}

Expand Down Expand Up @@ -119,14 +120,13 @@ impl CsvInputFormat {
}))
}

fn find_quotes(buf: &[u8], pos: usize, state: &mut CsvInputState) -> usize {
let index = pos + position2::<true, b'"', b'\''>(&buf[pos..]);

if index != buf.len() {
state.quotes = 0;
return index + 1;
fn find_quote(buf: &[u8], pos: usize, state: &mut CsvInputState, quote: u8) -> usize {
for (index, item) in buf.iter().enumerate().skip(pos) {
if *item == quote {
state.quotes = 0;
return index + 1;
}
}

buf.len()
}

Expand All @@ -146,14 +146,17 @@ impl CsvInputFormat {
let position = pos + position4::<true, b'"', b'\'', b'\r', b'\n'>(&buf[pos..]);

if position != buf.len() {
if buf[position] == b'"' || buf[position] == b'\'' {
state.quotes = buf[position];
return position + 1;
} else if buf[position] == b'\r' {
return self.accept_row::<b'\n'>(buf, pos, state, position);
} else if buf[position] == b'\n' {
return self.accept_row::<b'\r'>(buf, pos, state, position);
}
return match buf[position] {
b'"' | b'\'' => {
state.quotes = buf[position];
position + 1
}
b'\n' => self.accept_row::<0>(buf, pos, state, position),
_ => {
// b'\r'
self.accept_row::<b'\n'>(buf, pos, state, position)
}
};
}
}

Expand Down Expand Up @@ -284,7 +287,7 @@ impl InputFormat for CsvInputFormat {
state.need_more_data = true;
while index < buf.len() && state.need_more_data {
index = match state.quotes != 0 {
true => Self::find_quotes(buf, index, state),
true => Self::find_quote(buf, index, state, state.quotes),
false => self.find_delimiter(buf, index, state),
}
}
Expand All @@ -300,7 +303,7 @@ impl InputFormat for CsvInputFormat {

while index < buf.len() {
index = match state.quotes != 0 {
true => Self::find_quotes(buf, index, state),
true => Self::find_quote(buf, index, state, state.quotes),
false => self.find_delimiter(buf, index, state),
};

Expand Down
3 changes: 1 addition & 2 deletions common/formats/tests/it/format_csv.rs
Expand Up @@ -31,7 +31,6 @@ fn test_accepted_multi_lines() -> Result<()> {
assert_complete_line("first,second\n")?;
assert_complete_line("first,second\r")?;
assert_complete_line("first,second\r\n")?;
assert_complete_line("first,second\n\r")?;
assert_complete_line("first,\"\n\"second\n")?;
assert_complete_line("first,\"\r\"second\n")?;

Expand Down Expand Up @@ -97,7 +96,7 @@ fn test_deserialize_multi_lines() -> Result<()> {
let mut csv_input_state = csv_input_format.create_state();

csv_input_format.read_buf(
"1,\"{\\\"second\\\" : 33}\"\n".as_bytes(),
"1,\"{\"\"second\"\" : 33}\"\n".as_bytes(),
&mut csv_input_state,
)?;
assert_blocks_eq(
Expand Down
138 changes: 138 additions & 0 deletions common/io/src/buffer/buffer_read_string_ext.rs
@@ -0,0 +1,138 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;

use super::BufferRead;
use crate::prelude::FormatSettings;

pub trait BufferReadStringExt: BufferRead {
fn read_csv_string(&mut self, buf: &mut Vec<u8>, settings: &FormatSettings) -> Result<()>;
}

impl<R> BufferReadStringExt for R
where R: BufferRead
{
fn read_csv_string(&mut self, buf: &mut Vec<u8>, settings: &FormatSettings) -> Result<()> {
let read_buffer = self.fill_buf()?;
if read_buffer.is_empty() {
return Ok(());
}
let maybe_quote = read_buffer[0];
if maybe_quote == b'\'' || maybe_quote == b'"' {
self.consume(1);
let mut maybe_end = false;
loop {
let read_buffer = self.fill_buf()?;
if read_buffer.is_empty() {
// TODO(youngsofun): to be strict, the string should ends with maybe_quote too
return Ok(());
} else {
let mut index = 0;
if maybe_end {
if read_buffer[0] == maybe_quote {
index = 1;
} else {
return Ok(());
}
}
while index < read_buffer.len() && read_buffer[index] != maybe_quote {
index += 1;
}
buf.extend_from_slice(&read_buffer[0..index]);
if index == read_buffer.len() {
self.consume(index);
maybe_end = false;
} else {
self.consume(index + 1);
maybe_end = true;
}
}
}
} else {
// Unquoted case. Look for field_delimiter or record_delimiter.
let mut field_delimiter = b',';

if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}
if maybe_quote == field_delimiter {
return Ok(());
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
|| settings.record_delimiter[0] == b'\n'
{
let mut index = 0;
let mut read_buffer = self.fill_buf()?;

'outer1: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == b'\r'
|| read_buffer[index] == b'\n'
{
break 'outer1;
}
index += 1;
}

buf.extend_from_slice(&read_buffer[..index]);
self.consume(index);

index = 0;
read_buffer = self.fill_buf()?;

if read_buffer.is_empty() {
break 'outer1;
}
}

buf.extend_from_slice(&read_buffer[..index]);
self.consume(index);
} else {
let record_delimiter = settings.record_delimiter[0];
let mut read_buffer = self.fill_buf()?;

let mut index = 0;

'outer2: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == record_delimiter
{
break 'outer2;
}
index += 1;
}

buf.extend_from_slice(&read_buffer[..index]);
self.consume(index);

index = 0;
read_buffer = self.fill_buf()?;

if read_buffer.is_empty() {
break 'outer2;
}
}

buf.extend_from_slice(&read_buffer[..index]);
self.consume(index);
}
Ok(())
}
}
}
2 changes: 2 additions & 0 deletions common/io/src/buffer/mod.rs
Expand Up @@ -17,6 +17,7 @@ mod buffer_read;
mod buffer_read_datetime_ext;
mod buffer_read_ext;
mod buffer_read_number_ext;
mod buffer_read_string_ext;
mod buffer_reader;
mod checkpoint_read;
mod nested_checkpoint_reader;
Expand All @@ -26,6 +27,7 @@ pub use buffer_read::*;
pub use buffer_read_datetime_ext::*;
pub use buffer_read_ext::*;
pub use buffer_read_number_ext::*;
pub use buffer_read_string_ext::BufferReadStringExt;
pub use buffer_reader::*;
pub use checkpoint_read::*;
pub use nested_checkpoint_reader::NestedCheckpointReader;
1 change: 1 addition & 0 deletions common/io/src/prelude.rs
Expand Up @@ -23,6 +23,7 @@ pub use crate::buffer::BufferRead;
pub use crate::buffer::BufferReadDateTimeExt;
pub use crate::buffer::BufferReadExt;
pub use crate::buffer::BufferReadNumberExt;
pub use crate::buffer::BufferReadStringExt;
pub use crate::buffer::BufferReader;
pub use crate::buffer::CheckpointRead;
pub use crate::buffer::MemoryReader;
Expand Down