From 9ef76ecd649b8c687b0f4f36d9ebfa84908036eb Mon Sep 17 00:00:00 2001
From: Mark Raasveldt
Date: Mon, 17 Feb 2020 13:28:37 +0100
Subject: [PATCH] Fix several other issues with setting incorrect cardinality

---
 src/common/types/data_chunk.cpp               | 10 ++++++++-
 src/common/vector_operations/gather.cpp       |  2 +-
 src/execution/aggregate_hashtable.cpp         |  2 +-
 src/execution/join_hashtable.cpp              | 21 ++++++++++++-------
 .../aggregate/physical_hash_aggregate.cpp     | 11 +++++-----
 .../operator/aggregate/physical_window.cpp    |  2 ++
 .../operator/helper/physical_limit.cpp        | 16 ++++----------
 .../operator/join/physical_hash_join.cpp      | 11 +++-------
 .../join/physical_piecewise_merge_join.cpp    |  3 +++
 .../persistent/physical_copy_to_file.cpp      |  1 +
 .../operator/persistent/physical_update.cpp   |  3 ++-
 .../operator/scan/physical_chunk_scan.cpp     |  4 +---
 .../duckdb/common/types/data_chunk.hpp        |  3 +++
 .../duckdb/storage/numeric_segment.hpp        |  2 +-
 src/storage/numeric_segment.cpp               |  8 +++----
 15 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/src/common/types/data_chunk.cpp b/src/common/types/data_chunk.cpp
index b6b30ff9c58..c78130e746d 100644
--- a/src/common/types/data_chunk.cpp
+++ b/src/common/types/data_chunk.cpp
@@ -49,6 +49,14 @@ void DataChunk::SetValue(index_t col_idx, index_t index, Value val) {
     data[col_idx].SetValue(sel_vector ? sel_vector[index] : index, move(val));
 }

