Skip to content

Commit

Permalink
ARROW-6825: [C++] Rework CSV reader IO around readahead iterator
Browse files Browse the repository at this point in the history
Make the delimiting chunker a common facility used by CSV and JSON.

Closes #5727 from pitrou/ARROW-6825-delimited and squashes the following commits:

68a5a02 <Antoine Pitrou> Address review comments
394fecc <Antoine Pitrou> ARROW-6825:  Rework CSV reader IO around readahead iterator

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
pitrou authored and bkietz committed Nov 4, 2019
1 parent 77d4d49 commit 21ca13a
Show file tree
Hide file tree
Showing 22 changed files with 969 additions and 1,288 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ set(ARROW_SRCS
io/hdfs_internal.cc
io/interfaces.cc
io/memory.cc
io/readahead.cc
io/slow.cc
testing/util.cc
util/basic_decimal.cc
util/bit_util.cc
util/compression.cc
util/cpu_info.cc
util/decimal.cc
util/delimiting.cc
util/formatting.cc
util/int_util.cc
util/io_util.cc
Expand Down
300 changes: 190 additions & 110 deletions cpp/src/arrow/csv/chunker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,168 +18,248 @@
#include "arrow/csv/chunker.h"

#include <cstdint>
#include <memory>
#include <utility>

#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/stl.h"
#include "arrow/util/string_view.h"

namespace arrow {
namespace csv {

namespace {

// Locate the last newline character ('\r' or '\n') in the given data block,
// scanning backwards from the end.
// Returns nullptr when none is present (mirroring memchr()'s contract).
const char* FindNewlineReverse(const char* data, uint32_t size) {
  for (const char* p = data + size; p != data;) {
    --p;
    if (*p == '\n' || *p == '\r') {
      return p;
    }
  }
  return nullptr;
}
// NOTE: csvmonkey (https://github.com/dw/csvmonkey) has optimization ideas

} // namespace
template <bool quoting, bool escaping>
class Lexer {
public:
enum State {
FIELD_START,
IN_FIELD,
AT_ESCAPE,
IN_QUOTED_FIELD,
AT_QUOTED_QUOTE,
AT_QUOTED_ESCAPE
};

Chunker::Chunker(ParseOptions options) : options_(options) {}
explicit Lexer(const ParseOptions& options) : options_(options) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
}

// NOTE: csvmonkey (https://github.com/dw/csvmonkey) has optimization ideas
const char* ReadLine(const char* data, const char* data_end) {
// The parsing state machine
char c;
DCHECK_GT(data_end - data, 0);
if (ARROW_PREDICT_TRUE(state_ == FIELD_START)) {
goto FieldStart;
}
switch (state_) {
case FIELD_START:
goto FieldStart;
case IN_FIELD:
goto InField;
case AT_ESCAPE:
goto AtEscape;
case IN_QUOTED_FIELD:
goto InQuotedField;
case AT_QUOTED_QUOTE:
goto AtQuotedQuote;
case AT_QUOTED_ESCAPE:
goto AtQuotedEscape;
}

template <bool quoting, bool escaping>
inline const char* Chunker::ReadLine(const char* data, const char* data_end) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
FieldStart:
// At the start of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = FIELD_START;
goto AbortLine;
}
// Quoting is only recognized at start of field
if (quoting && *data == options_.quote_char) {
data++;
goto InQuotedField;
} else {
goto InField;
}

// The parsing state machine
char c;
InField:
// Inside a non-quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = IN_FIELD;
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_ESCAPE;
goto AbortLine;
}
data++;
goto InField;
}
if (ARROW_PREDICT_FALSE(c == '\r')) {
if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') {
data++;
}
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == '\n')) {
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
goto FieldEnd;
}
goto InField;

FieldStart:
// At the start of a field
// Quoting is only recognized at start of field
if (quoting && ARROW_PREDICT_TRUE(data != data_end) && *data == options_.quote_char) {
AtEscape:
// Coming here if last block ended on a non-quoted escape
data++;
goto InQuotedField;
} else {
goto InField;
}

InField:
// Inside a non-quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
InQuotedField:
// Inside a quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = IN_QUOTED_FIELD;
goto AbortLine;
}
data++;
goto InField;
}
if (ARROW_PREDICT_FALSE(c == '\r')) {
if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') {
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_QUOTED_ESCAPE;
goto AbortLine;
}
data++;
goto InQuotedField;
}
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == '\n')) {
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
goto FieldEnd;
}
goto InField;

InQuotedField:
// Inside a quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (data == data_end) {
goto AbortLine;
if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_QUOTED_QUOTE;
goto AbortLine;
}
if (options_.double_quote && *data == options_.quote_char) {
// Double-quoting
data++;
} else {
// End of single-quoting
goto InField;
}
}
goto InQuotedField;

