Fix several other issues with setting incorrect cardinality
Mytherin committed Feb 17, 2020
1 parent 8bb6938 commit 9ef76ec
Showing 15 changed files with 54 additions and 45 deletions.
10 changes: 9 additions & 1 deletion src/common/types/data_chunk.cpp
@@ -49,14 +49,22 @@ void DataChunk::SetValue(index_t col_idx, index_t index, Value val) {
data[col_idx].SetValue(sel_vector ? sel_vector[index] : index, move(val));
}

void DataChunk::Reference(DataChunk &chunk) {
assert(chunk.column_count() == column_count());
SetCardinality(chunk);
for (index_t i = 0; i < column_count(); i++) {
data[i].Reference(chunk.data[i]);
}
}

void DataChunk::Copy(DataChunk &other, index_t offset) {
assert(column_count() == other.column_count());
assert(other.size() == 0 && !other.sel_vector);

for (index_t i = 0; i < column_count(); i++) {
VectorOperations::Copy(data[i], other.data[i], offset);
}
other.SetCardinality(size());
other.SetCardinality(size() - offset);
}

void DataChunk::Append(DataChunk &other) {
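The pattern behind most fixes in this commit shows up in this first hunk: a chunk that borrows another chunk's vectors must also adopt that chunk's cardinality (row count plus selection vector), and Copy with an offset must report size() - offset rows in the destination. The following sketch is a minimal standalone model of that invariant; MiniChunk and its members are invented stand-ins for illustration, not DuckDB's actual DataChunk/Vector API.

// Simplified model of the invariant fixed in this commit: referencing another
// chunk's data must also copy its count and selection vector, otherwise
// downstream operators see the wrong cardinality. Stand-in types only.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct MiniChunk {
    std::vector<const int64_t *> columns; // borrowed column data
    const uint32_t *sel_vector;           // optional row selection
    size_t count;                         // number of selected rows

    // Analogous to DataChunk::SetCardinality(const DataChunk &): copy BOTH fields.
    void SetCardinality(const MiniChunk &other) {
        count = other.count;
        sel_vector = other.sel_vector;
    }
    // Analogous to the new DataChunk::Reference(DataChunk &) added above.
    void Reference(const MiniChunk &other) {
        SetCardinality(other); // the step several old code paths skipped
        columns = other.columns;
    }
};

int main() {
    int64_t col[] = {10, 20, 30, 40};
    uint32_t sel[] = {1, 3}; // only rows 20 and 40 are selected
    MiniChunk source;
    source.columns = {col};
    source.sel_vector = sel;
    source.count = 2;

    MiniChunk ref;
    ref.Reference(source);
    assert(ref.count == 2 && ref.sel_vector == sel); // cardinality travels along
    return 0;
}

The Copy change in the same hunk follows the same bookkeeping: after copying rows starting at offset, the destination holds size() - offset rows, not size().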
2 changes: 1 addition & 1 deletion src/common/vector_operations/gather.cpp
@@ -97,7 +97,7 @@ template <class LOOP, class OP> static void generic_gather_loop(Vector &source,
}

void VectorOperations::Gather::Set(Vector &source, Vector &dest, bool set_null, index_t offset) {
assert(source.SameCardinality(dest));
assert(source.size() == dest.size());
if (set_null) {
generic_gather_loop<GatherLoopSetNull, PickLeft>(source, dest, offset);
} else {
2 changes: 1 addition & 1 deletion src/execution/aggregate_hashtable.cpp
@@ -229,7 +229,7 @@ void SuperLargeHashTable::FetchAggregates(DataChunk &groups, DataChunk &result)
for (index_t i = 0; i < result.column_count(); i++) {
assert(result.data[i].type == payload_types[i]);
}
result.SetCardinality(groups.size(), groups.sel_vector);
result.SetCardinality(groups);
if (groups.size() == 0) {
return;
}
21 changes: 13 additions & 8 deletions src/execution/join_hashtable.cpp
@@ -161,13 +161,15 @@ void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
// for the correlated mark join we need to keep track of COUNT(*) and COUNT(COLUMN) for each of the correlated
// columns push into the aggregate hash table
assert(info.correlated_counts);
info.group_chunk.SetCardinality(keys);
for (index_t i = 0; i < info.correlated_types.size(); i++) {
info.group_chunk.data[i].Reference(keys.data[i]);
}
info.payload_chunk.data[0].Reference(keys.data[info.correlated_types.size()]);
info.payload_chunk.data[1].Reference(keys.data[info.correlated_types.size()]);
info.payload_chunk.data[0].type = info.payload_chunk.data[1].type = TypeId::INT64;
info.payload_chunk.sel_vector = info.group_chunk.sel_vector = info.group_chunk.data[0].sel_vector();
info.payload_chunk.SetCardinality(keys);
for(index_t i = 0; i < 2; i++) {
info.payload_chunk.data[i].Reference(keys.data[info.correlated_types.size()]);
info.payload_chunk.data[i].type = TypeId::INT64;
}
info.correlated_counts->AddChunk(info.group_chunk, info.payload_chunk);
}
sel_t not_null_sel_vector[STANDARD_VECTOR_SIZE];
@@ -428,6 +430,8 @@ void ScanStructure::Next(DataChunk &keys, DataChunk &left, DataChunk &result) {

index_t ScanStructure::ResolvePredicates(DataChunk &keys, sel_t comparison_result[]) {
FlatVector current_pointers;
current_pointers.SetCount(pointers.size());
current_pointers.SetSelVector(pointers.sel_vector());
current_pointers.Reference(pointers);

index_t comparison_count;
@@ -553,11 +557,11 @@ void ScanStructure::NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &r
// matches were found
// construct the result
build_pointer_vector.SetCount(result_count);
result.SetCardinality(result_count, result.owned_sel_vector);
// reference the columns of the left side from the result
for (index_t i = 0; i < left.column_count(); i++) {
result.data[i].Reference(left.data[i]);
}
result.SetCardinality(result_count, result.owned_sel_vector);
// now fetch the right side data from the HT
for (index_t i = 0; i < ht.build_types.size(); i++) {
auto &vector = result.data[left.column_count() + i];
@@ -690,21 +694,22 @@ void ScanStructure::NextMarkJoin(DataChunk &keys, DataChunk &input, DataChunk &r
// there are correlated columns
// first we fetch the counts from the aggregate hashtable corresponding to these entries
assert(keys.column_count() == info.group_chunk.column_count() + 1);
info.group_chunk.SetCardinality(keys);
for (index_t i = 0; i < info.group_chunk.column_count(); i++) {
info.group_chunk.data[i].Reference(keys.data[i]);
}
info.group_chunk.sel_vector = keys.sel_vector;
info.correlated_counts->FetchAggregates(info.group_chunk, info.result_chunk);
assert(!info.result_chunk.sel_vector);

// for the initial set of columns we just reference the left side
result.SetCardinality(input);
for (index_t i = 0; i < input.column_count(); i++) {
result.data[i].Reference(input.data[i]);
}
// create the result matching vector
auto &result_vector = result.data.back();
// first set the nullmask based on whether or not there were NULL values in the join key
result_vector.nullmask = keys.data[keys.column_count() - 1].nullmask;
result_vector.nullmask = keys.data.back().nullmask;

auto bool_result = (bool *)result_vector.GetData();
auto count_star = (int64_t *)info.result_chunk.data[0].GetData();
@@ -722,7 +727,6 @@
result_vector.nullmask[i] = false;
}
}
result.SetCardinality(input.size());
}
finished = true;
}
@@ -818,6 +822,7 @@ void ScanStructure::NextSingleJoin(DataChunk &keys, DataChunk &input, DataChunk
VectorOperations::Gather::Set(build_pointer_vector, vector);
VectorOperations::AddInPlace(build_pointer_vector, GetTypeIdSize(ht.build_types[i]));
}
result.SetCardinality(input);
// like the SEMI, ANTI and MARK join types, the SINGLE join only ever does one pass over the HT per input chunk
finished = true;
}
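For context on the correlated mark join hunks above: info.correlated_counts tracks COUNT(*) and COUNT(column) per correlated group so the mark column can follow SQL's three-valued IN semantics. The sketch below is a simplified standalone model of how those two counts plus a match flag determine TRUE/FALSE/NULL; it illustrates the semantics only and is not the exact logic in join_hashtable.cpp.

// Standalone illustration of three-valued IN / mark-join semantics driven by
// per-group COUNT(*) and COUNT(column). Simplified model for this commit's
// context; not DuckDB's actual implementation.
#include <cstdint>
#include <cstdio>

enum class Mark { FALSE_, TRUE_, NULL_ };

// probe_is_null: the probe-side key is NULL
// found_match:   the probe found an equal, non-NULL key in the group
Mark MarkValue(bool found_match, bool probe_is_null, int64_t count_star, int64_t count_col) {
    if (found_match) {
        return Mark::TRUE_;  // a match always wins
    }
    if (count_star == 0) {
        return Mark::FALSE_; // empty group: x IN (empty set) is FALSE
    }
    if (probe_is_null || count_col < count_star) {
        return Mark::NULL_;  // NULL probe key, or NULLs on the build side
    }
    return Mark::FALSE_;     // no match, no NULLs anywhere
}

int main() {
    // no match, group of 3 rows with one NULL -> result is NULL
    std::printf("%d\n", static_cast<int>(MarkValue(false, false, 3, 2)));
    return 0;
}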
11 changes: 6 additions & 5 deletions src/execution/operator/aggregate/physical_hash_aggregate.cpp
@@ -103,19 +103,20 @@ void PhysicalHashAggregate::GetChunkInternal(ClientContext &context, DataChunk &
// special case hack to sort out aggregating from empty intermediates
// for aggregations without groups
if (elements_found == 0 && state->tuples_scanned == 0 && is_implicit_aggr) {
assert(state->aggregate_chunk.column_count() == aggregates.size());
assert(chunk.column_count() == aggregates.size());
// for each column in the aggregates, set to initial state
state->aggregate_chunk.SetCardinality(1);
for (index_t i = 0; i < state->aggregate_chunk.column_count(); i++) {
chunk.SetCardinality(1);
for (index_t i = 0; i < chunk.column_count(); i++) {
assert(aggregates[i]->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
auto &aggr = (BoundAggregateExpression &)*aggregates[i];
auto aggr_state = unique_ptr<data_t[]>(new data_t[aggr.function.state_size(aggr.return_type)]);
aggr.function.initialize(aggr_state.get(), aggr.return_type);

Vector state_vector(state->aggregate_chunk, Value::POINTER((uintptr_t)aggr_state.get()));
aggr.function.finalize(state_vector, state->aggregate_chunk.data[i]);
Vector state_vector(chunk, Value::POINTER((uintptr_t)aggr_state.get()));
aggr.function.finalize(state_vector, chunk.data[i]);
}
state->finished = true;
return;
}
if (elements_found == 0 && !state->finished) {
state->finished = true;
2 changes: 2 additions & 0 deletions src/execution/operator/aggregate/physical_window.cpp
@@ -507,6 +507,8 @@ void PhysicalWindow::GetChunkInternal(ClientContext &context, DataChunk &chunk,
auto &wind_ch = window_results.GetChunk(state->position);

index_t out_idx = 0;
assert(proj_ch.size() == wind_ch.size());
chunk.SetCardinality(proj_ch);
for (index_t col_idx = 0; col_idx < proj_ch.column_count(); col_idx++) {
chunk.data[out_idx++].Reference(proj_ch.data[col_idx]);
}
16 changes: 4 additions & 12 deletions src/execution/operator/helper/physical_limit.cpp
@@ -34,17 +34,11 @@ void PhysicalLimit::GetChunkInternal(ClientContext &context, DataChunk &chunk, P
index_t start_position = offset - state->current_offset;
index_t chunk_count = min(limit, state->child_chunk.size() - start_position);

// first reference all the columns and set up the counts
// set up a slice of the input chunks
chunk.SetCardinality(chunk_count, state->child_chunk.sel_vector);
for (index_t i = 0; i < chunk.column_count(); i++) {
chunk.data[i].Reference(state->child_chunk.data[i]);
chunk.data[i].Slice(state->child_chunk.data[i], start_position);
}
// now set up the selection vector of the chunk
for (index_t idx = 0; idx < chunk_count; idx++) {
chunk.owned_sel_vector[idx] = state->child_chunk.sel_vector
? state->child_chunk.sel_vector[start_position + idx]
: start_position + idx;
}
chunk.SetCardinality(chunk_count, chunk.owned_sel_vector);
}
} else {
// have to copy either the entire chunk or part of it
@@ -57,9 +51,7 @@
chunk_count = state->child_chunk.size();
}
// instead of copying we just change the pointer in the current chunk
for (index_t i = 0; i < chunk.column_count(); i++) {
chunk.data[i].Reference(state->child_chunk.data[i]);
}
chunk.Reference(state->child_chunk);
chunk.SetCardinality(chunk_count, state->child_chunk.sel_vector);
}

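The PhysicalLimit rewrite above drops the hand-rolled selection-vector loop in favour of Vector::Slice plus SetCardinality. As a rough standalone model of the zero-copy idea (expose only the [offset, offset + limit) window over borrowed data instead of copying it), consider the sketch below; BorrowedColumn and SliceForLimit are invented names for illustration and do not mirror DuckDB's API.

// Minimal model of applying LIMIT/OFFSET without copying: the output borrows
// the input data and only narrows the visible window. Hypothetical helper, for
// illustration of the technique used in physical_limit.cpp, not DuckDB code.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

struct BorrowedColumn {
    const int64_t *data = nullptr; // not owned
    size_t offset = 0;             // first visible row
    size_t count = 0;              // number of visible rows
};

BorrowedColumn SliceForLimit(const int64_t *data, size_t size, size_t offset, size_t limit) {
    BorrowedColumn out;
    if (offset >= size) {
        return out;                             // nothing left to emit
    }
    out.data = data;
    out.offset = offset;
    out.count = std::min(limit, size - offset); // same min() as in the diff
    return out;
}

int main() {
    int64_t rows[] = {1, 2, 3, 4, 5};
    auto piece = SliceForLimit(rows, 5, /*offset=*/3, /*limit=*/10);
    assert(piece.count == 2 && piece.data[piece.offset] == 4);
    return 0;
}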
11 changes: 3 additions & 8 deletions src/execution/operator/join/physical_hash_join.cpp
@@ -68,10 +68,10 @@ void PhysicalHashJoin::BuildHashTable(ClientContext &context, PhysicalOperatorSt
if (right_projection_map.size() > 0) {
// there is a projection map: fill the build chunk with the projected columns
build_chunk.Reset();
build_chunk.SetCardinality(right_chunk);
for (index_t i = 0; i < right_projection_map.size(); i++) {
build_chunk.data[i].Reference(right_chunk.data[right_projection_map[i]]);
}
build_chunk.sel_vector = right_chunk.sel_vector;
hash_table->Build(state->join_keys, build_chunk);
} else {
// there is not a projected map: place the entire right chunk in the HT
@@ -108,9 +108,7 @@ void PhysicalHashJoin::ProbeHashTable(ClientContext &context, DataChunk &chunk,
// anti join with empty hash table, NOP join
// return the input
assert(chunk.column_count() == state->child_chunk.column_count());
for (index_t i = 0; i < chunk.column_count(); i++) {
chunk.data[i].Reference(state->child_chunk.data[i]);
}
chunk.Reference(state->child_chunk);
return;
} else if (hash_table->join_type == JoinType::MARK) {
// MARK join with empty hash table
@@ -183,10 +181,7 @@ void PhysicalHashJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk
if (chunk.size() == 0) {
if (state->cached_chunk.size() > 0) {
// finished probing but cached data remains, return cached chunk
for (index_t col_idx = 0; col_idx < chunk.column_count(); col_idx++) {
chunk.data[col_idx].Reference(state->cached_chunk.data[col_idx]);
}
chunk.sel_vector = state->cached_chunk.sel_vector;
chunk.Reference(state->cached_chunk);
state->cached_chunk.Reset();
}
return;
3 changes: 3 additions & 0 deletions src/execution/operator/join/physical_piecewise_merge_join.cpp
@@ -85,6 +85,8 @@ void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataCh
// resolve the join keys for the right chunk
state->join_keys.Reset();
state->rhs_executor.SetChunk(chunk_to_order);

state->join_keys.SetCardinality(chunk_to_order);
for (index_t k = 0; k < conditions.size(); k++) {
// resolve the join key
state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
@@ -115,6 +117,7 @@ void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataCh
// resolve the join keys for the left chunk
state->join_keys.Reset();
state->lhs_executor.SetChunk(state->child_chunk);
state->join_keys.SetCardinality(state->child_chunk);
for (index_t k = 0; k < conditions.size(); k++) {
state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
// sort by join key
1 change: 1 addition & 0 deletions src/execution/operator/persistent/physical_copy_to_file.cpp
@@ -164,6 +164,7 @@ void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chu
break;
}
// cast the columns of the chunk to varchar
cast_chunk.SetCardinality(state->child_chunk);
for (index_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
if (sql_types[col_idx].id == SQLTypeId::VARCHAR) {
// VARCHAR, just create a reference
3 changes: 2 additions & 1 deletion src/execution/operator/persistent/physical_update.cpp
@@ -39,6 +39,7 @@ void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk,
// update data in the base table
// the row ids are given to us as the last column of the child chunk
auto &row_ids = state->child_chunk.data[state->child_chunk.column_count() - 1];
update_chunk.SetCardinality(state->child_chunk);
for (index_t i = 0; i < expressions.size(); i++) {
if (expressions[i]->type == ExpressionType::VALUE_DEFAULT) {
// default expression, set to the default value of the column
@@ -50,11 +51,11 @@ void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk, P
update_chunk.data[i].Reference(state->child_chunk.data[binding.index]);
}
}
update_chunk.sel_vector = state->child_chunk.sel_vector;

if (is_index_update) {
// index update, perform a delete and an append instead
table.Delete(tableref, context, row_ids);
mock_chunk.SetCardinality(update_chunk);
for (index_t i = 0; i < columns.size(); i++) {
mock_chunk.data[columns[i]].Reference(update_chunk.data[i]);
}
4 changes: 1 addition & 3 deletions src/execution/operator/scan/physical_chunk_scan.cpp
@@ -23,9 +23,7 @@ void PhysicalChunkScan::GetChunkInternal(ClientContext &context, DataChunk &chun
return;
}
auto &collection_chunk = *collection->chunks[state->chunk_index];
for (index_t i = 0; i < chunk.column_count(); i++) {
chunk.data[i].Reference(collection_chunk.data[i]);
}
chunk.Reference(collection_chunk);
state->chunk_index++;
}

3 changes: 3 additions & 0 deletions src/include/duckdb/common/types/data_chunk.hpp
@@ -60,6 +60,9 @@ class DataChunk : public VectorCardinality {
Value GetValue(index_t col_idx, index_t index) const;
void SetValue(index_t col_idx, index_t index, Value val);

//! Set the DataChunk to reference another data chunk
void Reference(DataChunk &chunk);

//! Initializes the DataChunk with the specified types to an empty DataChunk
//! This will create one vector of the specified type for each TypeId in the
//! types list. The vector will be referencing vector to the data owned by
2 changes: 1 addition & 1 deletion src/include/duckdb/storage/numeric_segment.hpp
@@ -45,7 +45,7 @@ class NumericSegment : public UncompressedSegment {
typedef void (*update_function_t)(SegmentStatistics &stats, UpdateInfo *info, data_ptr_t base_data, Vector &update);
typedef void (*update_info_fetch_function_t)(Transaction &transaction, UpdateInfo *info, Vector &result);
typedef void (*update_info_append_function_t)(Transaction &transaction, UpdateInfo *info, index_t idx,
Vector &result);
Vector &result, index_t result_idx);
typedef void (*rollback_update_function_t)(UpdateInfo *info, data_ptr_t base_data);
typedef void (*merge_update_function_t)(SegmentStatistics &stats, UpdateInfo *node, data_ptr_t target,
Vector &update, row_t *ids, index_t vector_offset);
8 changes: 4 additions & 4 deletions src/storage/numeric_segment.cpp
@@ -90,7 +90,7 @@ void NumericSegment::FetchRow(ColumnFetchState &state, Transaction &transaction,
if (versions && versions[vector_index]) {
// version information: follow the version chain to find out if we need to load this tuple data from any other
// version
append_from_update_info(transaction, versions[vector_index], id_in_vector, result);
append_from_update_info(transaction, versions[vector_index], id_in_vector, result, result_idx);
}
}

@@ -409,16 +409,16 @@ static NumericSegment::update_info_fetch_function_t GetUpdateInfoFetchFunction(T
// Update Append
//===--------------------------------------------------------------------===//
template <class T>
static void update_info_append(Transaction &transaction, UpdateInfo *info, index_t row_id, Vector &result) {
static void update_info_append(Transaction &transaction, UpdateInfo *info, index_t row_id, Vector &result, index_t result_idx) {
auto result_data = (T *)result.GetData();
UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) {
auto info_data = (T *)current->tuple_data;
// loop over the tuples in this UpdateInfo
for (index_t i = 0; i < current->N; i++) {
if (current->tuples[i] == row_id) {
// found the relevant tuple
result_data[result.size()] = info_data[i];
result.nullmask[result.size()] = current->nullmask[current->tuples[i]];
result_data[result_idx] = info_data[i];
result.nullmask[result_idx] = current->nullmask[current->tuples[i]];
break;
} else if (current->tuples[i] > row_id) {
// tuples are sorted: so if the current tuple is > row_id we will not find it anymore
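The numeric_segment change threads an explicit result_idx through update_info_append, so a row fetched through the version chain is written to the slot the caller chose rather than to whatever result.size() happened to be. Below is a compressed standalone model of that fetch pattern (walk the update records for one row id, ignoring transaction visibility for brevity, and overwrite a specific output slot); the types are simplified stand-ins, not the real UpdateInfo structures.

// Simplified model of fetching one row through a chain of update records and
// writing it to an explicit destination index (the result_idx added in this
// commit). Stand-in types for illustration only.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct MiniUpdateInfo {
    std::vector<uint32_t> tuples;  // sorted row ids touched by this update
    std::vector<int64_t> data;     // updated values, parallel to tuples
    const MiniUpdateInfo *next;    // older version in the chain (nullptr if none)
};

// Copy the updated value for row_id (if any update record touched it) into
// result[result_idx]; otherwise leave the base-table value alone.
void AppendFromUpdateInfo(const MiniUpdateInfo *info, uint32_t row_id,
                          std::vector<int64_t> &result, size_t result_idx) {
    for (const MiniUpdateInfo *current = info; current; current = current->next) {
        for (size_t i = 0; i < current->tuples.size(); i++) {
            if (current->tuples[i] == row_id) {
                result[result_idx] = current->data[i]; // explicit slot, not result.size()
                return;
            }
            if (current->tuples[i] > row_id) {
                break; // tuples are sorted: the row is not in this record
            }
        }
    }
}

int main() {
    MiniUpdateInfo info{{2, 7}, {200, 700}, nullptr};
    std::vector<int64_t> result = {0, 0, 0};
    AppendFromUpdateInfo(&info, 7, result, /*result_idx=*/1);
    assert(result[1] == 700);
    return 0;
}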
