Commit f4e3bd7
chore: Update vendored sources to duckdb/duckdb@a20a91c
Merge pull request duckdb/duckdb#10650 from hannes/noprintf
Merge pull request duckdb/duckdb#10658 from hannes/csvpathlength
Merge pull request duckdb/duckdb#10245 from Tishj/parallel_streaming_query_result
krlmlr committed Feb 27, 2024
1 parent 4a82721 · commit f4e3bd7
Showing 48 changed files with 1,000 additions and 368 deletions.
2 changes: 1 addition & 1 deletion src/Makevars.win
@@ -15,4 +15,4 @@ include Makevars.duckdb
CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -Iduckdb/src/include -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/re2 -Iduckdb/third_party/miniz -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/utf8proc -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/skiplist -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/tdigest -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/libpg_query -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/pcg -Iduckdb/third_party/httplib -Iduckdb/third_party/fast_float -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_PLATFORM_RTOOLS=1
OBJECTS=database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
-PKG_LIBS=-lws2_32 -lrstrtmgr
+PKG_LIBS=-lws2_32
5 changes: 5 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
@@ -4138,6 +4138,8 @@ const char* EnumUtil::ToChars<PendingExecutionResult>(PendingExecutionResult val
return "RESULT_NOT_READY";
case PendingExecutionResult::EXECUTION_ERROR:
return "EXECUTION_ERROR";
+case PendingExecutionResult::BLOCKED:
+return "BLOCKED";
case PendingExecutionResult::NO_TASKS_AVAILABLE:
return "NO_TASKS_AVAILABLE";
default:
@@ -4156,6 +4158,9 @@ PendingExecutionResult EnumUtil::FromString<PendingExecutionResult>(const char *
if (StringUtil::Equals(value, "EXECUTION_ERROR")) {
return PendingExecutionResult::EXECUTION_ERROR;
}
+if (StringUtil::Equals(value, "BLOCKED")) {
+return PendingExecutionResult::BLOCKED;
+}
if (StringUtil::Equals(value, "NO_TASKS_AVAILABLE")) {
return PendingExecutionResult::NO_TASKS_AVAILABLE;
}
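
The new BLOCKED value is what a caller of a pending query now has to handle. A minimal polling sketch, assuming a duckdb::PendingQueryResult handle named `pending` (the handle name and the yield strategy are illustrative, not part of this commit):

    // Treat BLOCKED and NO_TASKS_AVAILABLE as "retry later", not as errors.
    PendingExecutionResult res;
    do {
        res = pending->ExecuteTask();
        if (res == PendingExecutionResult::BLOCKED ||
            res == PendingExecutionResult::NO_TASKS_AVAILABLE) {
            std::this_thread::yield(); // no executor task can progress right now
        }
    } while (res != PendingExecutionResult::RESULT_READY &&
             res != PendingExecutionResult::EXECUTION_ERROR);
    if (res == PendingExecutionResult::RESULT_READY) {
        auto result = pending->Execute(); // materialize the final query result
    }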
15 changes: 15 additions & 0 deletions src/duckdb/src/common/types/vector.cpp
@@ -118,6 +118,21 @@ void Vector::ReferenceAndSetType(const Vector &other) {

void Vector::Reinterpret(const Vector &other) {
vector_type = other.vector_type;
+#ifdef DEBUG
+auto &this_type = GetType();
+auto &other_type = other.GetType();
+
+auto type_is_same = other_type == this_type;
+bool this_is_nested = this_type.IsNested();
+bool other_is_nested = other_type.IsNested();
+
+bool not_nested = this_is_nested == false && other_is_nested == false;
+bool type_size_equal = GetTypeIdSize(this_type.InternalType()) == GetTypeIdSize(other_type.InternalType());
+//! Either the types are completely identical, or they are not nested and their physical type size is the same
+//! The reason nested types are not allowed is because copying the auxiliary buffer does not happen recursively
+//! e.g DOUBLE[] to BIGINT[], the type of the LIST would say BIGINT but the child Vector says DOUBLE
+D_ASSERT((not_nested && type_size_equal) || type_is_same);
+#endif
AssignSharedPointer(buffer, other.buffer);
AssignSharedPointer(auxiliary, other.auxiliary);
data = other.data;
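
The assertion is easiest to read with concrete types; a small sketch (example types chosen here for illustration, not taken from the diff):

    // OK: DOUBLE and BIGINT are both 8-byte flat types, so the data buffer
    // can safely be shared and read through a different lens.
    Vector doubles(LogicalType::DOUBLE);
    Vector bigints(LogicalType::BIGINT);
    bigints.Reinterpret(doubles);

    // Would now trip D_ASSERT in debug builds: reinterpreting LIST(DOUBLE)
    // as LIST(BIGINT) shares the auxiliary (child) buffer unconverted, so the
    // list would claim BIGINT children while the child vector holds DOUBLEs.
    Vector double_list(LogicalType::LIST(LogicalType::DOUBLE));
    Vector bigint_list(LogicalType::LIST(LogicalType::BIGINT));
    // bigint_list.Reinterpret(double_list); // asserts under DEBUG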
4 changes: 2 additions & 2 deletions src/duckdb/src/core_functions/scalar/map/map_entries.cpp
@@ -10,9 +10,9 @@ namespace duckdb {

// Reverse of map_from_entries
static void MapEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) {
-idx_t count = args.size();
+auto count = args.size();

-result.Reinterpret(args.data[0]);
+MapUtil::ReinterpretMap(result, args.data[0], count);

if (args.AllConstant()) {
result.SetVectorType(VectorType::CONSTANT_VECTOR);
src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp
@@ -10,8 +10,7 @@ namespace duckdb {
static void MapFromEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) {
auto count = args.size();

-result.Reinterpret(args.data[0]);
-
+MapUtil::ReinterpretMap(result, args.data[0], count);
MapVector::MapConversionVerify(result, count);
result.Verify(count);

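
Both map functions now go through MapUtil::ReinterpretMap, whose body is not part of this diff. A hedged sketch of what a map-aware reinterpret has to do differently (the helper name and exact calls below are assumptions about its shape, not the vendored implementation):

    // Share the top-level list metadata, then re-reference the key/value
    // children under the result's own declared types, instead of sharing the
    // whole auxiliary buffer the way Vector::Reinterpret does.
    static void ReinterpretMapSketch(Vector &result, Vector &input, idx_t count) {
        auto src = ListVector::GetData(input); // list_entry_t offsets/lengths
        auto dst = ListVector::GetData(result);
        for (idx_t i = 0; i < count; i++) {
            dst[i] = src[i];
        }
        ListVector::SetListSize(result, ListVector::GetListSize(input));
        // One level down, a MAP and a LIST(STRUCT(k, v)) share layout, so the
        // children can be referenced without lying about their types.
        MapVector::GetKeys(result).Reference(MapVector::GetKeys(input));
        MapVector::GetValues(result).Reference(MapVector::GetValues(input));
    }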
2 changes: 1 addition & 1 deletion src/duckdb/src/core_functions/scalar/string/parse_path.cpp
@@ -175,7 +175,7 @@ static void TrimPathFunction(DataChunk &args, ExpressionState &state, Vector &re
// set default values
Vector &path = args.data[0];
Vector separator(string_t("default"));
-Vector trim_extension(false);
+Vector trim_extension(Value::BOOLEAN(false));
ReadOptionalArgs(args, separator, trim_extension, FRONT_TRIM);

TernaryExecutor::Execute<string_t, string_t, bool, string_t>(
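
The one-line fix removes an implicit conversion: a bare `false` binds to whichever Vector or Value constructor overload resolution happens to prefer (exactly which one is an assumption here), so the default was not guaranteed to be a BOOLEAN vector holding false. Spelling the Value out makes the intent unambiguous:

    Vector ambiguous(false);                     // overload resolution picks the type
    Vector explicit_bool(Value::BOOLEAN(false)); // unambiguously BOOLEAN false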
src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp
@@ -14,6 +14,7 @@
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/planner/expression/bound_constant_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/common/optional_idx.hpp"

namespace duckdb {

@@ -545,14 +546,22 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask {
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override;

private:
-void AggregateDistinctGrouping(const idx_t grouping_idx);
+TaskExecutionResult AggregateDistinctGrouping(const idx_t grouping_idx);

private:
Pipeline &pipeline;
shared_ptr<Event> event;

const PhysicalHashAggregate &op;
HashAggregateGlobalSinkState &gstate;
+
+unique_ptr<LocalSinkState> local_sink_state;
+idx_t grouping_idx = 0;
+unique_ptr<LocalSourceState> radix_table_lstate;
+bool blocked = false;
+idx_t aggregation_idx = 0;
+idx_t payload_idx = 0;
+idx_t next_payload_idx = 0;
};

void HashAggregateDistinctFinalizeEvent::Schedule() {
@@ -604,14 +613,22 @@ void HashAggregateDistinctFinalizeEvent::FinishEvent() {
}

TaskExecutionResult HashAggregateDistinctFinalizeTask::ExecuteTask(TaskExecutionMode mode) {
-for (idx_t grouping_idx = 0; grouping_idx < op.groupings.size(); grouping_idx++) {
-AggregateDistinctGrouping(grouping_idx);
+for (; grouping_idx < op.groupings.size(); grouping_idx++) {
+auto res = AggregateDistinctGrouping(grouping_idx);
+if (res == TaskExecutionResult::TASK_BLOCKED) {
+return res;
+}
+D_ASSERT(res == TaskExecutionResult::TASK_FINISHED);
+aggregation_idx = 0;
+payload_idx = 0;
+next_payload_idx = 0;
+local_sink_state = nullptr;
}
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}

-void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) {
+TaskExecutionResult HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) {
D_ASSERT(op.distinct_collection_info);
auto &info = *op.distinct_collection_info;

@@ -628,9 +645,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
ExecutionContext execution_context(executor.context, thread_context, &pipeline);

// Sink state to sink into global HTs
-InterruptState interrupt_state;
+InterruptState interrupt_state(shared_from_this());
auto &global_sink_state = *grouping_state.table_state;
-auto local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context);
+if (!local_sink_state) {
+local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context);
+}
OperatorSinkInput sink_input {global_sink_state, *local_sink_state, interrupt_state};

// Create a chunk that mimics the 'input' chunk in Sink, for storing the group vectors
@@ -639,24 +658,24 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
group_chunk.Initialize(executor.context, op.input_group_types);
}

-auto &groups = op.grouped_aggregate_data.groups;
-const idx_t group_by_size = groups.size();
+const idx_t group_by_size = op.grouped_aggregate_data.groups.size();

DataChunk aggregate_input_chunk;
if (!gstate.payload_types.empty()) {
aggregate_input_chunk.Initialize(executor.context, gstate.payload_types);
}

-auto &finalize_event = event->Cast<HashAggregateDistinctFinalizeEvent>();
+const auto &finalize_event = event->Cast<HashAggregateDistinctFinalizeEvent>();

-idx_t payload_idx;
-idx_t next_payload_idx = 0;
-for (idx_t agg_idx = 0; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) {
+auto &agg_idx = aggregation_idx;
+for (; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) {
auto &aggregate = aggregates[agg_idx]->Cast<BoundAggregateExpression>();

-// Forward the payload idx
-payload_idx = next_payload_idx;
-next_payload_idx = payload_idx + aggregate.children.size();
+if (!blocked) {
+// Forward the payload idx
+payload_idx = next_payload_idx;
+next_payload_idx = payload_idx + aggregate.children.size();
+}

// If aggregate is not distinct, skip it
if (!distinct_data.IsDistinct(agg_idx)) {
@@ -668,8 +687,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
auto &radix_table = distinct_data.radix_tables[table_idx];

auto &sink = *distinct_state.radix_states[table_idx];
-auto local_source = radix_table->GetLocalSourceState(execution_context);
-OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], *local_source,
+if (!blocked) {
+radix_table_lstate = radix_table->GetLocalSourceState(execution_context);
+}
+auto &local_source = *radix_table_lstate;
+OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], local_source,
interrupt_state};

// Create a duplicate of the output_chunk, because of multi-threading we cant alter the original
@@ -687,8 +709,8 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
D_ASSERT(output_chunk.size() == 0);
break;
} else if (res == SourceResultType::BLOCKED) {
-throw InternalException(
-"Unexpected interrupt from radix table GetData in HashAggregateDistinctFinalizeTask");
+blocked = true;
+return TaskExecutionResult::TASK_BLOCKED;
}

auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx];
@@ -708,8 +730,10 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
// Sink it into the main ht
grouping_data.table_data.Sink(execution_context, group_chunk, sink_input, aggregate_input_chunk, {agg_idx});
}
+blocked = false;
}
grouping_data.table_data.Combine(execution_context, global_sink_state, *local_sink_state);
+return TaskExecutionResult::TASK_FINISHED;
}

SinkFinalizeType PhysicalHashAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context,
@@ -809,6 +833,7 @@ class HashAggregateLocalSourceState : public LocalSourceState {
}
}

+optional_idx radix_idx;
vector<unique_ptr<LocalSourceState>> radix_states;
};

@@ -823,32 +848,37 @@ SourceResultType PhysicalHashAggregate::GetData(ExecutionContext &context, DataC
auto &gstate = input.global_state.Cast<HashAggregateGlobalSourceState>();
auto &lstate = input.local_state.Cast<HashAggregateLocalSourceState>();
while (true) {
-idx_t radix_idx = gstate.state_index;
+if (!lstate.radix_idx.IsValid()) {
+lstate.radix_idx = gstate.state_index.load();
+}
+const auto radix_idx = lstate.radix_idx.GetIndex();
if (radix_idx >= groupings.size()) {
break;
}

auto &grouping = groupings[radix_idx];
auto &radix_table = grouping.table_data;
auto &grouping_gstate = sink_gstate.grouping_states[radix_idx];

-InterruptState interrupt_state;
OperatorSourceInput source_input {*gstate.radix_states[radix_idx], *lstate.radix_states[radix_idx],
-interrupt_state};
+input.interrupt_state};
auto res = radix_table.GetData(context, chunk, *grouping_gstate.table_state, source_input);
+if (res == SourceResultType::BLOCKED) {
+return res;
+}
if (chunk.size() != 0) {
return SourceResultType::HAVE_MORE_OUTPUT;
-} else if (res == SourceResultType::BLOCKED) {
-throw InternalException("Unexpectedly Blocked from radix_table");
}

// move to the next table
lock_guard<mutex> l(gstate.lock);
-radix_idx++;
-if (radix_idx > gstate.state_index) {
+lstate.radix_idx = lstate.radix_idx.GetIndex() + 1;
+if (lstate.radix_idx.GetIndex() > gstate.state_index) {
// we have not yet worked on the table
// move the global index forwards
-gstate.state_index = radix_idx;
+gstate.state_index = lstate.radix_idx.GetIndex();
}
+lstate.radix_idx = gstate.state_index.load();
}

return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
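
All of the hash-aggregate changes above are one pattern from the parallel streaming work (duckdb/duckdb#10245): loop indices and per-step state move into members of the task or local source state (grouping_idx, aggregation_idx, blocked, optional_idx radix_idx), so that when a child source reports BLOCKED the code returns TASK_BLOCKED or SourceResultType::BLOCKED instead of throwing, and the next invocation resumes exactly where it stopped. A self-contained sketch of that shape (stand-in types, not the vendored classes):

    #include <cstddef>

    enum class TaskResult { FINISHED, BLOCKED }; // stand-in for TaskExecutionResult

    class ResumableTask {
    public:
        TaskResult Execute() {
            for (; step < total_steps; step++) {
                if (SourceBlocked()) {
                    blocked = true;             // remember we stopped mid-step
                    return TaskResult::BLOCKED; // scheduler re-runs us later
                }
                blocked = false; // step finished; reset the resume flag
            }
            return TaskResult::FINISHED;
        }

    private:
        // Simulates a source that blocks once per step before yielding data,
        // the way a radix table's GetData can return BLOCKED.
        bool SourceBlocked() {
            return !blocked; // blocked == false: this step has not waited yet
        }
        std::size_t step = 0; // persists across Execute() calls (cf. grouping_idx)
        std::size_t total_steps = 3;
        bool blocked = false; // cf. the new 'blocked' member in the task
    };

Each Execute() call either finishes the step it blocked on and advances, or blocks the next step; the task eventually reports FINISHED, which mirrors how ExecuteTask above re-enters its grouping loop at grouping_idx rather than restarting from zero.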
