Merge pull request #10411 from pdet/projection_pushdown_csv
[CSV Reader] Use array instead of unordered_map in projection pushdown
Mytherin committed Feb 1, 2024
2 parents 2f525a9 + 0a252f7 commit 7456cef
Showing 11 changed files with 149 additions and 82 deletions.
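
The gist of the change, distilled: the scanner's projection bookkeeping moves from an unordered_map<idx_t, idx_t> (file column -> output column), consulted with a hash lookup for every parsed value, to a dense bool array indexed by file column plus a running output counter. A minimal before/after sketch — the member names follow the diff below, but the wrapper structs and Keep() helpers are invented here purely for illustration:

    #include <cstdint>
    #include <memory>
    #include <unordered_map>

    using idx_t = uint64_t;

    // Before: every value parsed from the CSV paid a hash lookup to learn
    // whether its column is projected and where it lands in the output chunk.
    struct MapProjection {
        std::unordered_map<idx_t, idx_t> projected_columns; // file col -> chunk col

        bool Keep(idx_t file_col, idx_t &chunk_col) const {
            auto it = projected_columns.find(file_col); // hash + probe per value
            if (it == projected_columns.end()) {
                return false; // column is not projected
            }
            chunk_col = it->second;
            return true;
        }
    };

    // After: a dense bool array answers "is this column projected?" with a
    // single indexed load, and a second counter (chunk_col_id) tracks the
    // output slot instead of storing a remapping in a map.
    struct ArrayProjection {
        std::unique_ptr<bool[]> projected_columns; // indexed by file column
        bool projecting_columns = false;           // false => identity projection
        idx_t cur_col_id = 0;                      // column position in the file
        idx_t chunk_col_id = 0;                    // column position in the chunk

        bool Keep() {
            if (projecting_columns && !projected_columns[cur_col_id]) {
                cur_col_id++; // skip the value; no output slot is consumed
                return false;
            }
            return true; // caller writes slot chunk_col_id, then bumps both counters
        }
    };
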
3 changes: 2 additions & 1 deletion .github/regression/csv.csv
@@ -1,4 +1,5 @@
 benchmark/micro/csv/sniffer.benchmark
 benchmark/micro/csv/read.benchmark
 benchmark/micro/csv/small_csv.benchmark
-benchmark/micro/csv/null_padding.benchmark
+benchmark/micro/csv/null_padding.benchmark
+benchmark/micro/csv/projection_pushdown.benchmark
16 changes: 16 additions & 0 deletions benchmark/micro/csv/projection_pushdown.benchmark
@@ -0,0 +1,16 @@
+# name: benchmark/micro/csv/projection_pushdown.benchmark
+# description: Runs the CSV Scanner with Projection Pushdown
+# group: [csv]
+
+name CSV Projection Pushdown
+group csv
+
+require tpch
+
+cache tpch_sf1.duckdb
+
+load benchmark/tpch/sf1/load.sql
+COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);
+
+run
+SELECT l_returnflag, MIN(l_orderkey) FROM '${BENCHMARK_DIR}/lineitem.csv' GROUP BY l_returnflag;
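
To reproduce the numbers, the new file should work like any other micro-benchmark under DuckDB's benchmark runner — assuming the usual release-build location of the runner (not something this diff states):

    build/release/benchmark/benchmark_runner benchmark/micro/csv/projection_pushdown.benchmark
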
8 changes: 5 additions & 3 deletions benchmark/micro/csv/read.benchmark
@@ -6,9 +6,11 @@ name CSV Read Benchmark
 group csv
 
 require tpch
-load
-CALL dbgen(sf=1.5, suffix='_normal');
-COPY lineitem_normal TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);
 
+cache tpch_sf1.duckdb
+
+load benchmark/tpch/sf1/load.sql
+COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);
+
 run
 SELECT * FROM '${BENCHMARK_DIR}/lineitem.csv'
8 changes: 5 additions & 3 deletions benchmark/micro/csv/sniffer.benchmark
@@ -6,9 +6,11 @@ name CSV Sniffer Benchmark
 group csv
 
 require tpch
-load
-CALL dbgen(sf=2, suffix='_normal');
-COPY lineitem_normal TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);
 
+cache tpch_sf1.duckdb
+
+load benchmark/tpch/sf1/load.sql
+COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);
+
 run
 DESCRIBE SELECT * FROM '${BENCHMARK_DIR}/lineitem.csv'
9 changes: 4 additions & 5 deletions benchmark/micro/csv/sniffer_quotes.benchmark
@@ -6,12 +6,11 @@ name CSV Read Benchmark
 group csv
 
 require tpch
-load
-CALL dbgen(sf=1, suffix='_normal');
-COPY lineitem_normal TO 'lineitem_quoted.csv' (FORMAT CSV, DELIMITER '|', HEADER, FORCE_QUOTE *);
 
-run
-select prompt FROM sniff_csv('${BENCHMARK_DIR}/lineitem_quoted.csv')
+cache tpch_sf1.duckdb
 
+load benchmark/tpch/sf1/load.sql
+COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER, FORCE_QUOTE *);
 
+result I
 FROM read_csv('${BENCHMARK_DIR}/lineitem_quoted.csv', auto_detect=false, delim='|', quote='"', escape='"', new_line='\n', skip=0, header=true, columns={'l_orderkey': 'BIGINT', 'l_partkey': 'BIGINT', 'l_suppkey': 'BIGINT', 'l_linenumber': 'BIGINT', 'l_quantity': 'DOUBLE', 'l_extendedprice': 'DOUBLE', 'l_discount': 'DOUBLE', 'l_tax': 'DOUBLE', 'l_returnflag': 'VARCHAR', 'l_linestatus': 'VARCHAR', 'l_shipdate': 'DATE', 'l_commitdate': 'DATE', 'l_receiptdate': 'DATE', 'l_shipinstruct': 'VARCHAR', 'l_shipmode': 'VARCHAR', 'l_comment': 'VARCHAR'}, dateformat='%Y-%m-%d');
81 changes: 42 additions & 39 deletions src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp
@@ -36,6 +36,8 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
     for (idx_t i = 0; i < number_of_columns; i++) {
         parse_types[i] = LogicalTypeId::VARCHAR;
         logical_types.emplace_back(LogicalType::VARCHAR);
+        string name = "Column_" + to_string(i);
+        names.emplace_back(name);
     }
 } else {
     if (csv_file_scan->file_types.size() > number_of_columns) {
@@ -53,26 +55,21 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
             logical_types.emplace_back(LogicalType::VARCHAR);
         }
     }
-    }
-    // Fill out Names
-    if (!csv_file_scan) {
-        for (idx_t i = 0; i < number_of_columns; i++) {
-            string name = "Column_" + to_string(i);
-            names.emplace_back(name);
-        }
-    } else {
-        names = csv_file_scan->file_names;
-        bool projecting_columns = false;
-        idx_t i = 0;
-        for (auto &col_idx : csv_file_scan->projected_columns) {
-            projected_columns[col_idx] = i;
-            if (col_idx != i) {
-                projecting_columns = true;
+        names = csv_file_scan->names;
+        if (!csv_file_scan->projected_columns.empty()) {
+            projecting_columns = false;
+            projected_columns = make_unsafe_uniq_array<bool>(number_of_columns);
+            for (idx_t col_idx = 0; col_idx < number_of_columns; col_idx++) {
+                if (csv_file_scan->projected_columns.find(col_idx) == csv_file_scan->projected_columns.end()) {
+                    // Column is not projected
+                    projecting_columns = true;
+                    projected_columns[col_idx] = false;
+                } else {
+                    projected_columns[col_idx] = true;
+                }
             }
-            i++;
         }
-        if (!projecting_columns && projected_columns.size() == number_of_columns) {
-            projected_columns.clear();
+        if (!projecting_columns) {
             for (idx_t j = logical_types.size(); j < number_of_columns; j++) {
                 // This can happen if we have sneaky null columns at the end that we wish to ignore
                 parse_types[j] = LogicalTypeId::VARCHAR;
@@ -90,13 +87,11 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
 }
 
 void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size, bool allocate) {
-    idx_t chunk_col_id = cur_col_id;
-    if (!projected_columns.empty()) {
-        if (projected_columns.find(cur_col_id) == projected_columns.end()) {
+    if (projecting_columns) {
+        if (!projected_columns[cur_col_id]) {
             cur_col_id++;
             return;
         }
-        chunk_col_id = projected_columns[cur_col_id];
     }
     if (size == null_str_size) {
         if (((quoted && state_machine.options.allow_quoted_nulls) || !quoted)) {
@@ -129,19 +124,18 @@ void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size
                     validity_mask[chunk_col_id]->SetInvalid(number_of_rows);
                 }
                 cur_col_id++;
+                chunk_col_id++;
                 return;
             }
         }
     }
     if (chunk_col_id >= number_of_columns) {
         HandleOverLimitRows();
-        chunk_col_id = cur_col_id;
-        if (!projected_columns.empty()) {
-            if (projected_columns.find(cur_col_id) == projected_columns.end()) {
+        if (projecting_columns) {
+            if (!projected_columns[cur_col_id]) {
                 cur_col_id++;
                 return;
             }
-            chunk_col_id = projected_columns[cur_col_id];
         }
     }
     bool success = true;
Expand Down Expand Up @@ -211,9 +205,10 @@ void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size
}
if (!success) {
// We had a casting error, we push it here because we can only error when finishing the line read.
cast_errors[chunk_col_id] = std::string(value_ptr, size);
cast_errors[cur_col_id] = std::string(value_ptr, size);
}
cur_col_id++;
chunk_col_id++;
}

Value StringValueResult::GetValue(idx_t row_idx, idx_t col_idx) {
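
The recurring edit in the hunks above: the per-value remap (chunk_col_id = projected_columns[cur_col_id]) disappears in favor of two counters that advance in lockstep and are reset together at row boundaries. A hypothetical standalone loop, only to show the counter discipline rather than the scanner's real control flow:

    #include <cstdint>

    using idx_t = uint64_t;

    // cur_col_id walks the columns as they appear in the file; chunk_col_id
    // walks the slots of the output chunk. A skipped column advances only
    // cur_col_id, so projected values land densely in the chunk.
    void WalkRow(const bool *projected, bool projecting_columns, idx_t file_cols) {
        idx_t cur_col_id = 0;
        idx_t chunk_col_id = 0;
        while (cur_col_id < file_cols) {
            if (projecting_columns && !projected[cur_col_id]) {
                cur_col_id++; // value is parsed but never materialized
                continue;
            }
            // ... write the parsed value into output slot chunk_col_id ...
            cur_col_id++;
            chunk_col_id++;
        }
        // at each row boundary both counters reset to 0 (see AddRowInternal)
    }
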
@@ -234,18 +229,16 @@ DataChunk &StringValueResult::ToChunk() {
 
 void StringValueResult::AddQuotedValue(StringValueResult &result, const idx_t buffer_pos) {
     if (result.escaped) {
-        idx_t chunk_col_id = result.cur_col_id;
-        if (!result.projected_columns.empty()) {
-            if (result.projected_columns.find(result.cur_col_id) == result.projected_columns.end()) {
+        if (result.projecting_columns) {
+            if (!result.projected_columns[result.cur_col_id]) {
                 result.cur_col_id++;
                 return;
             }
-            chunk_col_id = result.projected_columns[result.cur_col_id];
         }
         // If it's an escaped value we have to remove all the escapes, this is not really great
         auto value = StringValueScanner::RemoveEscape(
             result.buffer_ptr + result.last_position + 1, buffer_pos - result.last_position - 2,
-            result.state_machine.options.GetEscape()[0], result.parse_chunk.data[chunk_col_id]);
+            result.state_machine.options.GetEscape()[0], result.parse_chunk.data[result.chunk_col_id]);
         result.AddValueToVector(value.GetData(), value.GetSize());
     } else {
         if (buffer_pos < result.last_position + 2) {
@@ -280,6 +273,7 @@ void StringValueResult::HandleOverLimitRows() {
     error_handler.Error(lines_per_batch, csv_error);
     // If we get here we need to remove the last line
     cur_col_id = 0;
+    chunk_col_id = 0;
 }
 
 void StringValueResult::QuotedNewLine(StringValueResult &result) {
@@ -327,6 +321,7 @@ bool StringValueResult::AddRowInternal() {
         // Cleanup this line and continue
         cast_errors.clear();
         cur_col_id = 0;
+        chunk_col_id = 0;
         return false;
     }
     NullPaddingQuotedNewlineCheck();
@@ -342,11 +337,12 @@
                 empty = state_machine.options.force_not_null[cur_col_id];
             }
             if (empty) {
-                static_cast<string_t *>(vector_ptr[cur_col_id])[number_of_rows] = string_t();
+                static_cast<string_t *>(vector_ptr[chunk_col_id])[number_of_rows] = string_t();
             } else {
-                validity_mask[cur_col_id]->SetInvalid(number_of_rows);
+                validity_mask[chunk_col_id]->SetInvalid(number_of_rows);
             }
             cur_col_id++;
+            chunk_col_id++;
         }
     } else {
         // If we are not nullpadding this is an error
Expand All @@ -359,6 +355,7 @@ bool StringValueResult::AddRowInternal() {
}
}
cur_col_id = 0;
chunk_col_id = 0;
number_of_rows++;
if (number_of_rows >= result_size) {
// We have a full chunk
@@ -407,7 +404,7 @@ bool StringValueResult::AddRow(StringValueResult &result, const idx_t buffer_pos
 void StringValueResult::InvalidState(StringValueResult &result) {
     // FIXME: How do we recover from an invalid state? Can we restart the state machine and jump to the next row?
     auto csv_error = CSVError::UnterminatedQuotesError(result.state_machine.options,
-                                                       static_cast<string_t *>(result.vector_ptr[result.cur_col_id]),
+                                                       static_cast<string_t *>(result.vector_ptr[result.chunk_col_id]),
                                                        result.number_of_rows, result.cur_col_id);
     LinesPerBoundary lines_per_batch(result.iterator.GetBoundaryIdx(), result.number_of_rows);
     result.error_handler.Error(lines_per_batch, csv_error);
@@ -481,6 +478,7 @@ bool StringValueScanner::FinishedIterator() {
 StringValueResult &StringValueScanner::ParseChunk() {
     result.number_of_rows = 0;
     result.cur_col_id = 0;
+    result.chunk_col_id = 0;
     for (auto &v : result.validity_mask) {
         v->SetAllValid(result.result_size);
     }
@@ -506,8 +504,11 @@ void StringValueScanner::Flush(DataChunk &insert_chunk) {
     auto &reader_data = csv_file_scan->reader_data;
     // Now Do the cast-aroo
     for (idx_t c = 0; c < reader_data.column_ids.size(); c++) {
-        auto col_idx = c;
-        auto result_idx = reader_data.column_mapping[c];
+        idx_t col_idx = c;
+        idx_t result_idx = reader_data.column_mapping[c];
+        if (!csv_file_scan->projection_ids.empty()) {
+            result_idx = reader_data.column_mapping[csv_file_scan->projection_ids[c].second];
+        }
         if (col_idx >= parse_chunk.ColumnCount()) {
             throw InvalidInputException("Mismatch between the schema of different files");
         }
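
A related subtlety in Flush() just above: when projection_ids is non-empty, parse-chunk column c no longer maps straight through column_mapping[c] but goes through projection_ids[c].second first. A hypothetical distillation of that lookup (the real reader_data structures are richer than shown):

    #include <cstdint>
    #include <utility>
    #include <vector>

    using idx_t = uint64_t;

    // With projection pushdown active, projection_ids[c].second names the
    // entry of column_mapping holding the real destination column for
    // parse-chunk column c; otherwise the mapping is direct.
    idx_t ResultColumn(idx_t c, const std::vector<idx_t> &column_mapping,
                       const std::vector<std::pair<idx_t, idx_t>> &projection_ids) {
        if (projection_ids.empty()) {
            return column_mapping[c]; // no pushdown: direct mapping
        }
        return column_mapping[projection_ids[c].second];
    }
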
@@ -792,7 +793,7 @@ void StringValueScanner::ProcessOverbufferValue() {
             value =
                 StringValueScanner::RemoveEscape(str_ptr, overbuffer_string.size() - 2,
                                                  state_machine->dialect_options.state_machine_options.escape.GetValue(),
-                                                 result.parse_chunk.data[result.cur_col_id]);
+                                                 result.parse_chunk.data[result.chunk_col_id]);
         }
     } else {
         value = string_t(overbuffer_string.c_str(), overbuffer_string.size());
@@ -839,6 +840,7 @@ bool StringValueScanner::MoveToNextBuffer() {
             result.number_of_rows++;
         }
         result.cur_col_id = 0;
+        result.chunk_col_id = 0;
         return false;
     } else if (states.NewValue()) {
         lines_read++;
@@ -1046,7 +1048,7 @@ void StringValueScanner::FinalizeChunkProcess() {
     iterator.done = FinishedFile();
     if (result.null_padding) {
         while (result.cur_col_id < result.number_of_columns) {
-            result.validity_mask[result.cur_col_id++]->SetInvalid(result.number_of_rows);
+            result.validity_mask[result.chunk_col_id++]->SetInvalid(result.number_of_rows);
+            result.cur_col_id++;
         }
         result.number_of_rows++;
     }