Skip to content

Commit

Permalink
support for multi-char delimiters/quotes/escapes (#108), WIP support …
Browse files Browse the repository at this point in the history
…across ReadBuffer borders
  • Loading branch information
Tania Bogatsch committed Oct 7, 2019
1 parent 2f3fd10 commit 42c4e42
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 56 deletions.
147 changes: 131 additions & 16 deletions src/execution/operator/persistent/buffered_csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <algorithm>
#include <fstream>
#include <queue>

using namespace duckdb;
using namespace std;
Expand All @@ -34,6 +35,66 @@ BufferedCSVReader::BufferedCSVReader(CopyInfo &info, vector<SQLType> sql_types,
}
}

//! Checks which of the control strings (delimiter, quote, escape) start at the current buffer
//! position by comparing all three against the buffer in lock-step, one character at a time.
//!
//! \param delim_str  set to true if the complete delimiter string starts at `position`
//! \param quote_str  set to true if the complete quote string starts at `position`
//! \param escape_str set to true if the complete escape string starts at `position`
//! \param delim_l    length of info.delimiter
//! \param quote_l    length of info.quote
//! \param escape_l   length of info.escape
//! \return true if the end of the buffer was reached while at least one control string was still
//!         only partially matched (the match may continue in the next buffer); false once every
//!         control string has either fully matched or been ruled out.
//!
//! The output flags are only ever set to true here, never cleared; the caller has to reset them
//! before the next call. Neither `position` nor the buffer is modified.
bool BufferedCSVReader::MatchControlString(bool &delim_str, bool &quote_str, bool &escape_str, index_t &delim_l,
                                           index_t &quote_l, index_t &escape_l) {
	// A candidate stays true while its control string is still a possible match.
	// An empty control string can never match, so it is ruled out up front; otherwise it would
	// never be resolved and the loop would scan to the end of the buffer on every single call.
	bool delim = delim_l > 0;
	bool quote = quote_l > 0;
	bool escape = escape_l > 0;

	index_t tmp_position = position;
	index_t control_string_offset = 0;

	while (delim || quote || escape) {
		// never read past the buffer: unresolved candidates may continue in the next buffer
		if (tmp_position >= buffer_size) {
			return true;
		}
		const auto current = buffer[tmp_position];

		// check if the delimiter string matches
		if (delim) {
			if (current != info.delimiter[control_string_offset]) {
				delim = false;
			} else if (control_string_offset == delim_l - 1) {
				// last character matched: the delimiter is complete
				delim = false;
				delim_str = true;
			}
		}

		// check if the quote string matches
		if (quote) {
			if (current != info.quote[control_string_offset]) {
				quote = false;
			} else if (control_string_offset == quote_l - 1) {
				// last character matched: the quote is complete
				quote = false;
				quote_str = true;
			}
		}

		// check if the escape string matches
		if (escape) {
			if (current != info.escape[control_string_offset]) {
				escape = false;
			} else if (control_string_offset == escape_l - 1) {
				// last character matched: the escape is complete
				escape = false;
				escape_str = true;
			}
		}

		tmp_position++;
		control_string_offset++;
	}

	// every control string has been resolved within the current buffer
	return false;
}

void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
cached_buffers.clear();

Expand All @@ -43,6 +104,16 @@ void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
bool finished_chunk = false;
bool seen_escape = true;

// used for fast control sequence detection
bool delimiter = false;
bool quote = false;
bool escape = false;
index_t delim_l = info.delimiter.length();
index_t quote_l = info.quote.length();
index_t escape_l = info.escape.length();
std::queue<index_t> escape_positions;
bool read_whole_buffer;

if (position >= buffer_size) {
if (!ReadBuffer(start)) {
return;
Expand All @@ -54,40 +125,49 @@ void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
if (finished_chunk) {
return;
}

MatchControlString(delimiter, quote, escape, delim_l, quote_l, escape_l);

if (in_quotes) {
if (buffer[position] == info.escape) {
if (escape) {
seen_escape = true;
// FIXME this is only part of the deal, we also need to zap the escapes below
position += escape_l - 1;
}
else if (!seen_escape) {
if (buffer[position] == info.quote) {
// end quote
offset = 1;
if (quote) {
// found quote without escape, end quote
offset = quote_l;
position += quote_l - 1;
in_quotes = false;
}
} else if (seen_escape && quote) {
escape_positions.push(position);
seen_escape = false;
} else {
seen_escape = false;
}

} else {
if (buffer[position] == info.quote) {
if (quote) {
// start quotes can only occur at the start of a field
if (position == start) {
// increment start by 1
start++;
// increment start by quote length
start += quote_l;
position += quote_l - 1;
// read until we encounter a quote again
in_quotes = true;
}
} else if (buffer[position] == info.delimiter) {
} else if (delimiter) {
// encountered delimiter
AddValue(buffer.get() + start, position - start - offset, column);
start = position + 1;
AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
start = position + delim_l;
position += delim_l - 1;
offset = 0;
}
if (is_newline(buffer[position]) || (source.eof() && position + 1 == buffer_size)) {
char newline = buffer[position];
// encountered a newline, add the current value and push the row
AddValue(buffer.get() + start, position - start - offset, column);
AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
finished_chunk = AddRow(insert_chunk, column);

// move to the next character
Expand Down Expand Up @@ -117,13 +197,25 @@ void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
}

position++;
if (position >= buffer_size) {
if (position >= buffer_size && !read_whole_buffer) {
// exhausted the buffer
if (!ReadBuffer(start)) {
break;
}
}
} else if (position >= buffer_size && read_whole_buffer) {
break;
} /* else if (position >= buffer_size - delim_l || position >= buffer_size - quote_l || position >= buffer_size - escape_l) {
if (!ReadBuffer(start)) {
read_whole_buffer = true;
}
} */

// reset values
delimiter = false;
quote = false;
escape = false;
}

if (in_quotes) {
throw ParserException("Error on line %lld: unterminated quotes", linenr);
}
Expand Down Expand Up @@ -161,7 +253,11 @@ bool BufferedCSVReader::ReadBuffer(index_t &start) {
return read_count > 0;
}

void BufferedCSVReader::AddValue(char *str_val, index_t length, index_t &column) {
void BufferedCSVReader::AddValue(char *str_val, index_t length, index_t &column, std::queue<index_t> &escape_positions) {
// used to remove escape characters
index_t pos = start;
bool in_escape = false;

if (column == sql_types.size() && length == 0) {
// skip a single trailing delimiter
column++;
Expand All @@ -179,7 +275,26 @@ void BufferedCSVReader::AddValue(char *str_val, index_t length, index_t &column)
if (info.null_str == str_val) {
parse_chunk.data[column].nullmask[row_entry] = true;
} else {
// no null value
// remove escape(s)
if (!escape_positions.empty()) {
string new_val = "";
for (const char *val = str_val; *val; val++) {
if (escape_positions.front() == pos) {
in_escape = false;
escape_positions.pop();
}
if (escape_positions.front() - info.escape.length() == pos) {
in_escape = true;
}
if (!in_escape) {
new_val += *val;
}
pos++;
}
strcpy(str_val, new_val.c_str());
}

// test for valid utf-8 string
if (!Value::IsUTF8String(str_val)) {
throw ParserException("Error on line %lld: file is not valid UTF8", linenr);
}
Expand Down
29 changes: 21 additions & 8 deletions src/execution/operator/persistent/physical_copy_to_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,37 +54,50 @@ class BufferedWriter {
ofstream to_csv;
};

static void WriteQuotedString(BufferedWriter &writer, const char *str_value, char delimiter, char quote) {
// scan the string for the delimiter
static void WriteQuotedString(BufferedWriter &writer, const char *str_value, string delimiter, string quote) {
// scan the string for the delimiter and for a newline
bool write_quoted = false;

// FIXME: check for delimiter in string
index_t len = 0;
// check for newline in string
for (const char *val = str_value; *val; val++) {
len++;
if (*val == delimiter || *val == '\n' || *val == '\r') {
// delimiter or newline, write a quoted string
if (*val == '\n' || *val == '\r') {
// newline, write a quoted string
write_quoted = true;
}
}

if (!write_quoted) {
writer.Write(str_value, len);
} else {
writer.Write(&quote, 1);
const char *quote_cstr = quote.c_str();
index_t quote_l = quote.length();
writer.Write(quote_cstr, quote_l);
writer.Write(str_value, len);
writer.Write(&quote, 1);
writer.Write(quote_cstr, quote_l);
}

// FIXME: add escapes!
}

void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state) {
auto &info = *this->info;
index_t total = 0;

// delimiter as cstr and its length
const char *delimiter_cstr = info.delimiter.c_str();
index_t delimiter_l = info.delimiter.length();

string newline = "\n";
BufferedWriter writer(info.file_path);
if (info.header) {
// write the header line
for (index_t i = 0; i < names.size(); i++) {
if (i != 0) {
writer.Write(&info.delimiter, 1);

writer.Write(delimiter_cstr, delimiter_l);
}
WriteQuotedString(writer, names[i].c_str(), info.delimiter, info.quote);
}
Expand Down Expand Up @@ -122,7 +135,7 @@ void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chu
// write values
for (index_t col_idx = 0; col_idx < state->child_chunk.column_count; col_idx++) {
if (col_idx != 0) {
writer.Write(&info.delimiter, 1);
writer.Write(delimiter_cstr, delimiter_l);
}
if (cast_chunk.data[col_idx].nullmask[i]) {
// write null value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "execution/physical_operator.hpp"
#include "parser/parsed_data/copy_info.hpp"

#include <queue>

namespace duckdb {
struct CopyInfo;

Expand Down Expand Up @@ -44,13 +46,15 @@ class BufferedCSVReader {

private:
//! Adds a value to the current row
void AddValue(char *str_val, index_t length, index_t &column);
void AddValue(char *str_val, index_t length, index_t &column, std::queue<index_t> &escape_positions);
//! Adds a row to the insert_chunk, returns true if the chunk is filled as a result of this row being added
bool AddRow(DataChunk &insert_chunk, index_t &column);
//! Finalizes a chunk, parsing all values that have been added so far and adding them to the insert_chunk
void Flush(DataChunk &insert_chunk);
//! Reads a new buffer from the CSV file if the current one has been exhausted
bool ReadBuffer(index_t &start);
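//! Flags which control strings (delimiter, quote, escape) start at the current buffer position; returns true if the end of the buffer was reached while a control string was still only partially matched, false otherwise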
bool MatchControlString(bool &delim_str, bool &quote_str, bool &escape_str, index_t &delim_l, index_t &quote_l, index_t & escape_l);
};

} // namespace duckdb
8 changes: 4 additions & 4 deletions src/include/parser/parsed_data/copy_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ struct CopyInfo {
//! Whether or not this is a copy to file (false) or copy from a file (true)
bool is_from;
//! Delimiter to separate columns within each line
char delimiter;
string delimiter;
//! Quote used for columns that contain reserved characters, e.g., delimiter
char quote;
string quote;
//! Escape character to escape quote character
char escape;
string escape;
//! Whether or not the file has a header line
bool header;
//! The file format of the external file
Expand All @@ -48,7 +48,7 @@ struct CopyInfo {
vector<string> force_not_null_list;

CopyInfo()
: schema(DEFAULT_SCHEMA), is_from(false), delimiter(','), quote('"'), header(false),
: schema(DEFAULT_SCHEMA), is_from(false), delimiter(","), quote("\""), escape(""), header(false),
format(ExternalFileFormat::CSV), null_str(""), quote_all(false) {
}
};
Expand Down
Loading

0 comments on commit 42c4e42

Please sign in to comment.