AtQuotedEscape:
// Coming here if last block ended on a quoted escape
data++;
goto InQuotedField;
}
if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
if (options_.double_quote && data != data_end && *data == options_.quote_char) {

AtQuotedQuote:
// Coming here if last block ended on a quoted quote
if (options_.double_quote && *data == options_.quote_char) {
// Double-quoting
data++;
goto InQuotedField;
} else {
// End of single-quoting
goto InField;
}
}
goto InQuotedField;

FieldEnd:
// At the end of a field
goto FieldStart;
FieldEnd:
// At the end of a field
goto FieldStart;

LineEnd:
return data;
LineEnd:
return data;

AbortLine:
// Truncated line at end of block
return nullptr;
}
AbortLine:
// Truncated line
return nullptr;
}

protected:
const ParseOptions& options_;
State state_ = FIELD_START;
};

// A BoundaryFinder implementation that assumes CSV cells can contain raw newlines,
// and uses actual CSV lexing to delimit them.
template <bool quoting, bool escaping>
Status Chunker::ProcessSpecialized(const char* start, uint32_t size, uint32_t* out_size) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
class LexingBoundaryFinder : public BoundaryFinder {
public:
explicit LexingBoundaryFinder(ParseOptions options) : options_(std::move(options)) {}

Status FindFirst(util::string_view partial, util::string_view block,
int64_t* out_pos) override {
Lexer<quoting, escaping> lexer(options_);

const char* data = start;
const char* data_end = start + size;
const char* line_end =
lexer.ReadLine(partial.data(), partial.data() + partial.size());
DCHECK_EQ(line_end, nullptr); // Otherwise `partial` is a whole CSV line
line_end = lexer.ReadLine(block.data(), block.data() + block.size());

while (data < data_end) {
const char* line_end = ReadLine<quoting, escaping>(data, data_end);
if (line_end == nullptr) {
// Cannot read any further
break;
// No complete CSV line
*out_pos = -1;
} else {
*out_pos = static_cast<int64_t>(line_end - block.data());
DCHECK_GT(*out_pos, 0);
}
data = line_end;
return Status::OK();
}
*out_size = static_cast<uint32_t>(data - start);
return Status::OK();
}

Status Chunker::Process(const char* start, uint32_t size, uint32_t* out_size) {
if (!options_.newlines_in_values) {
// If newlines are not accepted in CSV values, we can simply search for
// the last newline character.
// For common block sizes and CSV row sizes, this avoids reading
// most of the data block, making the chunker extremely fast compared
// to the rest of the CSV reading pipeline.
const char* nl = FindNewlineReverse(start, size);
if (nl == nullptr) {
*out_size = 0;
Status FindLast(util::string_view block, int64_t* out_pos) override {
Lexer<quoting, escaping> lexer(options_);

const char* data = block.data();
const char* const data_end = block.data() + block.size();

while (data < data_end) {
const char* line_end = lexer.ReadLine(data, data_end);
if (line_end == nullptr) {
// Cannot read any further
break;
}
DCHECK_GT(line_end, data);
data = line_end;
}
if (data == block.data()) {
// No complete CSV line
*out_pos = -1;
} else {
*out_size = static_cast<uint32_t>(nl - start + 1);
*out_pos = static_cast<int64_t>(data - block.data());
DCHECK_GT(*out_pos, 0);
}
return Status::OK();
}

if (options_.quoting) {
if (options_.escaping) {
return ProcessSpecialized<true, true>(start, size, out_size);
} else {
return ProcessSpecialized<true, false>(start, size, out_size);
}
protected:
ParseOptions options_;
};

} // namespace

std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
std::shared_ptr<BoundaryFinder> delimiter;
if (!options.newlines_in_values) {
delimiter = MakeNewlineBoundaryFinder();
} else {
if (options_.escaping) {
return ProcessSpecialized<false, true>(start, size, out_size);
if (options.quoting) {
if (options.escaping) {
delimiter = std::make_shared<LexingBoundaryFinder<true, true>>(options);
} else {
delimiter = std::make_shared<LexingBoundaryFinder<true, false>>(options);
}
} else {
return ProcessSpecialized<false, false>(start, size, out_size);
if (options.escaping) {
delimiter = std::make_shared<LexingBoundaryFinder<false, true>>(options);
} else {
delimiter = std::make_shared<LexingBoundaryFinder<false, false>>(options);
}
}
}
return internal::make_unique<Chunker>(std::move(delimiter));
}

} // namespace csv
Expand Down
Loading

0 comments on commit 21ca13a

Please sign in to comment.