diff --git a/src/Makevars.win b/src/Makevars.win index d45a12aa4..629ca4519 100644 --- a/src/Makevars.win +++ b/src/Makevars.win @@ -15,4 +15,4 @@ include Makevars.duckdb CXX_STD = CXX17 PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -Iduckdb/src/include -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/re2 -Iduckdb/third_party/miniz -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/utf8proc -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/skiplist -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/tdigest -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/libpg_query -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/pcg -Iduckdb/third_party/httplib -Iduckdb/third_party/fast_float -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_PLATFORM_RTOOLS=1 OBJECTS=database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) -PKG_LIBS=-lws2_32 -lrstrtmgr +PKG_LIBS=-lws2_32 diff --git a/src/duckdb/src/common/enum_util.cpp b/src/duckdb/src/common/enum_util.cpp index 2c5c2568f..27b4364d4 100644 --- a/src/duckdb/src/common/enum_util.cpp +++ b/src/duckdb/src/common/enum_util.cpp @@ -4138,6 +4138,8 @@ const char* EnumUtil::ToChars(PendingExecutionResult val return "RESULT_NOT_READY"; case PendingExecutionResult::EXECUTION_ERROR: return "EXECUTION_ERROR"; + case PendingExecutionResult::BLOCKED: + return "BLOCKED"; case PendingExecutionResult::NO_TASKS_AVAILABLE: return "NO_TASKS_AVAILABLE"; default: @@ -4156,6 +4158,9 @@ PendingExecutionResult EnumUtil::FromString(const char * if (StringUtil::Equals(value, "EXECUTION_ERROR")) { return PendingExecutionResult::EXECUTION_ERROR; } + if (StringUtil::Equals(value, "BLOCKED")) { + return PendingExecutionResult::BLOCKED; + } if (StringUtil::Equals(value, "NO_TASKS_AVAILABLE")) { return PendingExecutionResult::NO_TASKS_AVAILABLE; } diff --git a/src/duckdb/src/common/types/vector.cpp b/src/duckdb/src/common/types/vector.cpp index f9db436e0..2884ea911 100644 --- a/src/duckdb/src/common/types/vector.cpp +++ b/src/duckdb/src/common/types/vector.cpp @@ -118,6 +118,21 @@ void Vector::ReferenceAndSetType(const Vector &other) { void Vector::Reinterpret(const Vector &other) { vector_type = other.vector_type; +#ifdef DEBUG + auto &this_type = GetType(); + auto &other_type = other.GetType(); + + auto type_is_same = other_type == this_type; + bool this_is_nested = this_type.IsNested(); + bool other_is_nested = other_type.IsNested(); + + bool not_nested = this_is_nested == false && other_is_nested == false; + bool type_size_equal = GetTypeIdSize(this_type.InternalType()) == GetTypeIdSize(other_type.InternalType()); + //! Either the types are completely identical, or they are not nested and their physical type size is the same + //! The reason nested types are not allowed is because copying the auxiliary buffer does not happen recursively + //! e.g DOUBLE[] to BIGINT[], the type of the LIST would say BIGINT but the child Vector says DOUBLE + D_ASSERT((not_nested && type_size_equal) || type_is_same); +#endif AssignSharedPointer(buffer, other.buffer); AssignSharedPointer(auxiliary, other.auxiliary); data = other.data; diff --git a/src/duckdb/src/core_functions/scalar/map/map_entries.cpp b/src/duckdb/src/core_functions/scalar/map/map_entries.cpp index caaeccee2..7629c7896 100644 --- a/src/duckdb/src/core_functions/scalar/map/map_entries.cpp +++ b/src/duckdb/src/core_functions/scalar/map/map_entries.cpp @@ -10,9 +10,9 @@ namespace duckdb { // Reverse of map_from_entries static void MapEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) { - idx_t count = args.size(); + auto count = args.size(); - result.Reinterpret(args.data[0]); + MapUtil::ReinterpretMap(result, args.data[0], count); if (args.AllConstant()) { result.SetVectorType(VectorType::CONSTANT_VECTOR); diff --git a/src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp b/src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp index be79503ed..fbaf1663d 100644 --- a/src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp +++ b/src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp @@ -10,8 +10,7 @@ namespace duckdb { static void MapFromEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) { auto count = args.size(); - result.Reinterpret(args.data[0]); - + MapUtil::ReinterpretMap(result, args.data[0], count); MapVector::MapConversionVerify(result, count); result.Verify(count); diff --git a/src/duckdb/src/core_functions/scalar/string/parse_path.cpp b/src/duckdb/src/core_functions/scalar/string/parse_path.cpp index e6ce3066a..cc304d512 100644 --- a/src/duckdb/src/core_functions/scalar/string/parse_path.cpp +++ b/src/duckdb/src/core_functions/scalar/string/parse_path.cpp @@ -175,7 +175,7 @@ static void TrimPathFunction(DataChunk &args, ExpressionState &state, Vector &re // set default values Vector &path = args.data[0]; Vector separator(string_t("default")); - Vector trim_extension(false); + Vector trim_extension(Value::BOOLEAN(false)); ReadOptionalArgs(args, separator, trim_extension, FRONT_TRIM); TernaryExecutor::Execute( diff --git a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp index 3b62fc69f..e6bd35875 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp @@ -14,6 +14,7 @@ #include "duckdb/planner/expression/bound_aggregate_expression.hpp" #include "duckdb/planner/expression/bound_constant_expression.hpp" #include "duckdb/planner/expression/bound_reference_expression.hpp" +#include "duckdb/common/optional_idx.hpp" namespace duckdb { @@ -545,7 +546,7 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask { TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; private: - void AggregateDistinctGrouping(const idx_t grouping_idx); + TaskExecutionResult AggregateDistinctGrouping(const idx_t grouping_idx); private: Pipeline &pipeline; @@ -553,6 +554,14 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask { const PhysicalHashAggregate &op; HashAggregateGlobalSinkState &gstate; + + unique_ptr local_sink_state; + idx_t grouping_idx = 0; + unique_ptr radix_table_lstate; + bool blocked = false; + idx_t aggregation_idx = 0; + idx_t payload_idx = 0; + idx_t next_payload_idx = 0; }; void HashAggregateDistinctFinalizeEvent::Schedule() { @@ -604,14 +613,22 @@ void HashAggregateDistinctFinalizeEvent::FinishEvent() { } TaskExecutionResult HashAggregateDistinctFinalizeTask::ExecuteTask(TaskExecutionMode mode) { - for (idx_t grouping_idx = 0; grouping_idx < op.groupings.size(); grouping_idx++) { - AggregateDistinctGrouping(grouping_idx); + for (; grouping_idx < op.groupings.size(); grouping_idx++) { + auto res = AggregateDistinctGrouping(grouping_idx); + if (res == TaskExecutionResult::TASK_BLOCKED) { + return res; + } + D_ASSERT(res == TaskExecutionResult::TASK_FINISHED); + aggregation_idx = 0; + payload_idx = 0; + next_payload_idx = 0; + local_sink_state = nullptr; } event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } -void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) { +TaskExecutionResult HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) { D_ASSERT(op.distinct_collection_info); auto &info = *op.distinct_collection_info; @@ -628,9 +645,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr ExecutionContext execution_context(executor.context, thread_context, &pipeline); // Sink state to sink into global HTs - InterruptState interrupt_state; + InterruptState interrupt_state(shared_from_this()); auto &global_sink_state = *grouping_state.table_state; - auto local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context); + if (!local_sink_state) { + local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context); + } OperatorSinkInput sink_input {global_sink_state, *local_sink_state, interrupt_state}; // Create a chunk that mimics the 'input' chunk in Sink, for storing the group vectors @@ -639,24 +658,24 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr group_chunk.Initialize(executor.context, op.input_group_types); } - auto &groups = op.grouped_aggregate_data.groups; - const idx_t group_by_size = groups.size(); + const idx_t group_by_size = op.grouped_aggregate_data.groups.size(); DataChunk aggregate_input_chunk; if (!gstate.payload_types.empty()) { aggregate_input_chunk.Initialize(executor.context, gstate.payload_types); } - auto &finalize_event = event->Cast(); + const auto &finalize_event = event->Cast(); - idx_t payload_idx; - idx_t next_payload_idx = 0; - for (idx_t agg_idx = 0; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) { + auto &agg_idx = aggregation_idx; + for (; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) { auto &aggregate = aggregates[agg_idx]->Cast(); - // Forward the payload idx - payload_idx = next_payload_idx; - next_payload_idx = payload_idx + aggregate.children.size(); + if (!blocked) { + // Forward the payload idx + payload_idx = next_payload_idx; + next_payload_idx = payload_idx + aggregate.children.size(); + } // If aggregate is not distinct, skip it if (!distinct_data.IsDistinct(agg_idx)) { @@ -668,8 +687,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr auto &radix_table = distinct_data.radix_tables[table_idx]; auto &sink = *distinct_state.radix_states[table_idx]; - auto local_source = radix_table->GetLocalSourceState(execution_context); - OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], *local_source, + if (!blocked) { + radix_table_lstate = radix_table->GetLocalSourceState(execution_context); + } + auto &local_source = *radix_table_lstate; + OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], local_source, interrupt_state}; // Create a duplicate of the output_chunk, because of multi-threading we cant alter the original @@ -687,8 +709,8 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr D_ASSERT(output_chunk.size() == 0); break; } else if (res == SourceResultType::BLOCKED) { - throw InternalException( - "Unexpected interrupt from radix table GetData in HashAggregateDistinctFinalizeTask"); + blocked = true; + return TaskExecutionResult::TASK_BLOCKED; } auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx]; @@ -708,8 +730,10 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr // Sink it into the main ht grouping_data.table_data.Sink(execution_context, group_chunk, sink_input, aggregate_input_chunk, {agg_idx}); } + blocked = false; } grouping_data.table_data.Combine(execution_context, global_sink_state, *local_sink_state); + return TaskExecutionResult::TASK_FINISHED; } SinkFinalizeType PhysicalHashAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context, @@ -809,6 +833,7 @@ class HashAggregateLocalSourceState : public LocalSourceState { } } + optional_idx radix_idx; vector> radix_states; }; @@ -823,32 +848,37 @@ SourceResultType PhysicalHashAggregate::GetData(ExecutionContext &context, DataC auto &gstate = input.global_state.Cast(); auto &lstate = input.local_state.Cast(); while (true) { - idx_t radix_idx = gstate.state_index; + if (!lstate.radix_idx.IsValid()) { + lstate.radix_idx = gstate.state_index.load(); + } + const auto radix_idx = lstate.radix_idx.GetIndex(); if (radix_idx >= groupings.size()) { break; } + auto &grouping = groupings[radix_idx]; auto &radix_table = grouping.table_data; auto &grouping_gstate = sink_gstate.grouping_states[radix_idx]; - InterruptState interrupt_state; OperatorSourceInput source_input {*gstate.radix_states[radix_idx], *lstate.radix_states[radix_idx], - interrupt_state}; + input.interrupt_state}; auto res = radix_table.GetData(context, chunk, *grouping_gstate.table_state, source_input); + if (res == SourceResultType::BLOCKED) { + return res; + } if (chunk.size() != 0) { return SourceResultType::HAVE_MORE_OUTPUT; - } else if (res == SourceResultType::BLOCKED) { - throw InternalException("Unexpectedly Blocked from radix_table"); } // move to the next table lock_guard l(gstate.lock); - radix_idx++; - if (radix_idx > gstate.state_index) { + lstate.radix_idx = lstate.radix_idx.GetIndex() + 1; + if (lstate.radix_idx.GetIndex() > gstate.state_index) { // we have not yet worked on the table // move the global index forwards - gstate.state_index = radix_idx; + gstate.state_index = lstate.radix_idx.GetIndex(); } + lstate.radix_idx = gstate.state_index.load(); } return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; diff --git a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp index c72709ba0..693d17a03 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp @@ -403,13 +403,13 @@ class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { const PhysicalUngroupedAggregate &op, UngroupedAggregateGlobalSinkState &state_p) : ExecutorTask(executor), event(std::move(event_p)), op(op), gstate(state_p), - allocator(gstate.CreateAllocator()) { + allocator(gstate.CreateAllocator()), aggregate_state(op.aggregates) { } TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; private: - void AggregateDistinct(); + TaskExecutionResult AggregateDistinct(); private: shared_ptr event; @@ -418,6 +418,12 @@ class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { UngroupedAggregateGlobalSinkState &gstate; ArenaAllocator &allocator; + + // Distinct aggregation state + AggregateState aggregate_state; + idx_t aggregation_idx = 0; + unique_ptr radix_table_lstate; + bool blocked = false; }; void UngroupedDistinctAggregateFinalizeEvent::Schedule() { @@ -460,19 +466,21 @@ void UngroupedDistinctAggregateFinalizeEvent::Schedule() { } TaskExecutionResult UngroupedDistinctAggregateFinalizeTask::ExecuteTask(TaskExecutionMode mode) { - AggregateDistinct(); + auto res = AggregateDistinct(); + if (res == TaskExecutionResult::TASK_BLOCKED) { + return res; + } event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } -void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { +TaskExecutionResult UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { D_ASSERT(gstate.distinct_state); auto &distinct_state = *gstate.distinct_state; auto &distinct_data = *op.distinct_data; - // Create thread-local copy of aggregate state auto &aggregates = op.aggregates; - AggregateState state(aggregates); + auto &state = aggregate_state; // Thread-local contexts ThreadContext thread_context(executor.context); @@ -481,14 +489,12 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { auto &finalize_event = event->Cast(); // Now loop through the distinct aggregates, scanning the distinct HTs - idx_t payload_idx = 0; - idx_t next_payload_idx = 0; - for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) { - auto &aggregate = aggregates[agg_idx]->Cast(); - // Forward the payload idx - payload_idx = next_payload_idx; - next_payload_idx = payload_idx + aggregate.children.size(); + // This needs to be preserved in case the radix_table.GetData blocks + auto &agg_idx = aggregation_idx; + + for (; agg_idx < aggregates.size(); agg_idx++) { + auto &aggregate = aggregates[agg_idx]->Cast(); // If aggregate is not distinct, skip it if (!distinct_data.IsDistinct(agg_idx)) { @@ -497,11 +503,15 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { const auto table_idx = distinct_data.info.table_map.at(agg_idx); auto &radix_table = *distinct_data.radix_tables[table_idx]; - auto lstate = radix_table.GetLocalSourceState(execution_context); + if (!blocked) { + // Because we can block, we need to make sure we preserve this state + radix_table_lstate = radix_table.GetLocalSourceState(execution_context); + } + auto &lstate = *radix_table_lstate; auto &sink = *distinct_state.radix_states[table_idx]; - InterruptState interrupt_state; - OperatorSourceInput source_input {*finalize_event.global_source_states[agg_idx], *lstate, interrupt_state}; + InterruptState interrupt_state(shared_from_this()); + OperatorSourceInput source_input {*finalize_event.global_source_states[agg_idx], lstate, interrupt_state}; DataChunk output_chunk; output_chunk.Initialize(executor.context, distinct_state.distinct_output_chunks[table_idx]->GetTypes()); @@ -519,8 +529,8 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { D_ASSERT(output_chunk.size() == 0); break; } else if (res == SourceResultType::BLOCKED) { - throw InternalException( - "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask"); + blocked = true; + return TaskExecutionResult::TASK_BLOCKED; } // We dont need to resolve the filter, we already did this in Sink @@ -539,12 +549,11 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, state.aggregates[agg_idx].get(), payload_chunk.size()); } + blocked = false; } // After scanning the distinct HTs, we can combine the thread-local agg states with the thread-global lock_guard guard(finalize_event.lock); - payload_idx = 0; - next_payload_idx = 0; for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) { if (!distinct_data.IsDistinct(agg_idx)) { continue; @@ -563,6 +572,7 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { if (++finalize_event.tasks_done == finalize_event.tasks_scheduled) { gstate.finished = true; } + return TaskExecutionResult::TASK_FINISHED; } SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context, diff --git a/src/duckdb/src/execution/operator/helper/physical_buffered_collector.cpp b/src/duckdb/src/execution/operator/helper/physical_buffered_collector.cpp new file mode 100644 index 000000000..fcf75496e --- /dev/null +++ b/src/duckdb/src/execution/operator/helper/physical_buffered_collector.cpp @@ -0,0 +1,85 @@ +#include "duckdb/execution/operator/helper/physical_buffered_collector.hpp" +#include "duckdb/main/stream_query_result.hpp" +#include "duckdb/main/client_context.hpp" + +namespace duckdb { + +PhysicalBufferedCollector::PhysicalBufferedCollector(PreparedStatementData &data, bool parallel) + : PhysicalResultCollector(data), parallel(parallel) { +} + +//===--------------------------------------------------------------------===// +// Sink +//===--------------------------------------------------------------------===// +class BufferedCollectorGlobalState : public GlobalSinkState { +public: + mutex glock; + //! This is weak to avoid creating a cyclical reference + weak_ptr context; + shared_ptr buffered_data; +}; + +class BufferedCollectorLocalState : public LocalSinkState { +public: + bool blocked = false; +}; + +SinkResultType PhysicalBufferedCollector::Sink(ExecutionContext &context, DataChunk &chunk, + OperatorSinkInput &input) const { + auto &gstate = input.global_state.Cast(); + auto &lstate = input.local_state.Cast(); + + lock_guard l(gstate.glock); + auto &buffered_data = gstate.buffered_data->Cast(); + + if (!lstate.blocked || buffered_data.BufferIsFull()) { + lstate.blocked = true; + auto callback_state = input.interrupt_state; + auto blocked_sink = BlockedSink(callback_state, chunk.size()); + buffered_data.BlockSink(blocked_sink); + return SinkResultType::BLOCKED; + } + + auto to_append = make_uniq(); + to_append->Initialize(Allocator::DefaultAllocator(), chunk.GetTypes()); + chunk.Copy(*to_append, 0); + buffered_data.Append(std::move(to_append)); + return SinkResultType::NEED_MORE_INPUT; +} + +SinkCombineResultType PhysicalBufferedCollector::Combine(ExecutionContext &context, + OperatorSinkCombineInput &input) const { + return SinkCombineResultType::FINISHED; +} + +unique_ptr PhysicalBufferedCollector::GetGlobalSinkState(ClientContext &context) const { + auto state = make_uniq(); + state->context = context.shared_from_this(); + state->buffered_data = make_shared(state->context); + return std::move(state); +} + +unique_ptr PhysicalBufferedCollector::GetLocalSinkState(ExecutionContext &context) const { + auto state = make_uniq(); + return std::move(state); +} + +unique_ptr PhysicalBufferedCollector::GetResult(GlobalSinkState &state) { + auto &gstate = state.Cast(); + lock_guard l(gstate.glock); + // FIXME: maybe we want to check if the execution was successfull before creating the StreamQueryResult ? + auto cc = gstate.context.lock(); + auto result = make_uniq(statement_type, properties, types, names, cc->GetClientProperties(), + gstate.buffered_data); + return std::move(result); +} + +bool PhysicalBufferedCollector::ParallelSink() const { + return parallel; +} + +bool PhysicalBufferedCollector::SinkOrderDependent() const { + return true; +} + +} // namespace duckdb diff --git a/src/duckdb/src/execution/operator/helper/physical_result_collector.cpp b/src/duckdb/src/execution/operator/helper/physical_result_collector.cpp index e0ac959a7..8b2bbdf8e 100644 --- a/src/duckdb/src/execution/operator/helper/physical_result_collector.cpp +++ b/src/duckdb/src/execution/operator/helper/physical_result_collector.cpp @@ -2,6 +2,7 @@ #include "duckdb/execution/operator/helper/physical_batch_collector.hpp" #include "duckdb/execution/operator/helper/physical_materialized_collector.hpp" +#include "duckdb/execution/operator/helper/physical_buffered_collector.hpp" #include "duckdb/execution/physical_plan_generator.hpp" #include "duckdb/main/config.hpp" #include "duckdb/main/prepared_statement_data.hpp" @@ -20,13 +21,22 @@ unique_ptr PhysicalResultCollector::GetResultCollector( PreparedStatementData &data) { if (!PhysicalPlanGenerator::PreserveInsertionOrder(context, *data.plan)) { // the plan is not order preserving, so we just use the parallel materialized collector + if (data.is_streaming) { + return make_uniq_base(data, true); + } return make_uniq_base(data, true); } else if (!PhysicalPlanGenerator::UseBatchIndex(context, *data.plan)) { // the plan is order preserving, but we cannot use the batch index: use a single-threaded result collector + if (data.is_streaming) { + return make_uniq_base(data, false); + } return make_uniq_base(data, false); } else { // we care about maintaining insertion order and the sources all support batch indexes // use a batch collector + if (data.is_streaming) { + return make_uniq_base(data, false); + } return make_uniq_base(data); } } diff --git a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp index 5e72b2065..6cb658f97 100644 --- a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp @@ -13,6 +13,7 @@ #include "duckdb/planner/expression/bound_reference_expression.hpp" #include "duckdb/storage/buffer_manager.hpp" #include "duckdb/storage/storage_manager.hpp" +#include "duckdb/parallel/interrupt.hpp" #include "duckdb/storage/temporary_memory_manager.hpp" namespace duckdb { @@ -614,7 +615,7 @@ class HashJoinGlobalSourceState : public GlobalSourceState { //! Initialize this source state using the info in the sink void Initialize(HashJoinGlobalSinkState &sink); //! Try to prepare the next stage - void TryPrepareNextStage(HashJoinGlobalSinkState &sink); + bool TryPrepareNextStage(HashJoinGlobalSinkState &sink); //! Prepare the next build/probe/scan_ht stage for external hash join (must hold lock) void PrepareBuild(HashJoinGlobalSinkState &sink); void PrepareProbe(HashJoinGlobalSinkState &sink); @@ -663,6 +664,8 @@ class HashJoinGlobalSourceState : public GlobalSourceState { idx_t full_outer_chunk_count; idx_t full_outer_chunk_done; idx_t full_outer_chunks_per_thread; + + vector blocked_tasks; }; class HashJoinLocalSourceState : public LocalSourceState { @@ -739,13 +742,14 @@ void HashJoinGlobalSourceState::Initialize(HashJoinGlobalSinkState &sink) { TryPrepareNextStage(sink); } -void HashJoinGlobalSourceState::TryPrepareNextStage(HashJoinGlobalSinkState &sink) { +bool HashJoinGlobalSourceState::TryPrepareNextStage(HashJoinGlobalSinkState &sink) { switch (global_stage.load()) { case HashJoinSourceStage::BUILD: if (build_chunk_done == build_chunk_count) { sink.hash_table->GetDataCollection().VerifyEverythingPinned(); sink.hash_table->finalized = true; PrepareProbe(sink); + return true; } break; case HashJoinSourceStage::PROBE: @@ -755,16 +759,19 @@ void HashJoinGlobalSourceState::TryPrepareNextStage(HashJoinGlobalSinkState &sin } else { PrepareBuild(sink); } + return true; } break; case HashJoinSourceStage::SCAN_HT: if (full_outer_chunk_done == full_outer_chunk_count) { PrepareBuild(sink); + return true; } break; default: break; } + return false; } void HashJoinGlobalSourceState::PrepareBuild(HashJoinGlobalSinkState &sink) { @@ -1014,7 +1021,15 @@ SourceResultType PhysicalHashJoin::GetData(ExecutionContext &context, DataChunk lstate.ExecuteTask(sink, gstate, chunk); } else { lock_guard guard(gstate.lock); - gstate.TryPrepareNextStage(sink); + if (gstate.TryPrepareNextStage(sink) || gstate.global_stage == HashJoinSourceStage::DONE) { + for (auto &state : gstate.blocked_tasks) { + state.Callback(); + } + gstate.blocked_tasks.clear(); + } else { + gstate.blocked_tasks.push_back(input.interrupt_state); + return SourceResultType::BLOCKED; + } } } diff --git a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp index 52bb30562..b3d0a7ea8 100644 --- a/src/duckdb/src/execution/radix_partitioned_hashtable.cpp +++ b/src/duckdb/src/execution/radix_partitioned_hashtable.cpp @@ -175,6 +175,8 @@ class RadixHTGlobalSinkState : public GlobalSinkState { idx_t count_before_combining; //! Maximum partition size if all unique idx_t max_partition_size; + + vector blocked_tasks; }; RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht_p) @@ -526,7 +528,6 @@ void RadixPartitionedHashTable::Finalize(ClientContext &context, GlobalSinkState auto max_threads = MinValue(TaskScheduler::GetScheduler(context).NumberOfThreads(), gstate.partitions.size()); gstate.temporary_memory_state->SetRemainingSize(context, max_threads * gstate.max_partition_size); - gstate.finalized = true; } @@ -572,8 +573,9 @@ class RadixHTGlobalSourceState : public GlobalSourceState { vector column_ids; //! For synchronizing scan tasks - atomic scan_idx; - atomic scan_done; + mutex lock; + idx_t scan_idx; + idx_t scan_done; }; enum class RadixHTScanStatus : uint8_t { INIT, IN_PROGRESS, DONE }; @@ -635,18 +637,26 @@ bool RadixHTGlobalSourceState::AssignTask(RadixHTGlobalSinkState &sink, RadixHTL if (finished) { return false; } - // We first try to assign a Scan task, then a Finalize task if that didn't work, without using any locks - // We need an atomic compare-and-swap to assign a Scan task, because we need to only increment - // the 'scan_idx' atomic if the 'finalize' of that partition is true, i.e., ready to be scanned - bool scan_assigned = true; - do { - lstate.task_idx = scan_idx.load(); - if (lstate.task_idx >= n_partitions || !sink.partitions[lstate.task_idx]->finalized) { - scan_assigned = false; - break; + // We first try to assign a Scan task, then a Finalize task if that didn't work + bool scan_assigned = false; + { + lock_guard gstate_guard(lock); + if (scan_idx < n_partitions && sink.partitions[scan_idx]->finalized) { + lstate.task_idx = scan_idx++; + scan_assigned = true; + if (scan_idx == n_partitions) { + // We will never be able to assign another task, unblock blocked tasks + lock_guard sink_guard(sink.lock); + if (!sink.blocked_tasks.empty()) { + for (auto &state : sink.blocked_tasks) { + state.Callback(); + } + sink.blocked_tasks.clear(); + } + } } - } while (!std::atomic_compare_exchange_weak(&scan_idx, &lstate.task_idx, lstate.task_idx + 1)); + } if (scan_assigned) { // We successfully assigned a Scan task @@ -669,7 +679,8 @@ bool RadixHTGlobalSourceState::AssignTask(RadixHTGlobalSinkState &sink, RadixHTL return true; } - // We didn't manage to assign a Finalize task + // We didn't manage to assign a Finalize task because there are none left + sink.temporary_memory_state->SetRemainingSize(context, 0); return false; } @@ -741,6 +752,17 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob // Mark partition as ready to scan partition.finalized = true; + // Unblock blocked tasks so they can scan this partition + { + lock_guard sink_guard(sink.lock); + if (!sink.blocked_tasks.empty()) { + for (auto &state : sink.blocked_tasks) { + state.Callback(); + } + sink.blocked_tasks.clear(); + } + } + // Make sure this thread's aggregate allocator does not get lost lock_guard guard(sink.lock); sink.stored_allocators.emplace_back(ht->GetAggregateAllocator()); @@ -756,6 +778,7 @@ void RadixHTLocalSourceState::Scan(RadixHTGlobalSinkState &sink, RadixHTGlobalSo if (data_collection.Count() == 0) { scan_status = RadixHTScanStatus::DONE; + lock_guard gstate_guard(gstate.lock); if (++gstate.scan_done == sink.partitions.size()) { gstate.finished = true; } @@ -776,6 +799,7 @@ void RadixHTLocalSourceState::Scan(RadixHTGlobalSinkState &sink, RadixHTGlobalSo } if (data_collection.ScanComplete(scan_state)) { + lock_guard gstate_guard(gstate.lock); if (++gstate.scan_done == sink.partitions.size()) { gstate.finished = true; } @@ -834,7 +858,6 @@ SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, D sink.scan_pin_properties == TupleDataPinProperties::DESTROY_AFTER_DONE); if (gstate.finished) { - sink.temporary_memory_state->SetRemainingSize(context.client, 0); return SourceResultType::FINISHED; } @@ -875,6 +898,15 @@ SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, D while (!gstate.finished && chunk.size() == 0) { if (!lstate.TaskFinished() || gstate.AssignTask(sink, lstate)) { lstate.ExecuteTask(sink, gstate, chunk); + } else { + lock_guard gstate_guard(gstate.lock); + if (gstate.scan_idx < sink.partitions.size()) { + lock_guard sink_guard(sink.lock); + sink.blocked_tasks.push_back(input.interrupt_state); + return SourceResultType::BLOCKED; + } else { + return SourceResultType::FINISHED; + } } } diff --git a/src/duckdb/src/function/scalar/nested_functions.cpp b/src/duckdb/src/function/scalar/nested_functions.cpp index a0e0aa5a9..59c5f2b23 100644 --- a/src/duckdb/src/function/scalar/nested_functions.cpp +++ b/src/duckdb/src/function/scalar/nested_functions.cpp @@ -2,6 +2,37 @@ namespace duckdb { +void MapUtil::ReinterpretMap(Vector &result, Vector &input, idx_t count) { + UnifiedVectorFormat input_data; + input.ToUnifiedFormat(count, input_data); + // Copy the list validity + FlatVector::SetValidity(result, input_data.validity); + + // Copy the struct validity + UnifiedVectorFormat input_struct_data; + ListVector::GetEntry(input).ToUnifiedFormat(count, input_struct_data); + auto &result_struct = ListVector::GetEntry(result); + FlatVector::SetValidity(result_struct, input_struct_data.validity); + + // Set the right vector type + result.SetVectorType(input.GetVectorType()); + + // Copy the list size + auto list_size = ListVector::GetListSize(input); + ListVector::SetListSize(result, list_size); + + // Copy the list buffer (the list_entry_t data) + result.CopyBuffer(input); + + auto &input_keys = MapVector::GetKeys(input); + auto &result_keys = MapVector::GetKeys(result); + result_keys.Reference(input_keys); + + auto &input_values = MapVector::GetValues(input); + auto &result_values = MapVector::GetValues(result); + result_values.Reference(input_values); +} + void BuiltinFunctions::RegisterNestedFunctions() { Register(); Register(); diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index d970a8983..8d37c6b57 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,8 +1,8 @@ #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v0.9.3-dev3731" +#define DUCKDB_VERSION "v0.9.3-dev3861" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "d4c774b1f1" +#define DUCKDB_SOURCE_ID "a20a91c5a4" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb.h b/src/duckdb/src/include/duckdb.h index eb54f2ed1..d0b6160d5 100644 --- a/src/duckdb/src/include/duckdb.h +++ b/src/duckdb/src/include/duckdb.h @@ -1535,6 +1535,18 @@ The error message can be obtained by calling duckdb_pending_error on the pending */ DUCKDB_API duckdb_pending_state duckdb_pending_execute_task(duckdb_pending_result pending_result); +/*! +If this returns DUCKDB_PENDING_RESULT_READY, the duckdb_execute_pending function can be called to obtain the result. +If this returns DUCKDB_PENDING_RESULT_NOT_READY, the duckdb_pending_execute_check_state function should be called again. +If this returns DUCKDB_PENDING_ERROR, an error occurred during execution. + +The error message can be obtained by calling duckdb_pending_error on the pending_result. + +* pending_result: The pending result. +* returns: The state of the pending result. +*/ +DUCKDB_API duckdb_pending_state duckdb_pending_execute_check_state(duckdb_pending_result pending_result); + /*! Fully execute a pending query result, returning the final query result. diff --git a/src/duckdb/src/include/duckdb/common/enums/pending_execution_result.hpp b/src/duckdb/src/include/duckdb/common/enums/pending_execution_result.hpp index e130e9734..8c8daa3e6 100644 --- a/src/duckdb/src/include/duckdb/common/enums/pending_execution_result.hpp +++ b/src/duckdb/src/include/duckdb/common/enums/pending_execution_result.hpp @@ -12,6 +12,12 @@ namespace duckdb { -enum class PendingExecutionResult : uint8_t { RESULT_READY, RESULT_NOT_READY, EXECUTION_ERROR, NO_TASKS_AVAILABLE }; +enum class PendingExecutionResult : uint8_t { + RESULT_READY, + RESULT_NOT_READY, + EXECUTION_ERROR, + BLOCKED, + NO_TASKS_AVAILABLE +}; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/types.hpp b/src/duckdb/src/include/duckdb/common/types.hpp index bf6d23e08..8c12ff83c 100644 --- a/src/duckdb/src/include/duckdb/common/types.hpp +++ b/src/duckdb/src/include/duckdb/common/types.hpp @@ -254,6 +254,19 @@ struct LogicalType { inline const ExtraTypeInfo *AuxInfo() const { return type_info_.get(); } + inline bool IsNested() const { + auto internal = InternalType(); + if (internal == PhysicalType::STRUCT) { + return true; + } + if (internal == PhysicalType::LIST) { + return true; + } + if (internal == PhysicalType::ARRAY) { + return true; + } + return false; + } inline shared_ptr GetAuxInfoShrPtr() const { return type_info_; diff --git a/src/duckdb/src/include/duckdb/common/types/vector.hpp b/src/duckdb/src/include/duckdb/common/types/vector.hpp index bf5d339e0..dd0152583 100644 --- a/src/duckdb/src/include/duckdb/common/types/vector.hpp +++ b/src/duckdb/src/include/duckdb/common/types/vector.hpp @@ -172,6 +172,11 @@ class Vector { auxiliary = std::move(new_buffer); }; + inline void CopyBuffer(Vector &other) { + buffer = other.buffer; + data = other.data; + } + //! This functions resizes the vector DUCKDB_API void Resize(idx_t cur_size, idx_t new_size); diff --git a/src/duckdb/src/include/duckdb/execution/executor.hpp b/src/duckdb/src/include/duckdb/execution/executor.hpp index 767f44f95..3c248c68d 100644 --- a/src/duckdb/src/include/duckdb/execution/executor.hpp +++ b/src/duckdb/src/include/duckdb/execution/executor.hpp @@ -48,17 +48,17 @@ class Executor { void Initialize(unique_ptr physical_plan); void CancelTasks(); - PendingExecutionResult ExecuteTask(); + PendingExecutionResult ExecuteTask(bool dry_run = false); void Reset(); vector GetTypes(); - unique_ptr FetchChunk(); - //! Push a new error void PushError(ErrorData exception); + ErrorData GetError(); + //! True if an error has been thrown bool HasError(); //! Throw the exception that was pushed using PushError. @@ -101,6 +101,7 @@ class Executor { bool ExecutionIsFinished(); private: + bool ResultCollectorIsBlocked(); void InitializeInternal(PhysicalOperator &physical_plan); void ScheduleEvents(const vector> &meta_pipelines); diff --git a/src/duckdb/src/include/duckdb/execution/operator/helper/physical_batch_collector.hpp b/src/duckdb/src/include/duckdb/execution/operator/helper/physical_batch_collector.hpp index 04c152094..88ae8f1b6 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/helper/physical_batch_collector.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/helper/physical_batch_collector.hpp @@ -33,10 +33,6 @@ class PhysicalBatchCollector : public PhysicalResultCollector { return true; } - bool IsSink() const override { - return true; - } - bool ParallelSink() const override { return true; } diff --git a/src/duckdb/src/include/duckdb/execution/operator/helper/physical_buffered_collector.hpp b/src/duckdb/src/include/duckdb/execution/operator/helper/physical_buffered_collector.hpp new file mode 100644 index 000000000..08cbd1b80 --- /dev/null +++ b/src/duckdb/src/include/duckdb/execution/operator/helper/physical_buffered_collector.hpp @@ -0,0 +1,37 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/execution/operator/helper/physical_buffered_collector.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/execution/operator/helper/physical_result_collector.hpp" +#include "duckdb/main/buffered_data/simple_buffered_data.hpp" + +namespace duckdb { + +class PhysicalBufferedCollector : public PhysicalResultCollector { +public: + PhysicalBufferedCollector(PreparedStatementData &data, bool parallel); + + bool parallel; + +public: + unique_ptr GetResult(GlobalSinkState &state) override; + +public: + // Sink interface + SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override; + SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override; + + unique_ptr GetLocalSinkState(ExecutionContext &context) const override; + unique_ptr GetGlobalSinkState(ClientContext &context) const override; + + bool ParallelSink() const override; + bool SinkOrderDependent() const override; +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/execution/task_error_manager.hpp b/src/duckdb/src/include/duckdb/execution/task_error_manager.hpp index cd47b8425..4d7e64493 100644 --- a/src/duckdb/src/include/duckdb/execution/task_error_manager.hpp +++ b/src/duckdb/src/include/duckdb/execution/task_error_manager.hpp @@ -21,6 +21,17 @@ class TaskErrorManager { this->exceptions.push_back(std::move(error)); } + ErrorData GetError() { + lock_guard elock(error_lock); + D_ASSERT(!exceptions.empty()); + + // FIXME: Should we try to get the biggest priority error? + // In case the first exception is a StandardException but a regular Exception or a FatalException occurred + // Maybe we should throw the more critical exception instead, as that changes behavior. + auto &entry = exceptions[0]; + return entry; + } + bool HasError() { lock_guard elock(error_lock); return !exceptions.empty(); diff --git a/src/duckdb/src/include/duckdb/function/scalar/nested_functions.hpp b/src/duckdb/src/include/duckdb/function/scalar/nested_functions.hpp index ed51471e1..86fa9a19b 100644 --- a/src/duckdb/src/include/duckdb/function/scalar/nested_functions.hpp +++ b/src/duckdb/src/include/duckdb/function/scalar/nested_functions.hpp @@ -49,6 +49,10 @@ struct PositionFunctor { } }; +struct MapUtil { + static void ReinterpretMap(Vector &target, Vector &other, idx_t count); +}; + struct VariableReturnBindData : public FunctionData { LogicalType stype; diff --git a/src/duckdb/src/include/duckdb/main/buffered_data/buffered_data.hpp b/src/duckdb/src/include/duckdb/main/buffered_data/buffered_data.hpp new file mode 100644 index 000000000..a98bfad56 --- /dev/null +++ b/src/duckdb/src/include/duckdb/main/buffered_data/buffered_data.hpp @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/main/buffered_data.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/parallel/interrupt.hpp" +#include "duckdb/common/queue.hpp" +#include "duckdb/common/vector_size.hpp" +#include "duckdb/common/types/data_chunk.hpp" +#include "duckdb/common/optional_idx.hpp" +#include "duckdb/execution/physical_operator_states.hpp" +#include "duckdb/common/enums/pending_execution_result.hpp" + +namespace duckdb { + +class StreamQueryResult; +class ClientContextLock; + +struct BlockedSink { +public: + BlockedSink(InterruptState state, idx_t chunk_size) : state(state), chunk_size(chunk_size) { + } + +public: + //! The handle to reschedule the blocked sink + InterruptState state; + //! The amount of tuples this sink would add + idx_t chunk_size; +}; + +class BufferedData { +protected: + enum class Type { SIMPLE }; + +public: + BufferedData(Type type, weak_ptr context) : type(type), context(context) { + } + virtual ~BufferedData() { + } + +public: + virtual bool BufferIsFull() = 0; + virtual PendingExecutionResult ReplenishBuffer(StreamQueryResult &result, ClientContextLock &context_lock) = 0; + virtual unique_ptr Scan() = 0; + shared_ptr GetContext() { + return context.lock(); + } + bool Closed() const { + if (context.expired()) { + return false; + } + auto c = context.lock(); + return c == nullptr; + } + void Close() { + context.reset(); + } + +public: + template + TARGET &Cast() { + if (TARGET::TYPE != type) { + throw InternalException("Failed to cast buffered data to type - buffered data type mismatch"); + } + return reinterpret_cast(*this); + } + + template + const TARGET &Cast() const { + if (TARGET::TYPE != type) { + throw InternalException("Failed to cast buffered data to type - buffered data type mismatch"); + } + return reinterpret_cast(*this); + } + +protected: + Type type; + //! This is weak to avoid a cyclical reference + weak_ptr context; + //! Protect against populate/fetch race condition + mutex glock; +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/main/buffered_data/simple_buffered_data.hpp b/src/duckdb/src/include/duckdb/main/buffered_data/simple_buffered_data.hpp new file mode 100644 index 000000000..a33928c48 --- /dev/null +++ b/src/duckdb/src/include/duckdb/main/buffered_data/simple_buffered_data.hpp @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/main/simple_buffered_data.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/main/buffered_data/buffered_data.hpp" +#include "duckdb/parallel/interrupt.hpp" +#include "duckdb/common/queue.hpp" +#include "duckdb/common/vector_size.hpp" +#include "duckdb/common/types/data_chunk.hpp" + +namespace duckdb { + +class StreamQueryResult; +class ClientContextLock; + +class SimpleBufferedData : public BufferedData { +public: + static constexpr const BufferedData::Type TYPE = BufferedData::Type::SIMPLE; + +private: + //! (roughly) The max amount of tuples we'll keep buffered at a time + static constexpr idx_t BUFFER_SIZE = 100000; + +public: + SimpleBufferedData(weak_ptr context); + ~SimpleBufferedData() override; + +public: + void Append(unique_ptr chunk); + void BlockSink(const BlockedSink &blocked_sink); + bool BufferIsFull() override; + PendingExecutionResult ReplenishBuffer(StreamQueryResult &result, ClientContextLock &context_lock) override; + unique_ptr Scan() override; + +private: + void UnblockSinks(); + +private: + //! Our handles to reschedule the blocked sink tasks + queue blocked_sinks; + //! The queue of chunks + queue> buffered_chunks; + //! The current capacity of the buffer (tuples) + atomic buffered_count; +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/main/client_context.hpp b/src/duckdb/src/include/duckdb/main/client_context.hpp index e8c54ab65..b62dc04fd 100644 --- a/src/duckdb/src/include/duckdb/main/client_context.hpp +++ b/src/duckdb/src/include/duckdb/main/client_context.hpp @@ -43,6 +43,7 @@ struct CreateScalarFunctionInfo; class ScalarFunctionCatalogEntry; struct ActiveQueryContext; struct ParserOptions; +class SimpleBufferedData; struct ClientData; struct PendingQueryParameters { @@ -63,8 +64,9 @@ class ClientContextState { //! The ClientContext holds information relevant to the current client session //! during execution class ClientContext : public std::enable_shared_from_this { - friend class PendingQueryResult; - friend class StreamQueryResult; + friend class PendingQueryResult; // LockContext + friend class SimpleBufferedData; // ExecuteTaskInternal + friend class StreamQueryResult; // LockContext friend class ConnectionManager; public: @@ -174,10 +176,9 @@ class ClientContext : public std::enable_shared_from_this { //! Returns the parser options for this client context DUCKDB_API ParserOptions GetParserOptions() const; - DUCKDB_API unique_ptr Fetch(ClientContextLock &lock, StreamQueryResult &result); - //! Whether or not the given result object (streaming query result or pending query result) is active - DUCKDB_API bool IsActiveResult(ClientContextLock &lock, BaseQueryResult *result); + DUCKDB_API bool IsActiveResult(ClientContextLock &lock, BaseQueryResult &result); + DUCKDB_API void SetActiveResult(ClientContextLock &lock, BaseQueryResult &result); //! Returns the current executor Executor &GetExecutor(); @@ -237,7 +238,6 @@ class ClientContext : public std::enable_shared_from_this { void LogQueryInternal(ClientContextLock &lock, const string &query); unique_ptr FetchResultInternal(ClientContextLock &lock, PendingQueryResult &pending); - unique_ptr FetchInternal(ClientContextLock &lock, Executor &executor, BaseQueryResult &result); unique_ptr LockContext(); @@ -245,7 +245,7 @@ class ClientContext : public std::enable_shared_from_this { void BeginQueryInternal(ClientContextLock &lock, const string &query); ErrorData EndQueryInternal(ClientContextLock &lock, bool success, bool invalidate_transaction); - PendingExecutionResult ExecuteTaskInternal(ClientContextLock &lock, PendingQueryResult &result); + PendingExecutionResult ExecuteTaskInternal(ClientContextLock &lock, BaseQueryResult &result, bool dry_run = false); unique_ptr PendingStatementOrPreparedStatementInternal( ClientContextLock &lock, const string &query, unique_ptr statement, diff --git a/src/duckdb/src/include/duckdb/main/pending_query_result.hpp b/src/duckdb/src/include/duckdb/main/pending_query_result.hpp index 930363e29..8b95a025e 100644 --- a/src/duckdb/src/include/duckdb/main/pending_query_result.hpp +++ b/src/duckdb/src/include/duckdb/main/pending_query_result.hpp @@ -28,6 +28,7 @@ class PendingQueryResult : public BaseQueryResult { vector types, bool allow_stream_result); DUCKDB_API explicit PendingQueryResult(ErrorData error_message); DUCKDB_API ~PendingQueryResult() override; + DUCKDB_API bool AllowStreamResult() const; public: //! Executes a single task within the query, returning whether or not the query is ready. @@ -38,6 +39,7 @@ class PendingQueryResult : public BaseQueryResult { //! but tasks may become available in the future. //! The error message can be obtained by calling GetError() on the PendingQueryResult. DUCKDB_API PendingExecutionResult ExecuteTask(); + DUCKDB_API PendingExecutionResult CheckPulse(); //! Returns the result of the query as an actual query result. //! This returns (mostly) instantly if ExecuteTask has been called until RESULT_READY was returned. @@ -47,6 +49,7 @@ class PendingQueryResult : public BaseQueryResult { //! Function to determine whether execution is considered finished DUCKDB_API static bool IsFinished(PendingExecutionResult result); + DUCKDB_API static bool IsFinishedOrBlocked(PendingExecutionResult result); private: shared_ptr context; diff --git a/src/duckdb/src/include/duckdb/main/prepared_statement_data.hpp b/src/duckdb/src/include/duckdb/main/prepared_statement_data.hpp index ce0b7d1a1..684f2d530 100644 --- a/src/duckdb/src/include/duckdb/main/prepared_statement_data.hpp +++ b/src/duckdb/src/include/duckdb/main/prepared_statement_data.hpp @@ -46,6 +46,8 @@ class PreparedStatementData { idx_t catalog_version; //! The map of parameter index to the actual value entry bound_parameter_map_t value_map; + //! Whether we are creating a streaming result or not + bool is_streaming = false; public: void CheckParameterCount(idx_t parameter_count); diff --git a/src/duckdb/src/include/duckdb/main/stream_query_result.hpp b/src/duckdb/src/include/duckdb/main/stream_query_result.hpp index 0373f307e..5b8016c80 100644 --- a/src/duckdb/src/include/duckdb/main/stream_query_result.hpp +++ b/src/duckdb/src/include/duckdb/main/stream_query_result.hpp @@ -10,6 +10,9 @@ #include "duckdb/common/winapi.hpp" #include "duckdb/main/query_result.hpp" +#include "duckdb/parallel/interrupt.hpp" +#include "duckdb/common/queue.hpp" +#include "duckdb/main/buffered_data/simple_buffered_data.hpp" namespace duckdb { @@ -29,7 +32,8 @@ class StreamQueryResult : public QueryResult { //! Create a successful StreamQueryResult. StreamQueryResults should always be successful initially (it makes no //! sense to stream an error). DUCKDB_API StreamQueryResult(StatementType statement_type, StatementProperties properties, - shared_ptr context, vector types, vector names); + vector types, vector names, ClientProperties client_properties, + shared_ptr buffered_data); DUCKDB_API ~StreamQueryResult() override; public: @@ -49,9 +53,13 @@ class StreamQueryResult : public QueryResult { shared_ptr context; private: + unique_ptr FetchInternal(ClientContextLock &lock); unique_ptr LockContext(); void CheckExecutableInternal(ClientContextLock &lock); bool IsOpenInternal(ClientContextLock &lock); + +private: + shared_ptr buffered_data; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp index 27f9fa65f..ca60992eb 100644 --- a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp +++ b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp @@ -21,6 +21,26 @@ namespace duckdb { class Executor; class Event; class MetaPipeline; +class PipelineExecutor; +class Pipeline; + +class PipelineTask : public ExecutorTask { + static constexpr const idx_t PARTIAL_CHUNK_COUNT = 50; + +public: + explicit PipelineTask(Pipeline &pipeline_p, shared_ptr event_p); + + Pipeline &pipeline; + shared_ptr event; + unique_ptr pipeline_executor; + +public: + const PipelineExecutor &GetPipelineExecutor() const; + bool TaskBlockedOnResult() const override; + +public: + TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; +}; class PipelineBuildState { public: diff --git a/src/duckdb/src/include/duckdb/parallel/pipeline_executor.hpp b/src/duckdb/src/include/duckdb/parallel/pipeline_executor.hpp index 631944492..e19c4483d 100644 --- a/src/duckdb/src/include/duckdb/parallel/pipeline_executor.hpp +++ b/src/duckdb/src/include/duckdb/parallel/pipeline_executor.hpp @@ -51,14 +51,12 @@ class PipelineExecutor { //! This should only be called once per PipelineExecutor. PipelineExecuteResult PushFinalize(); - //! Initializes a chunk with the types that will flow out of ExecutePull + bool RemainingSinkChunk() const; + + //! Initializes a chunk with the types that will flow out of the chunk void InitializeChunk(DataChunk &chunk); //! Execute a pipeline without a sink, and retrieve a single DataChunk //! Returns an empty chunk when finished. - void ExecutePull(DataChunk &result); - //! Called after depleting the source using ExecutePull - //! This flushes profiler states - void PullFinalize(); //! Registers the task in the interrupt_state to allow Source/Sink operators to block the task void SetTaskForInterrupts(weak_ptr current_task); diff --git a/src/duckdb/src/include/duckdb/parallel/task.hpp b/src/duckdb/src/include/duckdb/parallel/task.hpp index 2245c74a0..25a85b710 100644 --- a/src/duckdb/src/include/duckdb/parallel/task.hpp +++ b/src/duckdb/src/include/duckdb/parallel/task.hpp @@ -27,6 +27,7 @@ class Task : public std::enable_shared_from_this { virtual ~Task() { } +public: //! Execute the task in the specified execution mode //! If mode is PROCESS_ALL, Execute should always finish processing and return TASK_FINISHED //! If mode is PROCESS_PARTIAL, Execute can return TASK_NOT_FINISHED, in which case Execute will be called again @@ -45,6 +46,10 @@ class Task : public std::enable_shared_from_this { virtual void Reschedule() { throw InternalException("Cannot reschedule task of base Task class"); } + + virtual bool TaskBlockedOnResult() const { + return false; + } }; //! Execute a task within an executor, including exception handling @@ -55,9 +60,11 @@ class ExecutorTask : public Task { ExecutorTask(ClientContext &context); virtual ~ExecutorTask(); +public: void Deschedule() override; void Reschedule() override; +public: Executor &executor; public: diff --git a/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp b/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp index b54da51ce..1c04dbb88 100644 --- a/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp +++ b/src/duckdb/src/include/duckdb/parallel/task_scheduler.hpp @@ -61,6 +61,9 @@ class TaskScheduler { //! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched. //! The main thread will also be used for execution void SetThreads(int32_t n); + + void RelaunchThreads(); + //! Returns the number of threads DUCKDB_API int32_t NumberOfThreads(); @@ -74,7 +77,7 @@ class TaskScheduler { void SetAllocatorFlushTreshold(idx_t threshold); private: - void SetThreadsInternal(int32_t n); + void RelaunchThreadsInternal(int32_t n); private: DatabaseInstance &db; @@ -88,6 +91,8 @@ class TaskScheduler { vector>> markers; //! The threshold after which to flush the allocator after completing a task atomic allocator_flush_threshold; + //! Requested thread count + atomic thread_count; }; } // namespace duckdb diff --git a/src/duckdb/src/main/buffered_data/simple_buffered_data.cpp b/src/duckdb/src/main/buffered_data/simple_buffered_data.cpp new file mode 100644 index 000000000..f84cae8a1 --- /dev/null +++ b/src/duckdb/src/main/buffered_data/simple_buffered_data.cpp @@ -0,0 +1,96 @@ +#include "duckdb/main/buffered_data/simple_buffered_data.hpp" +#include "duckdb/common/printer.hpp" +#include "duckdb/main/client_context.hpp" +#include "duckdb/main/stream_query_result.hpp" +#include "duckdb/common/helper.hpp" + +namespace duckdb { + +SimpleBufferedData::SimpleBufferedData(weak_ptr context) + : BufferedData(BufferedData::Type::SIMPLE, std::move(context)) { + buffered_count = 0; +} + +SimpleBufferedData::~SimpleBufferedData() { +} + +void SimpleBufferedData::BlockSink(const BlockedSink &blocked_sink) { + lock_guard lock(glock); + blocked_sinks.push(blocked_sink); +} + +bool SimpleBufferedData::BufferIsFull() { + return buffered_count >= BUFFER_SIZE; +} + +void SimpleBufferedData::UnblockSinks() { + if (Closed()) { + return; + } + if (buffered_count >= BUFFER_SIZE) { + return; + } + // Reschedule enough blocked sinks to populate the buffer + lock_guard lock(glock); + while (!blocked_sinks.empty()) { + auto &blocked_sink = blocked_sinks.front(); + if (buffered_count >= BUFFER_SIZE) { + // We have unblocked enough sinks already + break; + } + blocked_sink.state.Callback(); + blocked_sinks.pop(); + } +} + +PendingExecutionResult SimpleBufferedData::ReplenishBuffer(StreamQueryResult &result, ClientContextLock &context_lock) { + if (Closed()) { + return PendingExecutionResult::EXECUTION_ERROR; + } + if (BufferIsFull()) { + // The buffer isn't empty yet, just return + return PendingExecutionResult::RESULT_READY; + } + UnblockSinks(); + auto cc = context.lock(); + // Let the executor run until the buffer is no longer empty + auto res = cc->ExecuteTaskInternal(context_lock, result); + while (!PendingQueryResult::IsFinished(res)) { + if (buffered_count >= BUFFER_SIZE) { + break; + } + // Check if we need to unblock more sinks to reach the buffer size + UnblockSinks(); + res = cc->ExecuteTaskInternal(context_lock, result); + } + if (result.HasError()) { + Close(); + } + return res; +} + +unique_ptr SimpleBufferedData::Scan() { + if (Closed()) { + return nullptr; + } + lock_guard lock(glock); + if (buffered_chunks.empty()) { + Close(); + return nullptr; + } + auto chunk = std::move(buffered_chunks.front()); + buffered_chunks.pop(); + + if (chunk) { + buffered_count -= chunk->size(); + } + return chunk; +} + +void SimpleBufferedData::Append(unique_ptr chunk) { + unique_lock lock(glock); + buffered_count += chunk->size(); + buffered_chunks.push(std::move(chunk)); +} + +} // namespace duckdb diff --git a/src/duckdb/src/main/capi/pending-c.cpp b/src/duckdb/src/main/capi/pending-c.cpp index 734754ff2..688758724 100644 --- a/src/duckdb/src/main/capi/pending-c.cpp +++ b/src/duckdb/src/main/capi/pending-c.cpp @@ -66,6 +66,37 @@ const char *duckdb_pending_error(duckdb_pending_result pending_result) { return wrapper->statement->GetError().c_str(); } +duckdb_pending_state duckdb_pending_execute_check_state(duckdb_pending_result pending_result) { + if (!pending_result) { + return DUCKDB_PENDING_ERROR; + } + auto wrapper = reinterpret_cast(pending_result); + if (!wrapper->statement) { + return DUCKDB_PENDING_ERROR; + } + if (wrapper->statement->HasError()) { + return DUCKDB_PENDING_ERROR; + } + PendingExecutionResult return_value; + try { + return_value = wrapper->statement->CheckPulse(); + } catch (std::exception &ex) { + wrapper->statement->SetError(duckdb::ErrorData(ex)); + return DUCKDB_PENDING_ERROR; + } + switch (return_value) { + case PendingExecutionResult::BLOCKED: + case PendingExecutionResult::RESULT_READY: + return DUCKDB_PENDING_RESULT_READY; + case PendingExecutionResult::NO_TASKS_AVAILABLE: + return DUCKDB_PENDING_NO_TASKS_AVAILABLE; + case PendingExecutionResult::RESULT_NOT_READY: + return DUCKDB_PENDING_RESULT_NOT_READY; + default: + return DUCKDB_PENDING_ERROR; + } +} + duckdb_pending_state duckdb_pending_execute_task(duckdb_pending_result pending_result) { if (!pending_result) { return DUCKDB_PENDING_ERROR; @@ -85,6 +116,7 @@ duckdb_pending_state duckdb_pending_execute_task(duckdb_pending_result pending_r return DUCKDB_PENDING_ERROR; } switch (return_value) { + case PendingExecutionResult::BLOCKED: case PendingExecutionResult::RESULT_READY: return DUCKDB_PENDING_RESULT_READY; case PendingExecutionResult::NO_TASKS_AVAILABLE: @@ -115,13 +147,20 @@ duckdb_state duckdb_execute_pending(duckdb_pending_result pending_result, duckdb if (!pending_result || !out_result) { return DuckDBError; } + memset(out_result, 0, sizeof(duckdb_result)); auto wrapper = reinterpret_cast(pending_result); if (!wrapper->statement) { return DuckDBError; } duckdb::unique_ptr result; - result = wrapper->statement->Execute(); + try { + result = wrapper->statement->Execute(); + } catch (std::exception &ex) { + duckdb::ErrorData error(ex); + result = duckdb::make_uniq(std::move(error)); + } + wrapper->statement.reset(); return duckdb_translate_result(std::move(result), out_result); } diff --git a/src/duckdb/src/main/capi/result-c.cpp b/src/duckdb/src/main/capi/result-c.cpp index 5f4070725..c3b62c410 100644 --- a/src/duckdb/src/main/capi/result-c.cpp +++ b/src/duckdb/src/main/capi/result-c.cpp @@ -483,7 +483,7 @@ bool *duckdb_nullmask_data(duckdb_result *result, idx_t col) { } const char *duckdb_result_error(duckdb_result *result) { - if (!result) { + if (!result || !result->internal_data) { return nullptr; } auto &result_data = *(reinterpret_cast(result->internal_data)); diff --git a/src/duckdb/src/main/client_context.cpp b/src/duckdb/src/main/client_context.cpp index e533dc158..672ebefa4 100644 --- a/src/duckdb/src/main/client_context.cpp +++ b/src/duckdb/src/main/client_context.cpp @@ -47,16 +47,30 @@ namespace duckdb { struct ActiveQueryContext { +public: //! The query that is currently being executed string query; - //! The currently open result - BaseQueryResult *open_result = nullptr; //! Prepared statement data shared_ptr prepared; //! The query executor unique_ptr executor; //! The progress bar unique_ptr progress_bar; + +public: + void SetOpenResult(BaseQueryResult &result) { + open_result = &result; + } + bool IsOpenResult(BaseQueryResult &result) { + return open_result == &result; + } + bool HasOpenResult() const { + return open_result != nullptr; + } + +private: + //! The currently open result + BaseQueryResult *open_result = nullptr; }; ClientContext::ClientContext(shared_ptr database) @@ -101,42 +115,6 @@ unique_ptr ClientContext::ErrorResult(ErrorData error, const string &query) { return make_uniq(std::move(error)); } -unique_ptr ClientContext::Fetch(ClientContextLock &lock, StreamQueryResult &result) { - D_ASSERT(IsActiveResult(lock, &result)); - D_ASSERT(active_query->executor); - return FetchInternal(lock, *active_query->executor, result); -} - -unique_ptr ClientContext::FetchInternal(ClientContextLock &lock, Executor &executor, - BaseQueryResult &result) { - bool invalidate_query = true; - try { - // fetch the chunk and return it - auto chunk = executor.FetchChunk(); - if (!chunk || chunk->size() == 0) { - CleanupInternal(lock, &result); - } - return chunk; - } catch (std::exception &ex) { - ErrorData error(ex); - auto exception_type = error.Type(); - if (!Exception::InvalidatesTransaction(exception_type)) { - // standard exceptions do not invalidate the current transaction - invalidate_query = false; - } else if (Exception::InvalidatesDatabase(exception_type)) { - // fatal exceptions invalidate the entire database - auto &db_inst = DatabaseInstance::GetDatabase(*this); - ValidChecker::Invalidate(db_inst, error.RawMessage()); - } - ProcessError(error, active_query->query); - result.SetError(std::move(error)); - } catch (...) { // LCOV_EXCL_START - result.SetError(ErrorData("Unhandled exception in FetchInternal")); - } // LCOV_EXCL_STOP - CleanupInternal(lock, &result, invalidate_query); - return nullptr; -} - void ClientContext::BeginTransactionInternal(ClientContextLock &lock, bool requires_valid_transaction) { // check if we are on AutoCommit. In this case we should start a transaction D_ASSERT(!active_query); @@ -222,6 +200,10 @@ void ClientContext::CleanupInternal(ClientContextLock &lock, BaseQueryResult *re } active_query->progress_bar.reset(); + // Relaunch the threads if a SET THREADS command was issued + auto &scheduler = TaskScheduler::GetScheduler(*this); + scheduler.RelaunchThreads(); + auto error = EndQueryInternal(lock, result ? !result->HasError() : false, invalidate_transaction); if (result && !result->HasError()) { // if an error occurred while committing report it in the result @@ -243,54 +225,19 @@ const string &ClientContext::GetCurrentQuery() { unique_ptr ClientContext::FetchResultInternal(ClientContextLock &lock, PendingQueryResult &pending) { D_ASSERT(active_query); - D_ASSERT(active_query->open_result == &pending); + D_ASSERT(active_query->IsOpenResult(pending)); D_ASSERT(active_query->prepared); auto &executor = GetExecutor(); auto &prepared = *active_query->prepared; bool create_stream_result = prepared.properties.allow_stream_result && pending.allow_stream_result; - if (create_stream_result) { - D_ASSERT(!executor.HasResultCollector()); - active_query->progress_bar.reset(); - query_progress.Initialize(); - // successfully compiled SELECT clause, and it is the last statement - // return a StreamQueryResult so the client can call Fetch() on it and stream the result - auto stream_result = make_uniq(pending.statement_type, pending.properties, - shared_from_this(), pending.types, pending.names); - active_query->open_result = stream_result.get(); - return std::move(stream_result); - } unique_ptr result; - if (executor.HasResultCollector()) { - // we have a result collector - fetch the result directly from the result collector - result = executor.GetResult(); + D_ASSERT(executor.HasResultCollector()); + // we have a result collector - fetch the result directly from the result collector + result = executor.GetResult(); + if (!create_stream_result) { CleanupInternal(lock, result.get(), false); } else { - // no result collector - create a materialized result by continuously fetching - auto result_collection = make_uniq(Allocator::DefaultAllocator(), pending.types); - D_ASSERT(!result_collection->Types().empty()); - auto materialized_result = - make_uniq(pending.statement_type, pending.properties, pending.names, - std::move(result_collection), GetClientProperties()); - - auto &collection = materialized_result->Collection(); - D_ASSERT(!collection.Types().empty()); - ColumnDataAppendState append_state; - collection.InitializeAppend(append_state); - while (true) { - auto chunk = FetchInternal(lock, GetExecutor(), *materialized_result); - if (!chunk || chunk->size() == 0) { - break; - } -#ifdef DEBUG - for (idx_t i = 0; i < chunk->ColumnCount(); i++) { - if (pending.types[i].id() == LogicalTypeId::VARCHAR) { - chunk->data[i].UTFVerify(chunk->size()); - } - } -#endif - collection.Append(append_state, *chunk); - } - result = std::move(materialized_result); + active_query->SetOpenResult(*result); } return result; } @@ -417,42 +364,56 @@ unique_ptr ClientContext::PendingPreparedStatement(ClientCon query_progress.Restart(); } auto stream_result = parameters.allow_stream_result && statement.properties.allow_stream_result; - if (!stream_result && statement.properties.return_type == StatementReturnType::QUERY_RESULT) { - unique_ptr collector; - auto &client_config = ClientConfig::GetConfig(*this); - auto get_method = client_config.result_collector ? client_config.result_collector - : PhysicalResultCollector::GetResultCollector; - collector = get_method(*this, statement); - D_ASSERT(collector->type == PhysicalOperatorType::RESULT_COLLECTOR); - executor.Initialize(std::move(collector)); - } else { - executor.Initialize(*statement.plan); + + get_result_collector_t get_method = PhysicalResultCollector::GetResultCollector; + auto &client_config = ClientConfig::GetConfig(*this); + if (!stream_result && client_config.result_collector) { + get_method = client_config.result_collector; } + statement.is_streaming = stream_result; + auto collector = get_method(*this, statement); + D_ASSERT(collector->type == PhysicalOperatorType::RESULT_COLLECTOR); + executor.Initialize(std::move(collector)); + auto types = executor.GetTypes(); D_ASSERT(types == statement.types); - D_ASSERT(!active_query->open_result); + D_ASSERT(!active_query->HasOpenResult()); auto pending_result = make_uniq(shared_from_this(), *statement_p, std::move(types), stream_result); active_query->prepared = std::move(statement_p); - active_query->open_result = pending_result.get(); + active_query->SetOpenResult(*pending_result); return pending_result; } -PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &lock, PendingQueryResult &result) { +PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &lock, BaseQueryResult &result, + bool dry_run) { D_ASSERT(active_query); - D_ASSERT(active_query->open_result == &result); + D_ASSERT(active_query->IsOpenResult(result)); bool invalidate_transaction = true; try { - auto query_result = active_query->executor->ExecuteTask(); + auto query_result = active_query->executor->ExecuteTask(dry_run); if (active_query->progress_bar) { - active_query->progress_bar->Update(query_result == PendingExecutionResult::RESULT_READY); + auto is_finished = PendingQueryResult::IsFinishedOrBlocked(query_result); + active_query->progress_bar->Update(is_finished); query_progress = active_query->progress_bar->GetDetailedQueryProgress(); } return query_result; } catch (std::exception &ex) { auto error = ErrorData(ex); - if (!Exception::InvalidatesTransaction(error.Type())) { + if (error.Type() == ExceptionType::INTERRUPT) { + auto &executor = *active_query->executor; + if (!executor.HasError()) { + // Interrupted by the user + result.SetError(ex); + invalidate_transaction = true; + } else { + // Interrupted by an exception caused in a worker thread + auto error = executor.GetError(); + invalidate_transaction = Exception::InvalidatesTransaction(error.Type()); + result.SetError(error); + } + } else if (!Exception::InvalidatesTransaction(error.Type())) { invalidate_transaction = false; } else if (Exception::InvalidatesDatabase(error.Type())) { // fatal exceptions invalidate the entire database @@ -637,11 +598,18 @@ unique_ptr ClientContext::RunStatementInternal(ClientContextLock &l return ExecutePendingQueryInternal(lock, *pending); } -bool ClientContext::IsActiveResult(ClientContextLock &lock, BaseQueryResult *result) { +bool ClientContext::IsActiveResult(ClientContextLock &lock, BaseQueryResult &result) { if (!active_query) { return false; } - return active_query->open_result == result; + return active_query->IsOpenResult(result); +} + +void ClientContext::SetActiveResult(ClientContextLock &lock, BaseQueryResult &result) { + if (!active_query) { + return; + } + return active_query->SetOpenResult(result); } unique_ptr ClientContext::PendingStatementOrPreparedStatementInternal( @@ -700,7 +668,7 @@ unique_ptr ClientContext::PendingStatementOrPreparedStatemen unique_ptr ClientContext::PendingStatementOrPreparedStatement( ClientContextLock &lock, const string &query, unique_ptr statement, shared_ptr &prepared, const PendingQueryParameters ¶meters) { - unique_ptr result; + unique_ptr pending; try { BeginQueryInternal(lock, query); @@ -720,7 +688,7 @@ unique_ptr ClientContext::PendingStatementOrPreparedStatemen bool invalidate_query = true; try { if (statement) { - result = PendingStatementInternal(lock, query, std::move(statement), parameters); + pending = PendingStatementInternal(lock, query, std::move(statement), parameters); } else { if (prepared->RequireRebind(*this, parameters.parameters)) { // catalog was modified: rebind the statement before execution @@ -731,7 +699,7 @@ unique_ptr ClientContext::PendingStatementOrPreparedStatemen prepared = std::move(new_prepared); prepared->properties.bound_all_parameters = false; } - result = PendingPreparedStatement(lock, prepared, parameters); + pending = PendingPreparedStatement(lock, prepared, parameters); } } catch (std::exception &ex) { ErrorData error(ex); @@ -746,15 +714,15 @@ unique_ptr ClientContext::PendingStatementOrPreparedStatemen } } // other types of exceptions do invalidate the current transaction - result = ErrorResult(std::move(error), query); + pending = ErrorResult(std::move(error), query); } - if (result->HasError()) { + if (pending->HasError()) { // query failed: abort now EndQueryInternal(lock, false, invalidate_query); - return result; + return pending; } - D_ASSERT(active_query->open_result == result.get()); - return result; + D_ASSERT(active_query->IsOpenResult(*pending)); + return pending; } void ClientContext::LogQueryInternal(ClientContextLock &, const string &query) { @@ -835,6 +803,12 @@ unique_ptr ClientContext::Query(const string &query, bool allow_str last_result->next = std::move(current_result); last_result = last_result->next.get(); } + D_ASSERT(last_result); + if (last_result->HasError()) { + // Reset the interrupted flag, this was set by the task that found the error + // Next statements should not be bothered by that interruption + interrupted = false; + } } return result; } diff --git a/src/duckdb/src/main/database.cpp b/src/duckdb/src/main/database.cpp index d62f9f796..189131493 100644 --- a/src/duckdb/src/main/database.cpp +++ b/src/duckdb/src/main/database.cpp @@ -244,6 +244,7 @@ void DatabaseInstance::Initialize(const char *database_path, DBConfig *user_conf // only increase thread count after storage init because we get races on catalog otherwise scheduler->SetThreads(config.options.maximum_threads); + scheduler->RelaunchThreads(); } DuckDB::DuckDB(const char *path, DBConfig *new_config) : instance(make_shared()) { diff --git a/src/duckdb/src/main/pending_query_result.cpp b/src/duckdb/src/main/pending_query_result.cpp index 89ec1c451..a112286e2 100644 --- a/src/duckdb/src/main/pending_query_result.cpp +++ b/src/duckdb/src/main/pending_query_result.cpp @@ -32,7 +32,7 @@ unique_ptr PendingQueryResult::LockContext() { void PendingQueryResult::CheckExecutableInternal(ClientContextLock &lock) { bool invalidated = HasError() || !context; if (!invalidated) { - invalidated = !context->IsActiveResult(lock, this); + invalidated = !context->IsActiveResult(lock, *this); } if (invalidated) { if (HasError()) { @@ -48,15 +48,30 @@ PendingExecutionResult PendingQueryResult::ExecuteTask() { return ExecuteTaskInternal(*lock); } +PendingExecutionResult PendingQueryResult::CheckPulse() { + auto lock = LockContext(); + CheckExecutableInternal(*lock); + return context->ExecuteTaskInternal(*lock, *this, true); +} + +bool PendingQueryResult::AllowStreamResult() const { + return allow_stream_result; +} + PendingExecutionResult PendingQueryResult::ExecuteTaskInternal(ClientContextLock &lock) { CheckExecutableInternal(lock); - return context->ExecuteTaskInternal(lock, *this); + return context->ExecuteTaskInternal(lock, *this, false); } unique_ptr PendingQueryResult::ExecuteInternal(ClientContextLock &lock) { CheckExecutableInternal(lock); // Busy wait while execution is not finished - while (!IsFinished(ExecuteTaskInternal(lock))) { + if (allow_stream_result) { + while (!IsFinishedOrBlocked(ExecuteTaskInternal(lock))) { + } + } else { + while (!IsFinished(ExecuteTaskInternal(lock))) { + } } if (HasError()) { return make_uniq(error); @@ -76,10 +91,12 @@ void PendingQueryResult::Close() { } bool PendingQueryResult::IsFinished(PendingExecutionResult result) { - if (result == PendingExecutionResult::RESULT_READY || result == PendingExecutionResult::EXECUTION_ERROR) { - return true; - } - return false; + return (result == PendingExecutionResult::RESULT_READY || result == PendingExecutionResult::EXECUTION_ERROR); +} + +bool PendingQueryResult::IsFinishedOrBlocked(PendingExecutionResult result) { + return (result == PendingExecutionResult::RESULT_READY || result == PendingExecutionResult::EXECUTION_ERROR || + result == PendingExecutionResult::BLOCKED); } } // namespace duckdb diff --git a/src/duckdb/src/main/query_result.cpp b/src/duckdb/src/main/query_result.cpp index 1c96468b0..c24fab135 100644 --- a/src/duckdb/src/main/query_result.cpp +++ b/src/duckdb/src/main/query_result.cpp @@ -15,6 +15,8 @@ BaseQueryResult::BaseQueryResult(QueryResultType type, StatementType statement_t BaseQueryResult::BaseQueryResult(QueryResultType type, ErrorData error) : type(type), success(false), error(std::move(error)) { + // Assert that the error object is initialized + D_ASSERT(this->error.HasError()); } BaseQueryResult::~BaseQueryResult() { diff --git a/src/duckdb/src/main/stream_query_result.cpp b/src/duckdb/src/main/stream_query_result.cpp index 903e485e2..73ab07c39 100644 --- a/src/duckdb/src/main/stream_query_result.cpp +++ b/src/duckdb/src/main/stream_query_result.cpp @@ -3,16 +3,17 @@ #include "duckdb/main/client_context.hpp" #include "duckdb/main/materialized_query_result.hpp" #include "duckdb/common/box_renderer.hpp" +#include "duckdb/main/database.hpp" namespace duckdb { StreamQueryResult::StreamQueryResult(StatementType statement_type, StatementProperties properties, - shared_ptr context_p, vector types, - vector names) + vector types, vector names, + ClientProperties client_properties, shared_ptr data) : QueryResult(QueryResultType::STREAM_RESULT, statement_type, std::move(properties), std::move(types), - std::move(names), context_p->GetClientProperties()), - context(std::move(context_p)) { - D_ASSERT(context); + std::move(names), std::move(client_properties)), + buffered_data(std::move(data)) { + context = buffered_data->GetContext(); } StreamQueryResult::~StreamQueryResult() { @@ -40,14 +41,41 @@ unique_ptr StreamQueryResult::LockContext() { return context->LockContext(); } -void StreamQueryResult::CheckExecutableInternal(ClientContextLock &lock) { - if (!IsOpenInternal(lock)) { - string error_str = "Attempting to execute an unsuccessful or closed pending query result"; - if (HasError()) { - error_str += StringUtil::Format("\nError: %s", GetError()); +unique_ptr StreamQueryResult::FetchInternal(ClientContextLock &lock) { + bool invalidate_query = true; + unique_ptr chunk; + try { + // fetch the chunk and return it + auto res = buffered_data->ReplenishBuffer(*this, lock); + if (res == PendingExecutionResult::EXECUTION_ERROR) { + return chunk; } - throw InvalidInputException(error_str); - } + chunk = buffered_data->Scan(); + if (!chunk || chunk->ColumnCount() == 0 || chunk->size() == 0) { + context->CleanupInternal(lock, this); + chunk = nullptr; + } + return chunk; + } catch (std::exception &ex) { + ErrorData error(ex); + if (!Exception::InvalidatesTransaction(error.Type())) { + // standard exceptions do not invalidate the current transaction + invalidate_query = false; + } else if (Exception::InvalidatesDatabase(error.Type())) { + // fatal exceptions invalidate the entire database + auto &config = context->config; + if (!config.query_verification_enabled) { + auto &db_instance = DatabaseInstance::GetDatabase(*context); + ValidChecker::Invalidate(db_instance, error.RawMessage()); + } + } + context->ProcessError(error, context->GetCurrentQuery()); + SetError(std::move(error)); + } catch (...) { // LCOV_EXCL_START + SetError(ErrorData("Unhandled exception in FetchInternal")); + } // LCOV_EXCL_STOP + context->CleanupInternal(lock, this, invalidate_query); + return nullptr; } unique_ptr StreamQueryResult::FetchRaw() { @@ -55,12 +83,7 @@ unique_ptr StreamQueryResult::FetchRaw() { { auto lock = LockContext(); CheckExecutableInternal(*lock); - auto system_chunk = context->Fetch(*lock, *this); - if (system_chunk) { - chunk = make_uniq(); - chunk->Initialize(Allocator::DefaultAllocator(), system_chunk->GetTypes()); - system_chunk->Copy(*chunk, 0); - } + chunk = FetchInternal(*lock); } if (!chunk || chunk->ColumnCount() == 0 || chunk->size() == 0) { Close(); @@ -95,11 +118,21 @@ unique_ptr StreamQueryResult::Materialize() { bool StreamQueryResult::IsOpenInternal(ClientContextLock &lock) { bool invalidated = !success || !context; if (!invalidated) { - invalidated = !context->IsActiveResult(lock, this); + invalidated = !context->IsActiveResult(lock, *this); } return !invalidated; } +void StreamQueryResult::CheckExecutableInternal(ClientContextLock &lock) { + if (!IsOpenInternal(lock)) { + string error_str = "Attempting to execute an unsuccessful or closed pending query result"; + if (HasError()) { + error_str += StringUtil::Format("\nError: %s", GetError()); + } + throw InvalidInputException(error_str); + } +} + bool StreamQueryResult::IsOpen() { if (!success || !context) { return false; @@ -109,6 +142,7 @@ bool StreamQueryResult::IsOpen() { } void StreamQueryResult::Close() { + buffered_data->Close(); context.reset(); } diff --git a/src/duckdb/src/parallel/executor.cpp b/src/duckdb/src/parallel/executor.cpp index eed8e535d..730f04a09 100644 --- a/src/duckdb/src/parallel/executor.cpp +++ b/src/duckdb/src/parallel/executor.cpp @@ -442,6 +442,28 @@ void Executor::RescheduleTask(shared_ptr &task_p) { } } +bool Executor::ResultCollectorIsBlocked() { + if (completed_pipelines + 1 != total_pipelines) { + // The result collector is always in the last pipeline + return false; + } + lock_guard l(executor_lock); + if (to_be_rescheduled_tasks.empty()) { + return false; + } + for (auto &kv : to_be_rescheduled_tasks) { + auto &task = kv.second; + if (task->TaskBlockedOnResult()) { + // At least one of the blocked tasks is connected to a result collector + // This task could be the only task that could unblock the other non-result-collector tasks + // To prevent a scenario where we halt indefinitely, we return here so it can be unblocked by a call to + // Fetch + return true; + } + } + return false; +} + void Executor::AddToBeRescheduled(shared_ptr &task_p) { lock_guard l(executor_lock); if (cancelled) { @@ -457,7 +479,7 @@ bool Executor::ExecutionIsFinished() { return completed_pipelines >= total_pipelines || HasError(); } -PendingExecutionResult Executor::ExecuteTask() { +PendingExecutionResult Executor::ExecuteTask(bool dry_run) { // Only executor should return NO_TASKS_AVAILABLE D_ASSERT(execution_result != PendingExecutionResult::NO_TASKS_AVAILABLE); if (execution_result != PendingExecutionResult::RESULT_NOT_READY) { @@ -467,14 +489,29 @@ PendingExecutionResult Executor::ExecuteTask() { auto &scheduler = TaskScheduler::GetScheduler(context); while (completed_pipelines < total_pipelines) { // there are! if we don't already have a task, fetch one - if (!task) { - scheduler.GetTaskFromProducer(*producer, task); + auto current_task = task.get(); + if (dry_run) { + // Pretend we have no task, we don't want to execute anything + current_task = nullptr; + } else { + if (!task) { + scheduler.GetTaskFromProducer(*producer, task); + } + current_task = task.get(); } - if (!task && !HasError()) { + + if (!current_task && !HasError()) { // there are no tasks to be scheduled and there are tasks blocked + if (ResultCollectorIsBlocked()) { + // The blocked tasks are processing the Sink of a BufferedResultCollector + // We return here so the query result can be made and fetched from + // which will in turn unblock the Sink tasks. + return PendingExecutionResult::BLOCKED; + } return PendingExecutionResult::NO_TASKS_AVAILABLE; } - if (task) { + + if (current_task) { // if we have a task, partially process it auto result = task->Execute(TaskExecutionMode::PROCESS_PARTIAL); if (result == TaskExecutionResult::TASK_BLOCKED) { @@ -564,6 +601,10 @@ bool Executor::HasError() { return error_manager.HasError(); } +ErrorData Executor::GetError() { + return error_manager.GetError(); +} + void Executor::ThrowException() { error_manager.ThrowException(); } @@ -616,24 +657,4 @@ unique_ptr Executor::GetResult() { return result_collector.GetResult(*result_collector.sink_state); } -unique_ptr Executor::FetchChunk() { - D_ASSERT(physical_plan); - - auto chunk = make_uniq(); - root_executor->InitializeChunk(*chunk); - while (true) { - root_executor->ExecutePull(*chunk); - if (chunk->size() == 0) { - root_executor->PullFinalize(); - if (NextExecutor()) { - continue; - } - break; - } else { - break; - } - } - return chunk; -} - } // namespace duckdb diff --git a/src/duckdb/src/parallel/pipeline.cpp b/src/duckdb/src/parallel/pipeline.cpp index 1448b1007..aee671b92 100644 --- a/src/duckdb/src/parallel/pipeline.cpp +++ b/src/duckdb/src/parallel/pipeline.cpp @@ -15,54 +15,54 @@ namespace duckdb { -class PipelineTask : public ExecutorTask { - static constexpr const idx_t PARTIAL_CHUNK_COUNT = 50; +PipelineTask::PipelineTask(Pipeline &pipeline_p, shared_ptr event_p) + : ExecutorTask(pipeline_p.executor), pipeline(pipeline_p), event(std::move(event_p)) { +} + +bool PipelineTask::TaskBlockedOnResult() const { + // If this returns true, it means the pipeline this task belongs to has a cached chunk + // that was the result of the Sink method returning BLOCKED + return pipeline_executor->RemainingSinkChunk(); +} + +const PipelineExecutor &PipelineTask::GetPipelineExecutor() const { + return *pipeline_executor; +} -public: - explicit PipelineTask(Pipeline &pipeline_p, shared_ptr event_p) - : ExecutorTask(pipeline_p.executor), pipeline(pipeline_p), event(std::move(event_p)) { +TaskExecutionResult PipelineTask::ExecuteTask(TaskExecutionMode mode) { + if (!pipeline_executor) { + pipeline_executor = make_uniq(pipeline.GetClientContext(), pipeline); } - Pipeline &pipeline; - shared_ptr event; - unique_ptr pipeline_executor; + pipeline_executor->SetTaskForInterrupts(shared_from_this()); -public: - TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override { - if (!pipeline_executor) { - pipeline_executor = make_uniq(pipeline.GetClientContext(), pipeline); - } + if (mode == TaskExecutionMode::PROCESS_PARTIAL) { + auto res = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT); - pipeline_executor->SetTaskForInterrupts(shared_from_this()); - - if (mode == TaskExecutionMode::PROCESS_PARTIAL) { - auto res = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT); - - switch (res) { - case PipelineExecuteResult::NOT_FINISHED: - return TaskExecutionResult::TASK_NOT_FINISHED; - case PipelineExecuteResult::INTERRUPTED: - return TaskExecutionResult::TASK_BLOCKED; - case PipelineExecuteResult::FINISHED: - break; - } - } else { - auto res = pipeline_executor->Execute(); - switch (res) { - case PipelineExecuteResult::NOT_FINISHED: - throw InternalException("Execute without limit should not return NOT_FINISHED"); - case PipelineExecuteResult::INTERRUPTED: - return TaskExecutionResult::TASK_BLOCKED; - case PipelineExecuteResult::FINISHED: - break; - } + switch (res) { + case PipelineExecuteResult::NOT_FINISHED: + return TaskExecutionResult::TASK_NOT_FINISHED; + case PipelineExecuteResult::INTERRUPTED: + return TaskExecutionResult::TASK_BLOCKED; + case PipelineExecuteResult::FINISHED: + break; + } + } else { + auto res = pipeline_executor->Execute(); + switch (res) { + case PipelineExecuteResult::NOT_FINISHED: + throw InternalException("Execute without limit should not return NOT_FINISHED"); + case PipelineExecuteResult::INTERRUPTED: + return TaskExecutionResult::TASK_BLOCKED; + case PipelineExecuteResult::FINISHED: + break; } - - event->FinishTask(); - pipeline_executor.reset(); - return TaskExecutionResult::TASK_FINISHED; } -}; + + event->FinishTask(); + pipeline_executor.reset(); + return TaskExecutionResult::TASK_FINISHED; +} Pipeline::Pipeline(Executor &executor_p) : executor(executor_p), ready(false), initialized(false), source(nullptr), sink(nullptr) { diff --git a/src/duckdb/src/parallel/pipeline_executor.cpp b/src/duckdb/src/parallel/pipeline_executor.cpp index 2c00aef94..fadfd8bd0 100644 --- a/src/duckdb/src/parallel/pipeline_executor.cpp +++ b/src/duckdb/src/parallel/pipeline_executor.cpp @@ -245,6 +245,10 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) { return PushFinalize(); } +bool PipelineExecutor::RemainingSinkChunk() const { + return remaining_sink_chunk; +} + PipelineExecuteResult PipelineExecutor::Execute() { return Execute(NumericLimits::Maximum()); } @@ -348,73 +352,6 @@ PipelineExecuteResult PipelineExecutor::PushFinalize() { return PipelineExecuteResult::FINISHED; } -// TODO: Refactoring the StreamingQueryResult to use Push-based execution should eliminate the need for this code -void PipelineExecutor::ExecutePull(DataChunk &result) { - if (IsFinished()) { - return; - } - auto &executor = pipeline.executor; - try { - D_ASSERT(!pipeline.sink); - D_ASSERT(!requires_batch_index); - auto &source_chunk = pipeline.operators.empty() ? result : *intermediate_chunks[0]; - while (result.size() == 0 && (!exhausted_source || !in_process_operators.empty())) { - if (in_process_operators.empty()) { - source_chunk.Reset(); - - auto done_signal = make_shared(); - interrupt_state = InterruptState(done_signal); - SourceResultType source_result; - - // Repeatedly try to fetch from the source until it doesn't block. Note that it may block multiple times - while (true) { - D_ASSERT(!exhausted_source); - source_result = FetchFromSource(source_chunk); - - // No interrupt happened, all good. - if (source_result != SourceResultType::BLOCKED) { - break; - } - - // Busy wait for async callback from source operator - done_signal->Await(); - } - - if (source_result == SourceResultType::FINISHED) { - exhausted_source = true; - if (source_chunk.size() == 0) { - break; - } - } - } - if (!pipeline.operators.empty()) { - auto state = Execute(source_chunk, result); - if (state == OperatorResultType::FINISHED) { - break; - } - } - } - } catch (std::exception &ex) { // LCOV_EXCL_START - if (executor.HasError()) { - executor.ThrowException(); - } - throw; - } catch (...) { - if (executor.HasError()) { - executor.ThrowException(); - } - throw; - } // LCOV_EXCL_STOP -} - -void PipelineExecutor::PullFinalize() { - if (finalized) { - throw InternalException("Calling PullFinalize on a pipeline that has been finalized already"); - } - finalized = true; - pipeline.executor.Flush(thread); -} - void PipelineExecutor::GoToSource(idx_t ¤t_idx, idx_t initial_idx) { // we go back to the first operator (the source) current_idx = initial_idx; diff --git a/src/duckdb/src/parallel/task_scheduler.cpp b/src/duckdb/src/parallel/task_scheduler.cpp index d0d3bed5b..9455e12eb 100644 --- a/src/duckdb/src/parallel/task_scheduler.cpp +++ b/src/duckdb/src/parallel/task_scheduler.cpp @@ -97,12 +97,12 @@ ProducerToken::~ProducerToken() { TaskScheduler::TaskScheduler(DatabaseInstance &db) : db(db), queue(make_uniq()), - allocator_flush_threshold(db.config.options.allocator_flush_threshold) { + allocator_flush_threshold(db.config.options.allocator_flush_threshold), thread_count(1) { } TaskScheduler::~TaskScheduler() { #ifndef DUCKDB_NO_THREADS - SetThreadsInternal(1); + RelaunchThreadsInternal(1); #endif } @@ -236,11 +236,10 @@ int32_t TaskScheduler::NumberOfThreads() { void TaskScheduler::SetThreads(int32_t n) { #ifndef DUCKDB_NO_THREADS - lock_guard t(thread_lock); if (n < 1) { throw SyntaxException("Must have at least 1 thread!"); } - SetThreadsInternal(n); + thread_count = n; #else if (n != 1) { throw NotImplementedException("DuckDB was compiled without threads! Setting threads > 1 is not allowed."); @@ -263,7 +262,13 @@ void TaskScheduler::YieldThread() { #endif } -void TaskScheduler::SetThreadsInternal(int32_t n) { +void TaskScheduler::RelaunchThreads() { + lock_guard t(thread_lock); + auto n = thread_count.load(); + RelaunchThreadsInternal(n); +} + +void TaskScheduler::RelaunchThreadsInternal(int32_t n) { #ifndef DUCKDB_NO_THREADS if (threads.size() == idx_t(n - 1)) { return; diff --git a/src/duckdb/ub_src_execution_operator_helper.cpp b/src/duckdb/ub_src_execution_operator_helper.cpp index 0821c06b2..6215a4b4b 100644 --- a/src/duckdb/ub_src_execution_operator_helper.cpp +++ b/src/duckdb/ub_src_execution_operator_helper.cpp @@ -1,5 +1,7 @@ #include "src/execution/operator/helper/physical_batch_collector.cpp" +#include "src/execution/operator/helper/physical_buffered_collector.cpp" + #include "src/execution/operator/helper/physical_create_secret.cpp" #include "src/execution/operator/helper/physical_execute.cpp" diff --git a/src/duckdb/ub_src_main_buffered_data.cpp b/src/duckdb/ub_src_main_buffered_data.cpp new file mode 100644 index 000000000..824214bff --- /dev/null +++ b/src/duckdb/ub_src_main_buffered_data.cpp @@ -0,0 +1,2 @@ +#include "src/main/buffered_data/simple_buffered_data.cpp" + diff --git a/src/include/sources.mk b/src/include/sources.mk index d134e9c62..cc12be870 100644 --- a/src/include/sources.mk +++ b/src/include/sources.mk @@ -1 +1 @@ -SOURCES=duckdb/ub_src_catalog.o duckdb/ub_src_catalog_catalog_entry.o duckdb/ub_src_catalog_catalog_entry_dependency.o duckdb/ub_src_catalog_default.o duckdb/ub_src_common_adbc.o duckdb/ub_src_common_adbc_nanoarrow.o duckdb/ub_src_common.o duckdb/ub_src_common_arrow_appender.o duckdb/ub_src_common_arrow.o duckdb/ub_src_common_crypto.o duckdb/ub_src_common_enums.o duckdb/ub_src_common_exception.o duckdb/ub_src_common_operator.o duckdb/ub_src_common_progress_bar.o duckdb/ub_src_common_row_operations.o duckdb/ub_src_common_serializer.o duckdb/ub_src_common_sort.o duckdb/ub_src_common_types.o duckdb/ub_src_common_types_column.o duckdb/ub_src_common_types_row.o duckdb/ub_src_common_value_operations.o duckdb/src/common/vector_operations/boolean_operators.o duckdb/src/common/vector_operations/comparison_operators.o duckdb/src/common/vector_operations/generators.o duckdb/src/common/vector_operations/is_distinct_from.o duckdb/src/common/vector_operations/null_operations.o duckdb/src/common/vector_operations/numeric_inplace_operators.o duckdb/src/common/vector_operations/vector_cast.o duckdb/src/common/vector_operations/vector_copy.o duckdb/src/common/vector_operations/vector_hash.o duckdb/src/common/vector_operations/vector_storage.o duckdb/ub_src_core_functions_aggregate_algebraic.o duckdb/ub_src_core_functions_aggregate_distributive.o duckdb/ub_src_core_functions_aggregate_holistic.o duckdb/ub_src_core_functions_aggregate_nested.o duckdb/ub_src_core_functions_aggregate_regression.o duckdb/ub_src_core_functions.o duckdb/ub_src_core_functions_scalar_array.o duckdb/ub_src_core_functions_scalar_bit.o duckdb/ub_src_core_functions_scalar_blob.o duckdb/ub_src_core_functions_scalar_date.o duckdb/ub_src_core_functions_scalar_debug.o duckdb/ub_src_core_functions_scalar_enum.o duckdb/ub_src_core_functions_scalar_generic.o duckdb/ub_src_core_functions_scalar_list.o duckdb/ub_src_core_functions_scalar_map.o duckdb/ub_src_core_functions_scalar_math.o duckdb/ub_src_core_functions_scalar_operators.o duckdb/ub_src_core_functions_scalar_random.o duckdb/ub_src_core_functions_scalar_secret.o duckdb/ub_src_core_functions_scalar_string.o duckdb/ub_src_core_functions_scalar_struct.o duckdb/ub_src_core_functions_scalar_union.o duckdb/ub_src_execution.o duckdb/ub_src_execution_expression_executor.o duckdb/ub_src_execution_index_art.o duckdb/ub_src_execution_index.o duckdb/ub_src_execution_nested_loop_join.o duckdb/ub_src_execution_operator_aggregate.o duckdb/ub_src_execution_operator_csv_scanner_buffer_manager.o duckdb/ub_src_execution_operator_csv_scanner_scanner.o duckdb/ub_src_execution_operator_csv_scanner_sniffer.o duckdb/ub_src_execution_operator_csv_scanner_state_machine.o duckdb/ub_src_execution_operator_csv_scanner_table_function.o duckdb/ub_src_execution_operator_csv_scanner_util.o duckdb/ub_src_execution_operator_filter.o duckdb/ub_src_execution_operator_helper.o duckdb/ub_src_execution_operator_join.o duckdb/ub_src_execution_operator_order.o duckdb/ub_src_execution_operator_persistent.o duckdb/ub_src_execution_operator_projection.o duckdb/ub_src_execution_operator_scan.o duckdb/ub_src_execution_operator_schema.o duckdb/ub_src_execution_operator_set.o duckdb/ub_src_execution_physical_plan.o duckdb/ub_src_function_aggregate_distributive.o duckdb/ub_src_function_aggregate.o duckdb/ub_src_function.o duckdb/ub_src_function_cast.o duckdb/ub_src_function_cast_union.o duckdb/ub_src_function_pragma.o duckdb/ub_src_function_scalar_compressed_materialization.o duckdb/ub_src_function_scalar.o duckdb/ub_src_function_scalar_generic.o duckdb/ub_src_function_scalar_list.o duckdb/ub_src_function_scalar_operators.o duckdb/ub_src_function_scalar_sequence.o duckdb/ub_src_function_scalar_string.o duckdb/ub_src_function_scalar_string_regexp.o duckdb/ub_src_function_scalar_struct.o duckdb/ub_src_function_scalar_system.o duckdb/ub_src_function_table_arrow.o duckdb/ub_src_function_table.o duckdb/ub_src_function_table_system.o duckdb/ub_src_function_table_version.o duckdb/ub_src_main.o duckdb/ub_src_main_capi.o duckdb/ub_src_main_capi_cast.o duckdb/ub_src_main_chunk_scan_state.o duckdb/ub_src_main_extension.o duckdb/ub_src_main_relation.o duckdb/ub_src_main_secret.o duckdb/ub_src_main_settings.o duckdb/ub_src_optimizer.o duckdb/ub_src_optimizer_compressed_materialization.o duckdb/ub_src_optimizer_join_order.o duckdb/ub_src_optimizer_matcher.o duckdb/ub_src_optimizer_pullup.o duckdb/ub_src_optimizer_pushdown.o duckdb/ub_src_optimizer_rule.o duckdb/ub_src_optimizer_statistics_expression.o duckdb/ub_src_optimizer_statistics_operator.o duckdb/ub_src_parallel.o duckdb/ub_src_parser.o duckdb/ub_src_parser_constraints.o duckdb/ub_src_parser_expression.o duckdb/ub_src_parser_parsed_data.o duckdb/ub_src_parser_query_node.o duckdb/ub_src_parser_statement.o duckdb/ub_src_parser_tableref.o duckdb/ub_src_parser_transform_constraint.o duckdb/ub_src_parser_transform_expression.o duckdb/ub_src_parser_transform_helpers.o duckdb/ub_src_parser_transform_statement.o duckdb/ub_src_parser_transform_tableref.o duckdb/ub_src_planner.o duckdb/ub_src_planner_binder_expression.o duckdb/ub_src_planner_binder_query_node.o duckdb/ub_src_planner_binder_statement.o duckdb/ub_src_planner_binder_tableref.o duckdb/ub_src_planner_expression.o duckdb/ub_src_planner_expression_binder.o duckdb/ub_src_planner_filter.o duckdb/ub_src_planner_operator.o duckdb/ub_src_planner_subquery.o duckdb/ub_src_storage.o duckdb/ub_src_storage_buffer.o duckdb/ub_src_storage_checkpoint.o duckdb/ub_src_storage_compression_alp.o duckdb/ub_src_storage_compression.o duckdb/ub_src_storage_compression_chimp.o duckdb/ub_src_storage_metadata.o duckdb/ub_src_storage_serialization.o duckdb/ub_src_storage_statistics.o duckdb/ub_src_storage_table.o duckdb/ub_src_transaction.o duckdb/src/verification/copied_statement_verifier.o duckdb/src/verification/deserialized_statement_verifier.o duckdb/src/verification/external_statement_verifier.o duckdb/src/verification/fetch_row_verifier.o duckdb/src/verification/no_operator_caching_verifier.o duckdb/src/verification/parsed_statement_verifier.o duckdb/src/verification/prepared_statement_verifier.o duckdb/src/verification/statement_verifier.o duckdb/src/verification/unoptimized_statement_verifier.o duckdb/third_party/fmt/format.o duckdb/third_party/fsst/fsst_avx512.o duckdb/third_party/fsst/libfsst.o duckdb/third_party/miniz/miniz.o duckdb/third_party/re2/re2/bitstate.o duckdb/third_party/re2/re2/compile.o duckdb/third_party/re2/re2/dfa.o duckdb/third_party/re2/re2/filtered_re2.o duckdb/third_party/re2/re2/mimics_pcre.o duckdb/third_party/re2/re2/nfa.o duckdb/third_party/re2/re2/onepass.o duckdb/third_party/re2/re2/parse.o duckdb/third_party/re2/re2/perl_groups.o duckdb/third_party/re2/re2/prefilter.o duckdb/third_party/re2/re2/prefilter_tree.o duckdb/third_party/re2/re2/prog.o duckdb/third_party/re2/re2/re2.o duckdb/third_party/re2/re2/regexp.o duckdb/third_party/re2/re2/set.o duckdb/third_party/re2/re2/simplify.o duckdb/third_party/re2/re2/stringpiece.o duckdb/third_party/re2/re2/tostring.o duckdb/third_party/re2/re2/unicode_casefold.o duckdb/third_party/re2/re2/unicode_groups.o duckdb/third_party/re2/util/rune.o duckdb/third_party/re2/util/strutil.o duckdb/third_party/hyperloglog/hyperloglog.o duckdb/third_party/hyperloglog/sds.o duckdb/third_party/skiplist/SkipList.o duckdb/third_party/fastpforlib/bitpacking.o duckdb/third_party/utf8proc/utf8proc.o duckdb/third_party/utf8proc/utf8proc_wrapper.o duckdb/third_party/libpg_query/pg_functions.o duckdb/third_party/libpg_query/postgres_parser.o duckdb/third_party/libpg_query/src_backend_nodes_list.o duckdb/third_party/libpg_query/src_backend_nodes_makefuncs.o duckdb/third_party/libpg_query/src_backend_nodes_value.o duckdb/third_party/libpg_query/src_backend_parser_gram.o duckdb/third_party/libpg_query/src_backend_parser_parser.o duckdb/third_party/libpg_query/src_backend_parser_scan.o duckdb/third_party/libpg_query/src_backend_parser_scansup.o duckdb/third_party/libpg_query/src_common_keywords.o duckdb/third_party/mbedtls/library/aes.o duckdb/third_party/mbedtls/library/aria.o duckdb/third_party/mbedtls/library/asn1parse.o duckdb/third_party/mbedtls/library/base64.o duckdb/third_party/mbedtls/library/bignum.o duckdb/third_party/mbedtls/library/camellia.o duckdb/third_party/mbedtls/library/cipher.o duckdb/third_party/mbedtls/library/cipher_wrap.o duckdb/third_party/mbedtls/library/constant_time.o duckdb/third_party/mbedtls/library/entropy.o duckdb/third_party/mbedtls/library/entropy_poll.o duckdb/third_party/mbedtls/library/gcm.o duckdb/third_party/mbedtls/library/md.o duckdb/third_party/mbedtls/library/oid.o duckdb/third_party/mbedtls/library/pem.o duckdb/third_party/mbedtls/library/pk.o duckdb/third_party/mbedtls/library/pk_wrap.o duckdb/third_party/mbedtls/library/pkparse.o duckdb/third_party/mbedtls/library/platform_util.o duckdb/third_party/mbedtls/library/rsa.o duckdb/third_party/mbedtls/library/rsa_alt_helpers.o duckdb/third_party/mbedtls/library/sha1.o duckdb/third_party/mbedtls/library/sha256.o duckdb/third_party/mbedtls/library/sha512.o duckdb/third_party/mbedtls/mbedtls_wrapper.o duckdb/extension/parquet/column_reader.o duckdb/extension/parquet/column_writer.o duckdb/extension/parquet/parquet_crypto.o duckdb/extension/parquet/parquet_extension.o duckdb/extension/parquet/parquet_metadata.o duckdb/extension/parquet/parquet_reader.o duckdb/extension/parquet/parquet_statistics.o duckdb/extension/parquet/parquet_timestamp.o duckdb/extension/parquet/parquet_writer.o duckdb/extension/parquet/serialize_parquet.o duckdb/extension/parquet/zstd_file_system.o duckdb/third_party/parquet/parquet_constants.o duckdb/third_party/parquet/parquet_types.o duckdb/third_party/thrift/thrift/protocol/TProtocol.o duckdb/third_party/thrift/thrift/transport/TTransportException.o duckdb/third_party/thrift/thrift/transport/TBufferTransports.o duckdb/third_party/snappy/snappy.o duckdb/third_party/snappy/snappy-sinksource.o duckdb/third_party/zstd/decompress/zstd_ddict.o duckdb/third_party/zstd/decompress/huf_decompress.o duckdb/third_party/zstd/decompress/zstd_decompress.o duckdb/third_party/zstd/decompress/zstd_decompress_block.o duckdb/third_party/zstd/common/entropy_common.o duckdb/third_party/zstd/common/fse_decompress.o duckdb/third_party/zstd/common/zstd_common.o duckdb/third_party/zstd/common/error_private.o duckdb/third_party/zstd/common/xxhash.o duckdb/third_party/zstd/compress/fse_compress.o duckdb/third_party/zstd/compress/hist.o duckdb/third_party/zstd/compress/huf_compress.o duckdb/third_party/zstd/compress/zstd_compress.o duckdb/third_party/zstd/compress/zstd_compress_literals.o duckdb/third_party/zstd/compress/zstd_compress_sequences.o duckdb/third_party/zstd/compress/zstd_compress_superblock.o duckdb/third_party/zstd/compress/zstd_double_fast.o duckdb/third_party/zstd/compress/zstd_fast.o duckdb/third_party/zstd/compress/zstd_lazy.o duckdb/third_party/zstd/compress/zstd_ldm.o duckdb/third_party/zstd/compress/zstd_opt.o +SOURCES=duckdb/ub_src_catalog.o duckdb/ub_src_catalog_catalog_entry.o duckdb/ub_src_catalog_catalog_entry_dependency.o duckdb/ub_src_catalog_default.o duckdb/ub_src_common_adbc.o duckdb/ub_src_common_adbc_nanoarrow.o duckdb/ub_src_common.o duckdb/ub_src_common_arrow_appender.o duckdb/ub_src_common_arrow.o duckdb/ub_src_common_crypto.o duckdb/ub_src_common_enums.o duckdb/ub_src_common_exception.o duckdb/ub_src_common_operator.o duckdb/ub_src_common_progress_bar.o duckdb/ub_src_common_row_operations.o duckdb/ub_src_common_serializer.o duckdb/ub_src_common_sort.o duckdb/ub_src_common_types.o duckdb/ub_src_common_types_column.o duckdb/ub_src_common_types_row.o duckdb/ub_src_common_value_operations.o duckdb/src/common/vector_operations/boolean_operators.o duckdb/src/common/vector_operations/comparison_operators.o duckdb/src/common/vector_operations/generators.o duckdb/src/common/vector_operations/is_distinct_from.o duckdb/src/common/vector_operations/null_operations.o duckdb/src/common/vector_operations/numeric_inplace_operators.o duckdb/src/common/vector_operations/vector_cast.o duckdb/src/common/vector_operations/vector_copy.o duckdb/src/common/vector_operations/vector_hash.o duckdb/src/common/vector_operations/vector_storage.o duckdb/ub_src_core_functions_aggregate_algebraic.o duckdb/ub_src_core_functions_aggregate_distributive.o duckdb/ub_src_core_functions_aggregate_holistic.o duckdb/ub_src_core_functions_aggregate_nested.o duckdb/ub_src_core_functions_aggregate_regression.o duckdb/ub_src_core_functions.o duckdb/ub_src_core_functions_scalar_array.o duckdb/ub_src_core_functions_scalar_bit.o duckdb/ub_src_core_functions_scalar_blob.o duckdb/ub_src_core_functions_scalar_date.o duckdb/ub_src_core_functions_scalar_debug.o duckdb/ub_src_core_functions_scalar_enum.o duckdb/ub_src_core_functions_scalar_generic.o duckdb/ub_src_core_functions_scalar_list.o duckdb/ub_src_core_functions_scalar_map.o duckdb/ub_src_core_functions_scalar_math.o duckdb/ub_src_core_functions_scalar_operators.o duckdb/ub_src_core_functions_scalar_random.o duckdb/ub_src_core_functions_scalar_secret.o duckdb/ub_src_core_functions_scalar_string.o duckdb/ub_src_core_functions_scalar_struct.o duckdb/ub_src_core_functions_scalar_union.o duckdb/ub_src_execution.o duckdb/ub_src_execution_expression_executor.o duckdb/ub_src_execution_index_art.o duckdb/ub_src_execution_index.o duckdb/ub_src_execution_nested_loop_join.o duckdb/ub_src_execution_operator_aggregate.o duckdb/ub_src_execution_operator_csv_scanner_buffer_manager.o duckdb/ub_src_execution_operator_csv_scanner_scanner.o duckdb/ub_src_execution_operator_csv_scanner_sniffer.o duckdb/ub_src_execution_operator_csv_scanner_state_machine.o duckdb/ub_src_execution_operator_csv_scanner_table_function.o duckdb/ub_src_execution_operator_csv_scanner_util.o duckdb/ub_src_execution_operator_filter.o duckdb/ub_src_execution_operator_helper.o duckdb/ub_src_execution_operator_join.o duckdb/ub_src_execution_operator_order.o duckdb/ub_src_execution_operator_persistent.o duckdb/ub_src_execution_operator_projection.o duckdb/ub_src_execution_operator_scan.o duckdb/ub_src_execution_operator_schema.o duckdb/ub_src_execution_operator_set.o duckdb/ub_src_execution_physical_plan.o duckdb/ub_src_function_aggregate_distributive.o duckdb/ub_src_function_aggregate.o duckdb/ub_src_function.o duckdb/ub_src_function_cast.o duckdb/ub_src_function_cast_union.o duckdb/ub_src_function_pragma.o duckdb/ub_src_function_scalar_compressed_materialization.o duckdb/ub_src_function_scalar.o duckdb/ub_src_function_scalar_generic.o duckdb/ub_src_function_scalar_list.o duckdb/ub_src_function_scalar_operators.o duckdb/ub_src_function_scalar_sequence.o duckdb/ub_src_function_scalar_string.o duckdb/ub_src_function_scalar_string_regexp.o duckdb/ub_src_function_scalar_struct.o duckdb/ub_src_function_scalar_system.o duckdb/ub_src_function_table_arrow.o duckdb/ub_src_function_table.o duckdb/ub_src_function_table_system.o duckdb/ub_src_function_table_version.o duckdb/ub_src_main.o duckdb/ub_src_main_buffered_data.o duckdb/ub_src_main_capi.o duckdb/ub_src_main_capi_cast.o duckdb/ub_src_main_chunk_scan_state.o duckdb/ub_src_main_extension.o duckdb/ub_src_main_relation.o duckdb/ub_src_main_secret.o duckdb/ub_src_main_settings.o duckdb/ub_src_optimizer.o duckdb/ub_src_optimizer_compressed_materialization.o duckdb/ub_src_optimizer_join_order.o duckdb/ub_src_optimizer_matcher.o duckdb/ub_src_optimizer_pullup.o duckdb/ub_src_optimizer_pushdown.o duckdb/ub_src_optimizer_rule.o duckdb/ub_src_optimizer_statistics_expression.o duckdb/ub_src_optimizer_statistics_operator.o duckdb/ub_src_parallel.o duckdb/ub_src_parser.o duckdb/ub_src_parser_constraints.o duckdb/ub_src_parser_expression.o duckdb/ub_src_parser_parsed_data.o duckdb/ub_src_parser_query_node.o duckdb/ub_src_parser_statement.o duckdb/ub_src_parser_tableref.o duckdb/ub_src_parser_transform_constraint.o duckdb/ub_src_parser_transform_expression.o duckdb/ub_src_parser_transform_helpers.o duckdb/ub_src_parser_transform_statement.o duckdb/ub_src_parser_transform_tableref.o duckdb/ub_src_planner.o duckdb/ub_src_planner_binder_expression.o duckdb/ub_src_planner_binder_query_node.o duckdb/ub_src_planner_binder_statement.o duckdb/ub_src_planner_binder_tableref.o duckdb/ub_src_planner_expression.o duckdb/ub_src_planner_expression_binder.o duckdb/ub_src_planner_filter.o duckdb/ub_src_planner_operator.o duckdb/ub_src_planner_subquery.o duckdb/ub_src_storage.o duckdb/ub_src_storage_buffer.o duckdb/ub_src_storage_checkpoint.o duckdb/ub_src_storage_compression_alp.o duckdb/ub_src_storage_compression.o duckdb/ub_src_storage_compression_chimp.o duckdb/ub_src_storage_metadata.o duckdb/ub_src_storage_serialization.o duckdb/ub_src_storage_statistics.o duckdb/ub_src_storage_table.o duckdb/ub_src_transaction.o duckdb/src/verification/copied_statement_verifier.o duckdb/src/verification/deserialized_statement_verifier.o duckdb/src/verification/external_statement_verifier.o duckdb/src/verification/fetch_row_verifier.o duckdb/src/verification/no_operator_caching_verifier.o duckdb/src/verification/parsed_statement_verifier.o duckdb/src/verification/prepared_statement_verifier.o duckdb/src/verification/statement_verifier.o duckdb/src/verification/unoptimized_statement_verifier.o duckdb/third_party/fmt/format.o duckdb/third_party/fsst/fsst_avx512.o duckdb/third_party/fsst/libfsst.o duckdb/third_party/miniz/miniz.o duckdb/third_party/re2/re2/bitstate.o duckdb/third_party/re2/re2/compile.o duckdb/third_party/re2/re2/dfa.o duckdb/third_party/re2/re2/filtered_re2.o duckdb/third_party/re2/re2/mimics_pcre.o duckdb/third_party/re2/re2/nfa.o duckdb/third_party/re2/re2/onepass.o duckdb/third_party/re2/re2/parse.o duckdb/third_party/re2/re2/perl_groups.o duckdb/third_party/re2/re2/prefilter.o duckdb/third_party/re2/re2/prefilter_tree.o duckdb/third_party/re2/re2/prog.o duckdb/third_party/re2/re2/re2.o duckdb/third_party/re2/re2/regexp.o duckdb/third_party/re2/re2/set.o duckdb/third_party/re2/re2/simplify.o duckdb/third_party/re2/re2/stringpiece.o duckdb/third_party/re2/re2/tostring.o duckdb/third_party/re2/re2/unicode_casefold.o duckdb/third_party/re2/re2/unicode_groups.o duckdb/third_party/re2/util/rune.o duckdb/third_party/re2/util/strutil.o duckdb/third_party/hyperloglog/hyperloglog.o duckdb/third_party/hyperloglog/sds.o duckdb/third_party/skiplist/SkipList.o duckdb/third_party/fastpforlib/bitpacking.o duckdb/third_party/utf8proc/utf8proc.o duckdb/third_party/utf8proc/utf8proc_wrapper.o duckdb/third_party/libpg_query/pg_functions.o duckdb/third_party/libpg_query/postgres_parser.o duckdb/third_party/libpg_query/src_backend_nodes_list.o duckdb/third_party/libpg_query/src_backend_nodes_makefuncs.o duckdb/third_party/libpg_query/src_backend_nodes_value.o duckdb/third_party/libpg_query/src_backend_parser_gram.o duckdb/third_party/libpg_query/src_backend_parser_parser.o duckdb/third_party/libpg_query/src_backend_parser_scan.o duckdb/third_party/libpg_query/src_backend_parser_scansup.o duckdb/third_party/libpg_query/src_common_keywords.o duckdb/third_party/mbedtls/library/aes.o duckdb/third_party/mbedtls/library/aria.o duckdb/third_party/mbedtls/library/asn1parse.o duckdb/third_party/mbedtls/library/base64.o duckdb/third_party/mbedtls/library/bignum.o duckdb/third_party/mbedtls/library/camellia.o duckdb/third_party/mbedtls/library/cipher.o duckdb/third_party/mbedtls/library/cipher_wrap.o duckdb/third_party/mbedtls/library/constant_time.o duckdb/third_party/mbedtls/library/entropy.o duckdb/third_party/mbedtls/library/entropy_poll.o duckdb/third_party/mbedtls/library/gcm.o duckdb/third_party/mbedtls/library/md.o duckdb/third_party/mbedtls/library/oid.o duckdb/third_party/mbedtls/library/pem.o duckdb/third_party/mbedtls/library/pk.o duckdb/third_party/mbedtls/library/pk_wrap.o duckdb/third_party/mbedtls/library/pkparse.o duckdb/third_party/mbedtls/library/platform_util.o duckdb/third_party/mbedtls/library/rsa.o duckdb/third_party/mbedtls/library/rsa_alt_helpers.o duckdb/third_party/mbedtls/library/sha1.o duckdb/third_party/mbedtls/library/sha256.o duckdb/third_party/mbedtls/library/sha512.o duckdb/third_party/mbedtls/mbedtls_wrapper.o duckdb/extension/parquet/column_reader.o duckdb/extension/parquet/column_writer.o duckdb/extension/parquet/parquet_crypto.o duckdb/extension/parquet/parquet_extension.o duckdb/extension/parquet/parquet_metadata.o duckdb/extension/parquet/parquet_reader.o duckdb/extension/parquet/parquet_statistics.o duckdb/extension/parquet/parquet_timestamp.o duckdb/extension/parquet/parquet_writer.o duckdb/extension/parquet/serialize_parquet.o duckdb/extension/parquet/zstd_file_system.o duckdb/third_party/parquet/parquet_constants.o duckdb/third_party/parquet/parquet_types.o duckdb/third_party/thrift/thrift/protocol/TProtocol.o duckdb/third_party/thrift/thrift/transport/TTransportException.o duckdb/third_party/thrift/thrift/transport/TBufferTransports.o duckdb/third_party/snappy/snappy.o duckdb/third_party/snappy/snappy-sinksource.o duckdb/third_party/zstd/decompress/zstd_ddict.o duckdb/third_party/zstd/decompress/huf_decompress.o duckdb/third_party/zstd/decompress/zstd_decompress.o duckdb/third_party/zstd/decompress/zstd_decompress_block.o duckdb/third_party/zstd/common/entropy_common.o duckdb/third_party/zstd/common/fse_decompress.o duckdb/third_party/zstd/common/zstd_common.o duckdb/third_party/zstd/common/error_private.o duckdb/third_party/zstd/common/xxhash.o duckdb/third_party/zstd/compress/fse_compress.o duckdb/third_party/zstd/compress/hist.o duckdb/third_party/zstd/compress/huf_compress.o duckdb/third_party/zstd/compress/zstd_compress.o duckdb/third_party/zstd/compress/zstd_compress_literals.o duckdb/third_party/zstd/compress/zstd_compress_sequences.o duckdb/third_party/zstd/compress/zstd_compress_superblock.o duckdb/third_party/zstd/compress/zstd_double_fast.o duckdb/third_party/zstd/compress/zstd_fast.o duckdb/third_party/zstd/compress/zstd_lazy.o duckdb/third_party/zstd/compress/zstd_ldm.o duckdb/third_party/zstd/compress/zstd_opt.o