Merge pull request #3700 from Mytherin/parallelinsertorderpreserving
Support Parallel Order-Preserving Result Set Materialization
Mytherin committed May 25, 2022
2 parents 6cb2e60 + 01c4a2e commit a25b6e3
Showing 93 changed files with 2,251 additions and 493 deletions.
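Judging from the changes below, the mechanism works by having parallel sources (such as the Parquet scan) tag each unit of work with a batch index, which a result collector then uses to reassemble the materialized result in the original order. A minimal sketch of the user-visible effect through DuckDB's public C++ API (the table and query are illustrative, not taken from this commit):

#include "duckdb.hpp"

using namespace duckdb;

int main() {
	DuckDB db(nullptr);
	Connection con(db);
	con.Query("CREATE TABLE integers AS SELECT * FROM range(100000000) tbl(i);");
	// even when the scan below is executed by many threads, the materialized result
	// comes back in insertion order (unless preserve_insertion_order is disabled,
	// as some of the new benchmarks do)
	auto result = con.Query("SELECT * FROM integers WHERE i IN (337, 948247, 17797934)");
	result->Print();
	return 0;
}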
4 changes: 2 additions & 2 deletions .github/regression/micro.csv
@@ -3,8 +3,8 @@ Append100KIntegersPREPARED
benchmark/micro/cast/cast_date_string.benchmark
benchmark/micro/cast/cast_int_string.benchmark
benchmark/micro/cast/cast_double_string.benchmark
benchmark/micro/cast/cast_int32_int64.benchmark
benchmark/micro/cast/cast_int64_int32.benchmark
benchmark/micro/cast/cast_string_double.benchmark
benchmark/micro/cast/cast_string_int.benchmark
benchmark/micro/cast/cast_timestamp_string.benchmark
benchmark/micro/limit/parallel_limit.benchmark
benchmark/micro/filter/parallel_complex_filter.benchmark
16 changes: 12 additions & 4 deletions benchmark/benchmark_runner.cpp
@@ -47,12 +47,18 @@ static bool endsWith(const string &mainStr, const string &toMatch) {
BenchmarkRunner::BenchmarkRunner() {
}

void BenchmarkRunner::SaveDatabase(DuckDB &db, string name) {
auto &fs = db.GetFileSystem();
void BenchmarkRunner::InitializeBenchmarkDirectory() {
auto fs = FileSystem::CreateLocal();
// check if the database directory exists; if not create it
if (!fs.DirectoryExists(DUCKDB_BENCHMARK_DIRECTORY)) {
fs.CreateDirectory(DUCKDB_BENCHMARK_DIRECTORY);
if (!fs->DirectoryExists(DUCKDB_BENCHMARK_DIRECTORY)) {
fs->CreateDirectory(DUCKDB_BENCHMARK_DIRECTORY);
}
}

void BenchmarkRunner::SaveDatabase(DuckDB &db, string name) {
InitializeBenchmarkDirectory();

auto &fs = db.GetFileSystem();
Connection con(db);
auto result = con.Query(
StringUtil::Format("EXPORT DATABASE '%s' (FORMAT CSV)", fs.JoinPath(DUCKDB_BENCHMARK_DIRECTORY, name)));
@@ -266,6 +272,8 @@ void parse_arguments(const int arg_counter, char const *const *arg_values) {
* Returns a configuration error code.
*/
ConfigurationError run_benchmarks() {
BenchmarkRunner::InitializeBenchmarkDirectory();

auto &instance = BenchmarkRunner::GetInstance();
auto &benchmarks = instance.benchmarks;
if (!instance.configuration.name_pattern.empty()) {
2 changes: 2 additions & 0 deletions benchmark/include/benchmark_runner.hpp
@@ -32,6 +32,8 @@ class BenchmarkRunner {
return instance;
}

static void InitializeBenchmarkDirectory();

//! Save the current database state, exporting it to a set of CSVs in the DUCKDB_BENCHMARK_DIRECTORY directory
static void SaveDatabase(DuckDB &db, string name);
//! Try to initialize the database from the DUCKDB_BENCHMARK_DIRECTORY
19 changes: 19 additions & 0 deletions benchmark/micro/filter/parallel_complex_filter.benchmark
@@ -0,0 +1,19 @@
# name: benchmark/micro/filter/parallel_complex_filter.benchmark
# description: Benchmark of parallel complex filter computation
# group: [filter]

name Parallel Complex Filter
group micro
subgroup filter

load
CREATE TABLE integers AS SELECT * FROM range(100000000) tbl(i);
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934;

run
SELECT * FROM integers WHERE i IN (SELECT * FROM other_table)

result I
337
948247
17797934
20 changes: 20 additions & 0 deletions benchmark/micro/limit/parallel_limit.benchmark
@@ -0,0 +1,20 @@
# name: benchmark/micro/limit/parallel_limit.benchmark
# description: Benchmark of parallel limit computation
# group: [limit]

name Parallel Limit
group micro
subgroup limit

load
CREATE TABLE integers AS SELECT * FROM range(100000000) tbl(i);
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999

run
SELECT * FROM integers WHERE i IN (SELECT * FROM other_table) LIMIT 4

result I
337
948247
17797934
99999998
22 changes: 22 additions & 0 deletions benchmark/micro/limit/parallel_streaming_limit.benchmark
@@ -0,0 +1,22 @@
# name: benchmark/micro/limit/parallel_streaming_limit.benchmark
# description: Benchmark of parallel streaming limit computation
# group: [limit]

name Parallel Streaming Limit
group micro
subgroup limit

load
SET preserve_insertion_order=false;
CREATE TABLE integers AS SELECT i, 1 AS j FROM range(100000000) tbl(i);
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999


run
SELECT j FROM integers WHERE i IN (SELECT * FROM other_table) LIMIT 4

result I
1
1
1
1
24 changes: 24 additions & 0 deletions benchmark/micro/limit/parquet_parallel_limit.benchmark
@@ -0,0 +1,24 @@
# name: benchmark/micro/limit/parquet_parallel_limit.benchmark
# description: Benchmark of parallel limit computation
# group: [limit]

name Parallel Limit (Parquet)
group micro
subgroup limit

require parquet

load
CREATE TABLE tmp_integers AS SELECT * FROM range(100000000) tbl(i);
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999;
COPY tmp_integers TO '${BENCHMARK_DIR}/integers.parquet';
CREATE VIEW integers AS SELECT * FROM '${BENCHMARK_DIR}/integers.parquet';

run
SELECT * FROM integers WHERE i IN (SELECT * FROM other_table) LIMIT 4

result I
337
948247
17797934
99999998
24 changes: 24 additions & 0 deletions benchmark/micro/limit/parquet_parallel_limit_glob.benchmark
@@ -0,0 +1,24 @@
# name: benchmark/micro/limit/parquet_parallel_limit_glob.benchmark
# description: Benchmark of parallel limit computation
# group: [limit]

name Parallel Limit (Parquet Glob)
group micro
subgroup limit

require parquet

load
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999;
COPY (SELECT * FROM range(50000000) t(i)) TO '${BENCHMARK_DIR}/integers1.parquet';
COPY (SELECT * FROM range(50000000, 100000000) t(i)) TO '${BENCHMARK_DIR}/integers2.parquet';
CREATE VIEW integers AS SELECT * FROM parquet_scan(['${BENCHMARK_DIR}/integers1.parquet', '${BENCHMARK_DIR}/integers2.parquet']);

run
SELECT * FROM integers WHERE i IN (SELECT * FROM other_table) LIMIT 4

result I
337
948247
17797934
99999998
25 changes: 18 additions & 7 deletions extension/parquet/parquet-extension.cpp
@@ -50,6 +50,7 @@ struct ParquetReadOperatorData : public FunctionOperatorData {
shared_ptr<ParquetReader> reader;
ParquetReaderScanState scan_state;
bool is_parallel;
idx_t batch_index;
idx_t file_index;
vector<column_t> column_ids;
TableFilterSet *table_filters;
@@ -58,6 +59,7 @@ struct ParquetReadOperatorData : public FunctionOperatorData {
struct ParquetReadParallelState : public ParallelState {
mutex lock;
shared_ptr<ParquetReader> current_reader;
idx_t batch_index;
idx_t file_index;
idx_t row_group_index;
};
@@ -74,14 +76,11 @@ class ParquetScanFunction {
ParquetInitParallelState, ParquetScanFuncParallel, ParquetScanParallelInit,
ParquetParallelStateNext, true, true, ParquetProgress);
table_function.named_parameters["binary_as_string"] = LogicalType::BOOLEAN;
table_function.get_batch_index = ParquetScanGetBatchIndex;
table_function.supports_batch_index = true;
set.AddFunction(table_function);
table_function = TableFunction({LogicalType::LIST(LogicalType::VARCHAR)}, ParquetScanImplementation,
ParquetScanBindList, ParquetScanInit, /* statistics */ ParquetScanStats,
/* cleanup */ nullptr,
/* dependency */ nullptr, ParquetCardinality,
/* pushdown_complex_filter */ nullptr, /* to_string */ nullptr,
ParquetScanMaxThreads, ParquetInitParallelState, ParquetScanFuncParallel,
ParquetScanParallelInit, ParquetParallelStateNext, true, true, ParquetProgress);
table_function.arguments = {LogicalType::LIST(LogicalType::VARCHAR)};
table_function.bind = ParquetScanBindList;
table_function.named_parameters["binary_as_string"] = LogicalType::BOOLEAN;
set.AddFunction(table_function);
return set;
@@ -280,13 +279,20 @@ class ParquetScanFunction {
auto result = make_unique<ParquetReadOperatorData>();
result->column_ids = column_ids;
result->is_parallel = true;
result->batch_index = 0;
result->table_filters = filters->table_filters;
if (!ParquetParallelStateNext(context, bind_data_p, result.get(), parallel_state_p)) {
return nullptr;
}
return move(result);
}

static idx_t ParquetScanGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
FunctionOperatorData *operator_state, ParallelState *parallel_state_p) {
auto &data = (ParquetReadOperatorData &)*operator_state;
return data.batch_index;
}

static void ParquetScanImplementation(ClientContext &context, const FunctionData *bind_data_p,
FunctionOperatorData *operator_state, DataChunk &output) {
if (!operator_state) {
@@ -343,6 +349,7 @@ class ParquetScanFunction {
result->current_reader = bind_data.initial_reader;
result->row_group_index = 0;
result->file_index = 0;
result->batch_index = 0;
return move(result);
}

@@ -362,6 +369,8 @@ class ParquetScanFunction {
vector<idx_t> group_indexes {parallel_state.row_group_index};
scan_data.reader->InitializeScan(scan_data.scan_state, scan_data.column_ids, group_indexes,
scan_data.table_filters);
scan_data.batch_index = parallel_state.batch_index++;
scan_data.file_index = parallel_state.file_index;
parallel_state.row_group_index++;
return true;
} else {
@@ -381,6 +390,8 @@ class ParquetScanFunction {
vector<idx_t> group_indexes {0};
scan_data.reader->InitializeScan(scan_data.scan_state, scan_data.column_ids, group_indexes,
scan_data.table_filters);
scan_data.batch_index = parallel_state.batch_index++;
scan_data.file_index = parallel_state.file_index;
parallel_state.row_group_index = 1;
return true;
}
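The pattern above is the core of the scheme: the shared parallel state hands out a monotonically increasing batch index under its lock whenever a thread claims the next row group. A simplified, self-contained sketch of that hand-off; the struct and function names are invented stand-ins for ParquetReadParallelState, ParquetReadOperatorData, and ParquetParallelStateNext:

#include <cstdint>
#include <mutex>

using idx_t = uint64_t; // stand-in for DuckDB's idx_t

struct ExampleParallelState {
	std::mutex lock;
	idx_t batch_index = 0; // next batch index to hand out
};

struct ExampleScanState {
	idx_t batch_index = 0; // batch index of the unit of work this thread scans
};

// called whenever a thread claims the next unit of work (e.g. a row group)
static bool ExampleParallelStateNext(ExampleParallelState &parallel_state, ExampleScanState &scan_state) {
	std::lock_guard<std::mutex> guard(parallel_state.lock);
	// every unit of work gets a globally unique, increasing batch index, so the
	// result collector can later reassemble chunks in the original order
	scan_state.batch_index = parallel_state.batch_index++;
	return true;
}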
4 changes: 4 additions & 0 deletions src/common/enums/physical_operator_type.cpp
@@ -19,6 +19,8 @@ string PhysicalOperatorToString(PhysicalOperatorType type) {
return "LIMIT";
case PhysicalOperatorType::LIMIT_PERCENT:
return "LIMIT_PERCENT";
case PhysicalOperatorType::STREAMING_LIMIT:
return "STREAMING_LIMIT";
case PhysicalOperatorType::RESERVOIR_SAMPLE:
return "RESERVOIR_SAMPLE";
case PhysicalOperatorType::STREAMING_SAMPLE:
@@ -117,6 +119,8 @@ string PhysicalOperatorToString(PhysicalOperatorType type) {
return "INOUT_FUNCTION";
case PhysicalOperatorType::CREATE_TYPE:
return "CREATE_TYPE";
case PhysicalOperatorType::RESULT_COLLECTOR:
return "RESULT_COLLECTOR";
case PhysicalOperatorType::INVALID:
break;
}
1 change: 1 addition & 0 deletions src/common/types/CMakeLists.txt
@@ -1,6 +1,7 @@
add_library_unity(
duckdb_common_types
OBJECT
batched_chunk_collection.cpp
blob.cpp
cast_helpers.cpp
chunk_collection.cpp
71 changes: 71 additions & 0 deletions src/common/types/batched_chunk_collection.cpp
@@ -0,0 +1,71 @@
#include "duckdb/common/types/batched_chunk_collection.hpp"
#include "duckdb/common/printer.hpp"

namespace duckdb {

BatchedChunkCollection::BatchedChunkCollection() {
}

void BatchedChunkCollection::Append(DataChunk &input, idx_t batch_index) {
D_ASSERT(batch_index != DConstants::INVALID_INDEX);
auto entry = data.find(batch_index);
ChunkCollection *collection;
if (entry == data.end()) {
auto new_collection = make_unique<ChunkCollection>();
collection = new_collection.get();
data.insert(make_pair(batch_index, move(new_collection)));
} else {
collection = entry->second.get();
}
collection->Append(input);
}

void BatchedChunkCollection::Merge(BatchedChunkCollection &other) {
for (auto &entry : other.data) {
if (data.find(entry.first) != data.end()) {
throw InternalException(
"BatchChunkCollection::Merge error - batch index %d is present in both collections. This occurs when "
"batch indexes are not uniquely distributed over threads",
entry.first);
}
data[entry.first] = move(entry.second);
}
other.data.clear();
}

void BatchedChunkCollection::InitializeScan(BatchedChunkScanState &state) {
state.iterator = data.begin();
state.chunk_index = 0;
}

void BatchedChunkCollection::Scan(BatchedChunkScanState &state, DataChunk &output) {
while (state.iterator != data.end()) {
// check if there is a chunk remaining in this collection
auto collection = state.iterator->second.get();
if (state.chunk_index < collection->ChunkCount()) {
// there is! reference it and advance the chunk index
output.Reference(collection->GetChunk(state.chunk_index));
state.chunk_index++;
return;
}
// there isn't! move to the next collection
state.iterator++;
state.chunk_index = 0;
}
}

string BatchedChunkCollection::ToString() const {
string result;
result += "Batched Chunk Collection\n";
for (auto &entry : data) {
result += "Batch Index - " + to_string(entry.first) + "\n";
result += entry.second->ToString() + "\n\n";
}
return result;
}

void BatchedChunkCollection::Print() const {
Printer::Print(ToString());
}

} // namespace duckdb
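A short usage sketch (not part of the diff) of the new class: producers append chunks under the batch index they were assigned, the per-thread collections are merged, and a scan then replays the chunks ordered by batch index. The single-column chunk and the DataChunk::Initialize overload taking only the types are assumptions made for illustration:

#include "duckdb/common/types/batched_chunk_collection.hpp"
#include "duckdb/common/types/data_chunk.hpp"

using namespace duckdb;

static void ExampleUsage() {
	vector<LogicalType> types {LogicalType::INTEGER};
	DataChunk chunk;
	chunk.Initialize(types);
	chunk.SetCardinality(1);

	BatchedChunkCollection thread_local_data, merged;
	// pretend the parallel state assigned this thread batch index 3
	thread_local_data.Append(chunk, 3);
	// merging moves the per-batch collections into the target; a duplicate batch
	// index throws, since every batch must come from exactly one thread
	merged.Merge(thread_local_data);

	BatchedChunkScanState state;
	merged.InitializeScan(state);
	for (;;) {
		DataChunk output;
		merged.Scan(state, output);
		if (output.size() == 0) {
			break;
		}
		// chunks come back ordered by batch index, preserving the original order
	}
}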
7 changes: 6 additions & 1 deletion src/common/types/chunk_collection.cpp
@@ -420,7 +420,12 @@ void ChunkCollection::CopyCell(idx_t column, idx_t index, Vector &target, idx_t
VectorOperations::Copy(source, target, source_offset + 1, source_offset, target_offset);
}

void ChunkCollection::Print() {
string ChunkCollection::ToString() const {
return chunks.empty() ? "ChunkCollection [ 0 ]"
: "ChunkCollection [ " + std::to_string(count) + " ]: \n" + chunks[0]->ToString();
}

void ChunkCollection::Print() const {
Printer::Print(ToString());
}
