[CSV Reader] Use array instead of unordered_map in projection pushdown #10411

Merged · 8 commits · Feb 1, 2024
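
This PR replaces the unordered_map that mapped file column indexes to output chunk column indexes during CSV projection pushdown with a flat bool array plus a running chunk_col_id counter, removing a hash lookup from the per-value parsing hot path. A minimal standalone sketch of the before/after idea, using simplified names rather than the exact DuckDB types:

// Minimal sketch of the idea behind this PR (simplified names, not the
// exact DuckDB code). Parsing calls this once per CSV value, so the
// per-value projection check sits on the hot path.
#include <cstddef>
#include <memory>
#include <unordered_map>

struct BeforeScanner {
	// Old approach: map from file column index -> output chunk column index.
	// An empty map means "no projection"; every value pays a hash lookup.
	std::unordered_map<size_t, size_t> projected_columns;
	size_t cur_col_id = 0;

	bool NextValueTarget(size_t &chunk_col_id) {
		if (!projected_columns.empty()) {
			auto it = projected_columns.find(cur_col_id);
			if (it == projected_columns.end()) {
				cur_col_id++; // column is projected out: skip the value
				return false;
			}
			chunk_col_id = it->second;
		} else {
			chunk_col_id = cur_col_id;
		}
		cur_col_id++;
		return true;
	}
};

struct AfterScanner {
	// New approach: a flat bool per file column plus a running output index.
	// The check is a single array load; chunk_col_id only advances for
	// columns that are kept.
	std::unique_ptr<bool[]> projected_columns;
	bool projecting_columns = false;
	size_t cur_col_id = 0;
	size_t chunk_col_id = 0;

	bool NextValueTarget(size_t &out_col) {
		if (projecting_columns && !projected_columns[cur_col_id]) {
			cur_col_id++; // column is projected out: skip the value
			return false;
		}
		out_col = chunk_col_id++;
		cur_col_id++;
		return true;
	}
};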
3 changes: 2 additions & 1 deletion .github/regression/csv.csv
@@ -1,4 +1,5 @@
benchmark/micro/csv/sniffer.benchmark
benchmark/micro/csv/read.benchmark
benchmark/micro/csv/small_csv.benchmark
benchmark/micro/csv/null_padding.benchmark
benchmark/micro/csv/null_padding.benchmark
benchmark/micro/csv/projection_pushdown.benchmark
16 changes: 16 additions & 0 deletions benchmark/micro/csv/projection_pushdown.benchmark
@@ -0,0 +1,16 @@
# name: benchmark/micro/csv/projection_pushdown.benchmark
# description: Runs the CSV Scanner with Projection Pushdown
# group: [csv]

name CSV Projection Pushdown
group csv

require tpch

cache tpch_sf1.duckdb

load benchmark/tpch/sf1/load.sql
COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);

run
SELECT l_returnflag, MIN(l_orderkey) FROM '${BENCHMARK_DIR}/lineitem.csv' GROUP BY l_returnflag;
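
The new benchmark keeps only 2 of lineitem's 16 columns (l_returnflag and l_orderkey), so almost every parsed value takes the "column is not projected" branch, which is exactly where the map lookup used to sit. A hypothetical microbenchmark, not part of this PR, that isolates the cost difference between the two checks:

// Hypothetical microbenchmark (not from the PR) comparing the two projection
// checks: an unordered_map lookup vs a flat bool-array load, for 16 columns
// of which 2 are kept.
#include <chrono>
#include <cstdio>
#include <unordered_map>

int main() {
	constexpr size_t kCols = 16, kRows = 6'000'000; // roughly lineitem at SF1
	std::unordered_map<size_t, size_t> map_proj{{0, 0}, {8, 1}};
	bool array_proj[kCols] = {};
	array_proj[0] = array_proj[8] = true;

	// Time one projection-check strategy over every value in the table.
	auto time = [&](auto &&check) {
		auto start = std::chrono::steady_clock::now();
		size_t kept = 0;
		for (size_t r = 0; r < kRows; r++) {
			for (size_t c = 0; c < kCols; c++) {
				kept += check(c);
			}
		}
		std::chrono::duration<double> s = std::chrono::steady_clock::now() - start;
		return std::pair<double, size_t>(s.count(), kept);
	};

	auto [map_s, k1] = time([&](size_t c) { return map_proj.count(c); });
	auto [arr_s, k2] = time([&](size_t c) { return array_proj[c] ? 1u : 0u; });
	std::printf("map: %.2fs, array: %.2fs (kept %zu vs %zu)\n", map_s, arr_s, k1, k2);
	return k1 == k2 ? 0 : 1;
}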
8 changes: 5 additions & 3 deletions benchmark/micro/csv/read.benchmark
@@ -6,9 +6,11 @@ name CSV Read Benchmark
group csv

require tpch
load
CALL dbgen(sf=1.5, suffix='_normal');
COPY lineitem_normal TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);

cache tpch_sf1.duckdb

load benchmark/tpch/sf1/load.sql
COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);

run
SELECT * FROM '${BENCHMARK_DIR}/lineitem.csv'
8 changes: 5 additions & 3 deletions benchmark/micro/csv/sniffer.benchmark
@@ -6,9 +6,11 @@ name CSV Sniffer Benchmark
group csv

require tpch
load
CALL dbgen(sf=2, suffix='_normal');
COPY lineitem_normal TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);

cache tpch_sf1.duckdb

load benchmark/tpch/sf1/load.sql
COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER);

run
DESCRIBE SELECT * FROM '${BENCHMARK_DIR}/lineitem.csv'
9 changes: 4 additions & 5 deletions benchmark/micro/csv/sniffer_quotes.benchmark
@@ -6,12 +6,11 @@ name CSV Read Benchmark
group csv

require tpch
load
CALL dbgen(sf=1, suffix='_normal');
COPY lineitem_normal TO 'lineitem_quoted.csv' (FORMAT CSV, DELIMITER '|', HEADER, FORCE_QUOTE *);

run
select prompt FROM sniff_csv('${BENCHMARK_DIR}/lineitem_quoted.csv')
cache tpch_sf1.duckdb

load benchmark/tpch/sf1/load.sql
COPY lineitem TO '${BENCHMARK_DIR}/lineitem.csv' (FORMAT CSV, DELIMITER '|', HEADER, FORCE_QUOTE *);

result I
FROM read_csv('${BENCHMARK_DIR}/lineitem_quoted.csv', auto_detect=false, delim='|', quote='"', escape='"', new_line='\n', skip=0, header=true, columns={'l_orderkey': 'BIGINT', 'l_partkey': 'BIGINT', 'l_suppkey': 'BIGINT', 'l_linenumber': 'BIGINT', 'l_quantity': 'DOUBLE', 'l_extendedprice': 'DOUBLE', 'l_discount': 'DOUBLE', 'l_tax': 'DOUBLE', 'l_returnflag': 'VARCHAR', 'l_linestatus': 'VARCHAR', 'l_shipdate': 'DATE', 'l_commitdate': 'DATE', 'l_receiptdate': 'DATE', 'l_shipinstruct': 'VARCHAR', 'l_shipmode': 'VARCHAR', 'l_comment': 'VARCHAR'}, dateformat='%Y-%m-%d');
81 changes: 42 additions & 39 deletions src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp
@@ -36,6 +36,8 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
for (idx_t i = 0; i < number_of_columns; i++) {
parse_types[i] = LogicalTypeId::VARCHAR;
logical_types.emplace_back(LogicalType::VARCHAR);
string name = "Column_" + to_string(i);
names.emplace_back(name);
}
} else {
if (csv_file_scan->file_types.size() > number_of_columns) {
@@ -53,26 +55,21 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
logical_types.emplace_back(LogicalType::VARCHAR);
}
}
}
// Fill out Names
if (!csv_file_scan) {
for (idx_t i = 0; i < number_of_columns; i++) {
string name = "Column_" + to_string(i);
names.emplace_back(name);
}
} else {
names = csv_file_scan->file_names;
bool projecting_columns = false;
idx_t i = 0;
for (auto &col_idx : csv_file_scan->projected_columns) {
projected_columns[col_idx] = i;
if (col_idx != i) {
projecting_columns = true;
names = csv_file_scan->names;
if (!csv_file_scan->projected_columns.empty()) {
projecting_columns = false;
projected_columns = make_unsafe_uniq_array<bool>(number_of_columns);
for (idx_t col_idx = 0; col_idx < number_of_columns; col_idx++) {
if (csv_file_scan->projected_columns.find(col_idx) == csv_file_scan->projected_columns.end()) {
// Column is not projected
projecting_columns = true;
projected_columns[col_idx] = false;
} else {
projected_columns[col_idx] = true;
}
}
i++;
}
if (!projecting_columns && projected_columns.size() == number_of_columns) {
projected_columns.clear();
if (!projecting_columns) {
for (idx_t j = logical_types.size(); j < number_of_columns; j++) {
// This can happen if we have sneaky null columns at the end that we wish to ignore
parse_types[j] = LogicalTypeId::VARCHAR;
@@ -90,13 +87,11 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
}

void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size, bool allocate) {
idx_t chunk_col_id = cur_col_id;
if (!projected_columns.empty()) {
if (projected_columns.find(cur_col_id) == projected_columns.end()) {
if (projecting_columns) {
if (!projected_columns[cur_col_id]) {
cur_col_id++;
return;
}
chunk_col_id = projected_columns[cur_col_id];
}
if (size == null_str_size) {
if (((quoted && state_machine.options.allow_quoted_nulls) || !quoted)) {
@@ -129,19 +124,18 @@ void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size
validity_mask[chunk_col_id]->SetInvalid(number_of_rows);
}
cur_col_id++;
chunk_col_id++;
return;
}
}
}
if (chunk_col_id >= number_of_columns) {
HandleOverLimitRows();
chunk_col_id = cur_col_id;
if (!projected_columns.empty()) {
if (projected_columns.find(cur_col_id) == projected_columns.end()) {
if (projecting_columns) {
if (!projected_columns[cur_col_id]) {
cur_col_id++;
return;
}
chunk_col_id = projected_columns[cur_col_id];
}
}
bool success = true;
@@ -211,9 +205,10 @@ void StringValueResult::AddValueToVector(const char *value_ptr, const idx_t size
}
if (!success) {
// We had a casting error, we push it here because we can only error when finishing the line read.
cast_errors[chunk_col_id] = std::string(value_ptr, size);
cast_errors[cur_col_id] = std::string(value_ptr, size);
}
cur_col_id++;
chunk_col_id++;
}

Value StringValueResult::GetValue(idx_t row_idx, idx_t col_idx) {
@@ -234,18 +229,16 @@ DataChunk &StringValueResult::ToChunk() {

void StringValueResult::AddQuotedValue(StringValueResult &result, const idx_t buffer_pos) {
if (result.escaped) {
idx_t chunk_col_id = result.cur_col_id;
if (!result.projected_columns.empty()) {
if (result.projected_columns.find(result.cur_col_id) == result.projected_columns.end()) {
if (result.projecting_columns) {
if (!result.projected_columns[result.cur_col_id]) {
result.cur_col_id++;
return;
}
chunk_col_id = result.projected_columns[result.cur_col_id];
}
// If it's an escaped value we have to remove all the escapes, this is not really great
auto value = StringValueScanner::RemoveEscape(
result.buffer_ptr + result.last_position + 1, buffer_pos - result.last_position - 2,
result.state_machine.options.GetEscape()[0], result.parse_chunk.data[chunk_col_id]);
result.state_machine.options.GetEscape()[0], result.parse_chunk.data[result.chunk_col_id]);
result.AddValueToVector(value.GetData(), value.GetSize());
} else {
if (buffer_pos < result.last_position + 2) {
@@ -280,6 +273,7 @@ void StringValueResult::HandleOverLimitRows() {
error_handler.Error(lines_per_batch, csv_error);
// If we get here we need to remove the last line
cur_col_id = 0;
chunk_col_id = 0;
}

void StringValueResult::QuotedNewLine(StringValueResult &result) {
@@ -327,6 +321,7 @@ bool StringValueResult::AddRowInternal() {
// Cleanup this line and continue
cast_errors.clear();
cur_col_id = 0;
chunk_col_id = 0;
return false;
}
NullPaddingQuotedNewlineCheck();
@@ -342,11 +337,12 @@ bool StringValueResult::AddRowInternal() {
empty = state_machine.options.force_not_null[cur_col_id];
}
if (empty) {
static_cast<string_t *>(vector_ptr[cur_col_id])[number_of_rows] = string_t();
static_cast<string_t *>(vector_ptr[chunk_col_id])[number_of_rows] = string_t();
} else {
validity_mask[cur_col_id]->SetInvalid(number_of_rows);
validity_mask[chunk_col_id]->SetInvalid(number_of_rows);
}
cur_col_id++;
chunk_col_id++;
}
} else {
// If we are not nullpadding this is an error
@@ -359,6 +355,7 @@ }
}
}
cur_col_id = 0;
chunk_col_id = 0;
number_of_rows++;
if (number_of_rows >= result_size) {
// We have a full chunk
@@ -407,7 +404,7 @@ bool StringValueResult::AddRow(StringValueResult &result, const idx_t buffer_pos
void StringValueResult::InvalidState(StringValueResult &result) {
// FIXME: How do we recover from an invalid state? Can we restart the state machine and jump to the next row?
auto csv_error = CSVError::UnterminatedQuotesError(result.state_machine.options,
static_cast<string_t *>(result.vector_ptr[result.cur_col_id]),
static_cast<string_t *>(result.vector_ptr[result.chunk_col_id]),
result.number_of_rows, result.cur_col_id);
LinesPerBoundary lines_per_batch(result.iterator.GetBoundaryIdx(), result.number_of_rows);
result.error_handler.Error(lines_per_batch, csv_error);
@@ -481,6 +478,7 @@ bool StringValueScanner::FinishedIterator() {
StringValueResult &StringValueScanner::ParseChunk() {
result.number_of_rows = 0;
result.cur_col_id = 0;
result.chunk_col_id = 0;
for (auto &v : result.validity_mask) {
v->SetAllValid(result.result_size);
}
@@ -506,8 +504,11 @@ void StringValueScanner::Flush(DataChunk &insert_chunk) {
auto &reader_data = csv_file_scan->reader_data;
// Now Do the cast-aroo
for (idx_t c = 0; c < reader_data.column_ids.size(); c++) {
auto col_idx = c;
auto result_idx = reader_data.column_mapping[c];
idx_t col_idx = c;
idx_t result_idx = reader_data.column_mapping[c];
if (!csv_file_scan->projection_ids.empty()) {
result_idx = reader_data.column_mapping[csv_file_scan->projection_ids[c].second];
}
if (col_idx >= parse_chunk.ColumnCount()) {
throw InvalidInputException("Mismatch between the schema of different files");
}
@@ -792,7 +793,7 @@ void StringValueScanner::ProcessOverbufferValue() {
value =
StringValueScanner::RemoveEscape(str_ptr, overbuffer_string.size() - 2,
state_machine->dialect_options.state_machine_options.escape.GetValue(),
result.parse_chunk.data[result.cur_col_id]);
result.parse_chunk.data[result.chunk_col_id]);
}
} else {
value = string_t(overbuffer_string.c_str(), overbuffer_string.size());
@@ -839,6 +840,7 @@ bool StringValueScanner::MoveToNextBuffer() {
result.number_of_rows++;
}
result.cur_col_id = 0;
result.chunk_col_id = 0;
return false;
} else if (states.NewValue()) {
lines_read++;
@@ -1046,7 +1048,7 @@ void StringValueScanner::FinalizeChunkProcess() {
iterator.done = FinishedFile();
if (result.null_padding) {
while (result.cur_col_id < result.number_of_columns) {
result.validity_mask[result.cur_col_id++]->SetInvalid(result.number_of_rows);
result.validity_mask[result.chunk_col_id++]->SetInvalid(result.number_of_rows);
result.cur_col_id++;
}
result.number_of_rows++;
}
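
Because the parse chunk now holds only the projected columns, the Flush hunk above routes each parse-chunk slot back to the output column the query asked for, going through projection_ids when it is non-empty. A simplified sketch of that remapping, with hypothetical container types standing in for DuckDB's reader_data:

// Sketch of the Flush remapping (simplified, hypothetical types).
#include <cstddef>
#include <utility>
#include <vector>

struct FlushMapping {
	std::vector<size_t> column_mapping;                    // reader column -> output column
	std::vector<std::pair<size_t, size_t>> projection_ids; // .second indexes column_mapping; empty if no projection

	// Returns the output column that parse-chunk slot c should be cast into.
	size_t ResultIndex(size_t c) const {
		if (!projection_ids.empty()) {
			return column_mapping[projection_ids[c].second];
		}
		return column_mapping[c];
	}
};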