+void DataChunk::Reference(DataChunk &chunk) {
+    assert(chunk.column_count() == column_count());
+    SetCardinality(chunk);
+    for (index_t i = 0; i < column_count(); i++) {
+        data[i].Reference(chunk.data[i]);
+    }
+}
+
 void DataChunk::Copy(DataChunk &other, index_t offset) {
     assert(column_count() == other.column_count());
     assert(other.size() == 0 && !other.sel_vector);
@@ -56,7 +64,7 @@ void DataChunk::Copy(DataChunk &other, index_t offset) {
     for (index_t i = 0; i < column_count(); i++) {
         VectorOperations::Copy(data[i], other.data[i], offset);
     }
-    other.SetCardinality(size());
+    other.SetCardinality(size() - offset);
 }

 void DataChunk::Append(DataChunk &other) {
diff --git a/src/common/vector_operations/gather.cpp b/src/common/vector_operations/gather.cpp
index 1949fbd37f2..46d247abff7 100644
--- a/src/common/vector_operations/gather.cpp
+++ b/src/common/vector_operations/gather.cpp
@@ -97,7 +97,7 @@ template static void generic_gather_loop(Vector &source,
 }

 void VectorOperations::Gather::Set(Vector &source, Vector &dest, bool set_null, index_t offset) {
-    assert(source.SameCardinality(dest));
+    assert(source.size() == dest.size());
     if (set_null) {
         generic_gather_loop(source, dest, offset);
     } else {
diff --git a/src/execution/aggregate_hashtable.cpp b/src/execution/aggregate_hashtable.cpp
index affdede7d47..cf3b41c2e3d 100644
--- a/src/execution/aggregate_hashtable.cpp
+++ b/src/execution/aggregate_hashtable.cpp
@@ -229,7 +229,7 @@ void SuperLargeHashTable::FetchAggregates(DataChunk &groups, DataChunk &result)
     for (index_t i = 0; i < result.column_count(); i++) {
         assert(result.data[i].type == payload_types[i]);
     }
-    result.SetCardinality(groups.size(), groups.sel_vector);
+    result.SetCardinality(groups);
     if (groups.size() == 0) {
         return;
     }
diff --git a/src/execution/join_hashtable.cpp b/src/execution/join_hashtable.cpp
index 46434d56ccd..6fc8dc19a60 100644
--- a/src/execution/join_hashtable.cpp
+++ b/src/execution/join_hashtable.cpp
@@ -161,13 +161,15 @@ void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
         // for the correlated mark join we need to keep track of COUNT(*) and COUNT(COLUMN) for each of the correlated
         // columns push into the aggregate hash table
         assert(info.correlated_counts);
+        info.group_chunk.SetCardinality(keys);
         for (index_t i = 0; i < info.correlated_types.size(); i++) {
             info.group_chunk.data[i].Reference(keys.data[i]);
         }
-        info.payload_chunk.data[0].Reference(keys.data[info.correlated_types.size()]);
-        info.payload_chunk.data[1].Reference(keys.data[info.correlated_types.size()]);
-        info.payload_chunk.data[0].type = info.payload_chunk.data[1].type = TypeId::INT64;
-        info.payload_chunk.sel_vector = info.group_chunk.sel_vector = info.group_chunk.data[0].sel_vector();
+        info.payload_chunk.SetCardinality(keys);
+        for(index_t i = 0; i < 2; i++) {
+            info.payload_chunk.data[i].Reference(keys.data[info.correlated_types.size()]);
+            info.payload_chunk.data[i].type = TypeId::INT64;
+        }
         info.correlated_counts->AddChunk(info.group_chunk, info.payload_chunk);
     }
     sel_t not_null_sel_vector[STANDARD_VECTOR_SIZE];
@@ -428,6 +430,8 @@ void ScanStructure::Next(DataChunk &keys, DataChunk &left, DataChunk &result) {

 index_t ScanStructure::ResolvePredicates(DataChunk &keys, sel_t comparison_result[]) {
     FlatVector current_pointers;
+    current_pointers.SetCount(pointers.size());
+    current_pointers.SetSelVector(pointers.sel_vector());
     current_pointers.Reference(pointers);

     index_t comparison_count;
@@ -553,11 +557,11 @@ void ScanStructure::NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &r
         // matches were found
         // construct the result
         build_pointer_vector.SetCount(result_count);
+        result.SetCardinality(result_count, result.owned_sel_vector);
         // reference the columns of the left side from the result
         for (index_t i = 0; i < left.column_count(); i++) {
             result.data[i].Reference(left.data[i]);
         }
-        result.SetCardinality(result_count, result.owned_sel_vector);
         // now fetch the right side data from the HT
         for (index_t i = 0; i < ht.build_types.size(); i++) {
             auto &vector = result.data[left.column_count() + i];
@@ -690,21 +694,22 @@ void ScanStructure::NextMarkJoin(DataChunk &keys, DataChunk &input, DataChunk &r
         // there are correlated columns
         // first we fetch the counts from the aggregate hashtable corresponding to these entries
         assert(keys.column_count() == info.group_chunk.column_count() + 1);
+        info.group_chunk.SetCardinality(keys);
         for (index_t i = 0; i < info.group_chunk.column_count(); i++) {
             info.group_chunk.data[i].Reference(keys.data[i]);
         }
-        info.group_chunk.sel_vector = keys.sel_vector;
         info.correlated_counts->FetchAggregates(info.group_chunk, info.result_chunk);
         assert(!info.result_chunk.sel_vector);

         // for the initial set of columns we just reference the left side
+        result.SetCardinality(input);
         for (index_t i = 0; i < input.column_count(); i++) {
             result.data[i].Reference(input.data[i]);
         }
         // create the result matching vector
         auto &result_vector = result.data.back();
         // first set the nullmask based on whether or not there were NULL values in the join key
-        result_vector.nullmask = keys.data[keys.column_count() - 1].nullmask;
+        result_vector.nullmask = keys.data.back().nullmask;

         auto bool_result = (bool *)result_vector.GetData();
         auto count_star = (int64_t *)info.result_chunk.data[0].GetData();
@@ -722,7 +727,6 @@ void ScanStructure::NextMarkJoin(DataChunk &keys, DataChunk &input, DataChunk &r
                 result_vector.nullmask[i] = false;
             }
         }
-        result.SetCardinality(input.size());
     }
     finished = true;
 }
@@ -818,6 +822,7 @@ void ScanStructure::NextSingleJoin(DataChunk &keys, DataChunk &input, DataChunk
         VectorOperations::Gather::Set(build_pointer_vector, vector);
         VectorOperations::AddInPlace(build_pointer_vector, GetTypeIdSize(ht.build_types[i]));
     }
+    result.SetCardinality(input);
     // like the SEMI, ANTI and MARK join types, the SINGLE join only ever does one pass over the HT per input chunk
     finished = true;
 }
diff --git a/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/execution/operator/aggregate/physical_hash_aggregate.cpp
index e42203b8b7e..f341fd3aa8d 100644
--- a/src/execution/operator/aggregate/physical_hash_aggregate.cpp
+++ b/src/execution/operator/aggregate/physical_hash_aggregate.cpp
@@ -103,19 +103,20 @@ void PhysicalHashAggregate::GetChunkInternal(ClientContext &context, DataChunk &
     // special case hack to sort out aggregating from empty intermediates
     // for aggregations without groups
     if (elements_found == 0 && state->tuples_scanned == 0 && is_implicit_aggr) {
-        assert(state->aggregate_chunk.column_count() == aggregates.size());
+        assert(chunk.column_count() == aggregates.size());
         // for each column in the aggregates, set to initial state
-        state->aggregate_chunk.SetCardinality(1);
-        for (index_t i = 0; i < state->aggregate_chunk.column_count(); i++) {
+        chunk.SetCardinality(1);
+        for (index_t i = 0; i < chunk.column_count(); i++) {
             assert(aggregates[i]->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
             auto &aggr = (BoundAggregateExpression &)*aggregates[i];
             auto aggr_state = unique_ptr(new data_t[aggr.function.state_size(aggr.return_type)]);
             aggr.function.initialize(aggr_state.get(), aggr.return_type);

-            Vector state_vector(state->aggregate_chunk, Value::POINTER((uintptr_t)aggr_state.get()));
-            aggr.function.finalize(state_vector, state->aggregate_chunk.data[i]);
+            Vector state_vector(chunk, Value::POINTER((uintptr_t)aggr_state.get()));
+            aggr.function.finalize(state_vector, chunk.data[i]);
         }
         state->finished = true;
+        return;
     }
     if (elements_found == 0 && !state->finished) {
         state->finished = true;
diff --git a/src/execution/operator/aggregate/physical_window.cpp b/src/execution/operator/aggregate/physical_window.cpp
index aa6da7c5347..5916e57caf9 100644
--- a/src/execution/operator/aggregate/physical_window.cpp
+++ b/src/execution/operator/aggregate/physical_window.cpp
@@ -507,6 +507,8 @@ void PhysicalWindow::GetChunkInternal(ClientContext &context, DataChunk &chunk,
     auto &wind_ch = window_results.GetChunk(state->position);

     index_t out_idx = 0;
+    assert(proj_ch.size() == wind_ch.size());
+    chunk.SetCardinality(proj_ch);
     for (index_t col_idx = 0; col_idx < proj_ch.column_count(); col_idx++) {
         chunk.data[out_idx++].Reference(proj_ch.data[col_idx]);
     }
diff --git a/src/execution/operator/helper/physical_limit.cpp b/src/execution/operator/helper/physical_limit.cpp
index a19ec3ba7f0..1e529c84f08 100644
--- a/src/execution/operator/helper/physical_limit.cpp
+++ b/src/execution/operator/helper/physical_limit.cpp
@@ -34,17 +34,11 @@ void PhysicalLimit::GetChunkInternal(ClientContext &context, DataChunk &chunk, P

             index_t start_position = offset - state->current_offset;
             index_t chunk_count = min(limit, state->child_chunk.size() - start_position);
-            // first reference all the columns and set up the counts
+            // set up a slice of the input chunks
+            chunk.SetCardinality(chunk_count, state->child_chunk.sel_vector);
             for (index_t i = 0; i < chunk.column_count(); i++) {
-                chunk.data[i].Reference(state->child_chunk.data[i]);
+                chunk.data[i].Slice(state->child_chunk.data[i], start_position);
             }
-            // now set up the selection vector of the chunk
-            for (index_t idx = 0; idx < chunk_count; idx++) {
-                chunk.owned_sel_vector[idx] = state->child_chunk.sel_vector
-                                                  ? state->child_chunk.sel_vector[start_position + idx]
-                                                  : start_position + idx;
-            }
-            chunk.SetCardinality(chunk_count, chunk.owned_sel_vector);
         }
     } else {
         // have to copy either the entire chunk or part of it
@@ -57,9 +51,7 @@ void PhysicalLimit::GetChunkInternal(ClientContext &context, DataChunk &chunk, P
             chunk_count = state->child_chunk.size();
         }
         // instead of copying we just change the pointer in the current chunk
-        for (index_t i = 0; i < chunk.column_count(); i++) {
-            chunk.data[i].Reference(state->child_chunk.data[i]);
-        }
+        chunk.Reference(state->child_chunk);
         chunk.SetCardinality(chunk_count, state->child_chunk.sel_vector);
     }

diff --git a/src/execution/operator/join/physical_hash_join.cpp b/src/execution/operator/join/physical_hash_join.cpp
index 6a8f2c5d557..18027ffffeb 100644
--- a/src/execution/operator/join/physical_hash_join.cpp
+++ b/src/execution/operator/join/physical_hash_join.cpp
@@ -68,10 +68,10 @@ void PhysicalHashJoin::BuildHashTable(ClientContext &context, PhysicalOperatorSt
         if (right_projection_map.size() > 0) {
             // there is a projection map: fill the build chunk with the projected columns
             build_chunk.Reset();
+            build_chunk.SetCardinality(right_chunk);
             for (index_t i = 0; i < right_projection_map.size(); i++) {
                 build_chunk.data[i].Reference(right_chunk.data[right_projection_map[i]]);
             }
-            build_chunk.sel_vector = right_chunk.sel_vector;
             hash_table->Build(state->join_keys, build_chunk);
         } else {
             // there is not a projected map: place the entire right chunk in the HT
@@ -108,9 +108,7 @@ void PhysicalHashJoin::ProbeHashTable(ClientContext &context, DataChunk &chunk,
             // anti join with empty hash table, NOP join
             // return the input
             assert(chunk.column_count() == state->child_chunk.column_count());
-            for (index_t i = 0; i < chunk.column_count(); i++) {
-                chunk.data[i].Reference(state->child_chunk.data[i]);
-            }
+            chunk.Reference(state->child_chunk);
             return;
         } else if (hash_table->join_type == JoinType::MARK) {
             // MARK join with empty hash table
@@ -183,10 +181,7 @@ void PhysicalHashJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk
         if (chunk.size() == 0) {
             if (state->cached_chunk.size() > 0) {
                 // finished probing but cached data remains, return cached chunk
-                for (index_t col_idx = 0; col_idx < chunk.column_count(); col_idx++) {
-                    chunk.data[col_idx].Reference(state->cached_chunk.data[col_idx]);
-                }
-                chunk.sel_vector = state->cached_chunk.sel_vector;
+                chunk.Reference(state->cached_chunk);
                 state->cached_chunk.Reset();
             }
             return;
diff --git a/src/execution/operator/join/physical_piecewise_merge_join.cpp b/src/execution/operator/join/physical_piecewise_merge_join.cpp
index bb0fb731120..51eb8325fc0 100644
--- a/src/execution/operator/join/physical_piecewise_merge_join.cpp
+++ b/src/execution/operator/join/physical_piecewise_merge_join.cpp
@@ -85,6 +85,8 @@ void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataCh
             // resolve the join keys for the right chunk
             state->join_keys.Reset();
             state->rhs_executor.SetChunk(chunk_to_order);
+
+            state->join_keys.SetCardinality(chunk_to_order);
             for (index_t k = 0; k < conditions.size(); k++) {
                 // resolve the join key
                 state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
@@ -115,6 +117,7 @@ void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataCh
         // resolve the join keys for the left chunk
         state->join_keys.Reset();
         state->lhs_executor.SetChunk(state->child_chunk);
+        state->join_keys.SetCardinality(state->child_chunk);
         for (index_t k = 0; k < conditions.size(); k++) {
             state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
             // sort by join key
diff --git a/src/execution/operator/persistent/physical_copy_to_file.cpp b/src/execution/operator/persistent/physical_copy_to_file.cpp
index 35d4177b804..1e9e6e9bffb 100644
--- a/src/execution/operator/persistent/physical_copy_to_file.cpp
+++ b/src/execution/operator/persistent/physical_copy_to_file.cpp
@@ -164,6 +164,7 @@ void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chu
             break;
         }
         // cast the columns of the chunk to varchar
+        cast_chunk.SetCardinality(state->child_chunk);
         for (index_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
             if (sql_types[col_idx].id == SQLTypeId::VARCHAR) {
                 // VARCHAR, just create a reference
diff --git a/src/execution/operator/persistent/physical_update.cpp b/src/execution/operator/persistent/physical_update.cpp
index d2309cdd121..484f7d03bf1 100644
--- a/src/execution/operator/persistent/physical_update.cpp
+++ b/src/execution/operator/persistent/physical_update.cpp
@@ -39,6 +39,7 @@ void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk,
         // update data in the base table
         // the row ids are given to us as the last column of the child chunk
         auto &row_ids = state->child_chunk.data[state->child_chunk.column_count() - 1];
+        update_chunk.SetCardinality(state->child_chunk);
         for (index_t i = 0; i < expressions.size(); i++) {
             if (expressions[i]->type == ExpressionType::VALUE_DEFAULT) {
                 // default expression, set to the default value of the column
@@ -50,11 +51,11 @@ void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk,
                 update_chunk.data[i].Reference(state->child_chunk.data[binding.index]);
             }
         }
-        update_chunk.sel_vector = state->child_chunk.sel_vector;

         if (is_index_update) {
             // index update, perform a delete and an append instead
             table.Delete(tableref, context, row_ids);
+            mock_chunk.SetCardinality(update_chunk);
             for (index_t i = 0; i < columns.size(); i++) {
                 mock_chunk.data[columns[i]].Reference(update_chunk.data[i]);
             }
diff --git a/src/execution/operator/scan/physical_chunk_scan.cpp b/src/execution/operator/scan/physical_chunk_scan.cpp
index a853ca2fd97..3092a128697 100644
--- a/src/execution/operator/scan/physical_chunk_scan.cpp
+++ b/src/execution/operator/scan/physical_chunk_scan.cpp
@@ -23,9 +23,7 @@ void PhysicalChunkScan::GetChunkInternal(ClientContext &context, DataChunk &chun
         return;
     }
     auto &collection_chunk = *collection->chunks[state->chunk_index];
-    for (index_t i = 0; i < chunk.column_count(); i++) {
-        chunk.data[i].Reference(collection_chunk.data[i]);
-    }
+    chunk.Reference(collection_chunk);
     state->chunk_index++;
 }

diff --git a/src/include/duckdb/common/types/data_chunk.hpp b/src/include/duckdb/common/types/data_chunk.hpp
index 81ca046abab..b4d4291030e 100644
--- a/src/include/duckdb/common/types/data_chunk.hpp
+++ b/src/include/duckdb/common/types/data_chunk.hpp
@@ -60,6 +60,9 @@ class DataChunk : public VectorCardinality {
     Value GetValue(index_t col_idx, index_t index) const;
     void SetValue(index_t col_idx, index_t index, Value val);

+    //! Set the DataChunk to reference another data chunk
+    void Reference(DataChunk &chunk);
+
     //! Initializes the DataChunk with the specified types to an empty DataChunk
     //! This will create one vector of the specified type for each TypeId in the
     //! types list. The vector will be referencing vector to the data owned by
diff --git a/src/include/duckdb/storage/numeric_segment.hpp b/src/include/duckdb/storage/numeric_segment.hpp
index 0a3b55ea030..336921493e3 100644
--- a/src/include/duckdb/storage/numeric_segment.hpp
+++ b/src/include/duckdb/storage/numeric_segment.hpp
@@ -45,7 +45,7 @@ class NumericSegment : public UncompressedSegment {
     typedef void (*update_function_t)(SegmentStatistics &stats, UpdateInfo *info, data_ptr_t base_data, Vector &update);
     typedef void (*update_info_fetch_function_t)(Transaction &transaction, UpdateInfo *info, Vector &result);
     typedef void (*update_info_append_function_t)(Transaction &transaction, UpdateInfo *info, index_t idx,
-                                                  Vector &result);
+                                                  Vector &result, index_t result_idx);
     typedef void (*rollback_update_function_t)(UpdateInfo *info, data_ptr_t base_data);
     typedef void (*merge_update_function_t)(SegmentStatistics &stats, UpdateInfo *node, data_ptr_t target,
                                             Vector &update, row_t *ids, index_t vector_offset);
diff --git a/src/storage/numeric_segment.cpp b/src/storage/numeric_segment.cpp
index a44ecc33938..ceb50973953 100644
--- a/src/storage/numeric_segment.cpp
+++ b/src/storage/numeric_segment.cpp
@@ -90,7 +90,7 @@ void NumericSegment::FetchRow(ColumnFetchState &state, Transaction &transaction,
     if (versions && versions[vector_index]) {
         // version information: follow the version chain to find out if we need to load this tuple data from any other
         // version
-        append_from_update_info(transaction, versions[vector_index], id_in_vector, result);
+        append_from_update_info(transaction, versions[vector_index], id_in_vector, result, result_idx);
     }
 }

@@ -409,7 +409,7 @@ static NumericSegment::update_info_fetch_function_t GetUpdateInfoFetchFunction(T
 // Update Append
 //===--------------------------------------------------------------------===//
 template <class T>
-static void update_info_append(Transaction &transaction, UpdateInfo *info, index_t row_id, Vector &result) {
+static void update_info_append(Transaction &transaction, UpdateInfo *info, index_t row_id, Vector &result, index_t result_idx) {
     auto result_data = (T *)result.GetData();
     UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) {
         auto info_data = (T *)current->tuple_data;
@@ -417,8 +417,8 @@ static void update_info_append(Transaction &transaction, UpdateInfo *info, index
         for (index_t i = 0; i < current->N; i++) {
             if (current->tuples[i] == row_id) {
                 // found the relevant tuple
-                result_data[result.size()] = info_data[i];
-                result.nullmask[result.size()] = current->nullmask[current->tuples[i]];
+                result_data[result_idx] = info_data[i];
+                result.nullmask[result_idx] = current->nullmask[current->tuples[i]];
                 break;
             } else if (current->tuples[i] > row_id) {
                 // tuples are sorted: so if the current tuple is > row_id we will not find it anymore