Skip to content

Commit

Permalink
1. Add MemTracker In Sort/Scan/Exchange/Corss Join Node (apache#90)
Browse files Browse the repository at this point in the history
2. Opt the mem usage of TOPN node
  • Loading branch information
HappenLee committed Aug 10, 2021
1 parent 259728a commit c538edd
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 169 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
// TODO(zhaochun):
// RETURN_IF_ERROR(state->CheckQueryState());
bool eos = true;
bool eos = false;
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent use too many memory
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/aggregate_functions/aggregate_function_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

#pragma once

// #include <IO/VarInt.h>
// #include <IO/WriteHelpers.h>
#include <array>

#include "common/logging.h"
#include <vec/aggregate_functions/aggregate_function.h>
Expand All @@ -27,8 +26,6 @@
#include <vec/data_types/data_type_number.h>
#include <vec/io/io_helper.h>

#include <array>

namespace doris::vectorized {

struct AggregateFunctionCountData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/aggregate_functions/aggregate_function_distinct.h"

#include <algorithm>
#include <boost/algorithm/string.hpp>

#include "boost/algorithm/string.hpp"
#include "vec/aggregate_functions/aggregate_function_combinator.h"
#include "vec/aggregate_functions/aggregate_function_distinct.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/aggregate_functions/helpers.h"
#include "vec/common/typeid_cast.h"
#include "vec/data_types/data_type_nullable.h"
// #include "registerAggregateFunctions.h"

namespace doris::vectorized {
namespace ErrorCodes {
Expand Down Expand Up @@ -89,7 +87,6 @@ void register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFact
auto nested_function = factory.get(nested_function_name, transform_arguments, params);
return function_combinator->transform_aggregate_function(nested_function, types, params);
};
factory.registerDistinctFunctionCombinator(creator, DISTINCT_FUNCTION_PREFIX);
// factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorDistinct>());
factory.register_distinct_function_combinator(creator, DISTINCT_FUNCTION_PREFIX);
}
} // namespace doris::vectorized
5 changes: 0 additions & 5 deletions be/src/vec/aggregate_functions/aggregate_function_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/key_holder_helpers.h"

// #include <Columns/ColumnArray.h>
// #include <DataTypes/DataTypeArray.h>
#include "vec/common/aggregation_common.h"
#include "vec/common/assert_cast.h"
#include "vec/common/field_visitors.h"
Expand Down Expand Up @@ -213,8 +210,6 @@ class AggregateFunctionDistinct
bool allocates_memory_in_arena() const override { return true; }

const char* get_header_file_path() const override { return __FILE__; }

// AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class AggregateFunctionSimpleFactory {
}
}

void registerDistinctFunctionCombinator(Creator creator, const std::string& prefix) {
void register_distinct_function_combinator(Creator creator, const std::string& prefix) {
std::vector<std::string> need_insert;
for (auto entity : aggregate_functions) {
std::string target_value = prefix + entity.first;
Expand Down
10 changes: 3 additions & 7 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,17 +682,13 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
}
filter_block_internal(block, filter, column_to_keep);
} else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
UInt64 ret = const_column->get_uint(0);
if (ret == 0) {
block->get_by_position(0).column = block->get_by_position(0).column->clone_empty();
} else if (ret == 1) {
bool ret = const_column->get_bool(0);
if (ret) {
for (size_t i = column_to_keep; i < block->columns(); ++i) {
block->erase(i);
}
} else {
std::stringstream ss;
ss << "invalid const value for filter block, value = " << ret;
return Status::InternalError(ss.str());
block->get_by_position(0).column = block->get_by_position(0).column->clone_empty();
}
} else {
const IColumn::Filter& filter =
Expand Down
27 changes: 0 additions & 27 deletions be/src/vec/core/sort_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit)
: block.safe_get_by_position(description[0].column_number).column.get();

IColumn::Permutation perm;
// if (needCollation(column, description[0]))
// {
// const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
// column_string.get_permutation_with_collation(*description[0].collator, reverse, limit, perm);
// }
// else
column->get_permutation(reverse, limit, description[0].nulls_direction, perm);

size_t columns = block.columns();
Expand All @@ -132,29 +126,8 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit)

if (limit >= size) limit = 0;

bool need_collation = false;
ColumnsWithSortDescriptions columns_with_sort_desc =
get_columns_with_sort_description(block, description);

// for (size_t i = 0, num_sort_columns = description.size(); i < num_sort_columns; ++i)
// {
// if (needCollation(columns_with_sort_desc[i].first, description[i]))
// {
// need_collation = true;
// break;
// }
// }

// if (need_collation)
// {
// PartialSortingLessWithCollation less_with_collation(columns_with_sort_desc);
//
// if (limit)
// std::partial_sort(perm.begin(), perm.begin() + limit, perm.end(), less_with_collation);
// else
// pdqsort(perm.begin(), perm.end(), less_with_collation);
// }
// else
{
PartialSortingLess less(columns_with_sort_desc);

Expand Down
118 changes: 44 additions & 74 deletions be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ struct SortCursorImpl {
size_t pos = 0;
size_t rows = 0;

/** Determines order if comparing columns are equal.
* Order is determined by number of cursor.
*
* Cursor number (always?) equals to number of merging part.
* Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
*/
size_t order = 0;

using NeedCollationFlags = std::vector<UInt8>;

/** Should we use Collator to sort a column? */
Expand All @@ -59,18 +51,16 @@ struct SortCursorImpl {

SortCursorImpl() = default;

SortCursorImpl(const Block& block, const SortDescription& desc_, size_t order_ = 0)
SortCursorImpl(const Block& block, const SortDescription& desc_)
: desc(desc_),
sort_columns_size(desc.size()),
order(order_),
need_collation(desc.size()) {
reset(block);
}

SortCursorImpl(const Columns& columns, const SortDescription& desc_, size_t order_ = 0)
SortCursorImpl(const Columns& columns, const SortDescription& desc_)
: desc(desc_),
sort_columns_size(desc.size()),
order(order_),
need_collation(desc.size()) {
for (auto& column_desc : desc) {
if (!column_desc.column_name.empty()) {
Expand Down Expand Up @@ -101,9 +91,6 @@ struct SortCursorImpl {
? block.get_position_by_name(column_desc.column_name)
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());

// need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String)
// has_collation |= need_collation[j];
}

pos = 0;
Expand Down Expand Up @@ -175,84 +162,67 @@ struct SortCursor {
const SortCursorImpl* operator->() const { return impl; }

/// The specified row of this cursor is greater than the specified row of another cursor.
bool greater_at(const SortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const {
int8_t greater_at(const SortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const {
for (size_t i = 0; i < impl->sort_columns_size; ++i) {
int direction = impl->desc[i].direction;
int nulls_direction = impl->desc[i].nulls_direction;
int res = direction * impl->sort_columns[i]->compare_at(lhs_pos, rhs_pos,
*(rhs.impl->sort_columns[i]),
nulls_direction);
if (res > 0) return true;
if (res < 0) return false;
if (res > 0) return 1;
if (res < 0) return -1;
}
return impl->order > rhs.impl->order;
return 0;
}

/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
bool totally_less_or_equals(const SortCursor& rhs) const {
bool totally_less(const SortCursor& rhs) const {
if (impl->rows == 0 || rhs.impl->rows == 0) return false;

/// The last row of this cursor is no larger than the first row of the another cursor.
return !greater_at(rhs, impl->rows - 1, 0);
return greater_at(rhs, impl->rows - 1, 0) == -1;
}

bool greater(const SortCursor& rhs) const { return greater_at(rhs, impl->pos, rhs.impl->pos); }
bool greater(const SortCursor& rhs) const { return greater_at(rhs, impl->pos, rhs.impl->pos) > 0; }

/// Inverted so that the priority queue elements are removed in ascending order.
bool operator<(const SortCursor& rhs) const { return greater(rhs); }
};

/// Separate comparator for locale-sensitive string comparisons
//struct SortCursorWithCollation
//{
// SortCursorImpl * impl;
//
// SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {}
// SortCursorImpl * operator-> () { return impl; }
// const SortCursorImpl * operator-> () const { return impl; }
//
// bool greater_at(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
// {
// for (size_t i = 0; i < impl->sort_columns_size; ++i)
// {
// int direction = impl->desc[i].direction;
// int nulls_direction = impl->desc[i].nulls_direction;
// int res;
// if (impl->need_collation[i])
// {
// const ColumnString & column_string = assert_cast<const ColumnString &>(*impl->sort_columns[i]);
// res = column_string.compare_at_with_collation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator);
// }
// else
// res = impl->sort_columns[i]->compare_at(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
//
// res *= direction;
// if (res > 0)
// return true;
// if (res < 0)
// return false;
// }
// return impl->order > rhs.impl->order;
// }
//
// bool totally_less_or_equals(const SortCursorWithCollation & rhs) const
// {
// if (impl->rows == 0 || rhs.impl->rows == 0)
// return false;
//
// /// The last row of this cursor is no larger than the first row of the another cursor.
// return !greater_at(rhs, impl->rows - 1, 0);
// }
//
// bool greater(const SortCursorWithCollation & rhs) const
// {
// return greater_at(rhs, impl->pos, rhs.impl->pos);
// }
//
// bool operator< (const SortCursorWithCollation & rhs) const
// {
// return greater(rhs);
// }
//};
/// For easy copying.
struct SortBlockCursor {
SortCursorImpl* impl;

SortBlockCursor(SortCursorImpl* impl_) : impl(impl_) {}
SortCursorImpl* operator->() { return impl; }
const SortCursorImpl* operator->() const { return impl; }

// ~SortBlockCursor() { delete impl;}

/// The specified row of this cursor is greater than the specified row of another cursor.
int8_t less_at(const SortBlockCursor& rhs, int rows) const {
for (size_t i = 0; i < impl->sort_columns_size; ++i) {
int direction = impl->desc[i].direction;
int nulls_direction = impl->desc[i].nulls_direction;
int res = direction * impl->sort_columns[i]->compare_at(rows, rhs->rows - 1,
*(rhs.impl->sort_columns[i]),
nulls_direction);
if (res < 0) return 1;
if (res > 0) return -1;
}
return 0;
}

/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
bool totally_greater(const SortBlockCursor& rhs) const {
if (impl->rows == 0 || rhs.impl->rows == 0) return false;

/// The last row of this cursor is no larger than the first row of the another cursor.
return less_at(rhs, 0) == -1;
}

/// Inverted so that the priority queue elements are removed in ascending order.
bool operator<(const SortBlockCursor& rhs) const { return less_at(rhs, impl->rows - 1) >= 0; }
};

} // namespace doris::vectorized
5 changes: 0 additions & 5 deletions be/src/vec/data_types/data_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,8 @@ class IDataType : private boost::noncopyable {
virtual size_t serialize(const IColumn& column, PColumn* pcolumn) const = 0;
virtual void deserialize(const PColumn& pcolumn, IColumn* column) const = 0;

// static String getFileNameForStream(const String & column_name, const SubstreamPath & path);

private:
friend class DataTypeFactory;
/** Customize this DataType
*/
// void setCustomization(DataTypeCustomDescPtr custom_desc_) const;

private:
/** This is mutable to allow setting custom name and serialization on `const IDataType` post construction.
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/data_types/data_type_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c
}

size_t DataTypeNullable::serialize(const IColumn& column, PColumn* pcolumn) const {
const ColumnNullable& col =
assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get());
auto ptr = column.convert_to_full_column_if_const();
const ColumnNullable& col = assert_cast<const ColumnNullable&>(*ptr.get());
pcolumn->mutable_is_null()->Reserve(column.size());

for (size_t i = 0; i < column.size(); ++i) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/exec/vblocking_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ Status VBlockingJoinNode::open(RuntimeState* state) {

if (state->resource_pool()->try_acquire_thread_token()) {
add_runtime_exec_option("Join Build-Side Prepared Asynchronously");
// Thread build_thread(_node_name, "build thread",
// bind(&VBlockingJoinNode::BuildSideThread, this, state, &build_side_status));
// if (!state->cgroup().empty()) {
// RETURN_IF_ERROR(
// state->exec_env()->cgroups_mgr()->assign_thread_to_cgroup(
// build_thread, state->cgroup()));
// }
boost::thread(boost::bind(&VBlockingJoinNode::build_side_thread, this, state, &build_side_status));
} else {
build_side_status.set_value(construct_build_side(state));
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exec/vblocking_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ class VBlockingJoinNode : public doris::ExecNode {
return Status::NotSupported("Not Implemented VBlocking Join Node::get_next scalar");
}

// Subclasses should close any other structures and then call
// VBlockingJoinNode::close().
virtual Status close(RuntimeState *state);

private:
Expand Down

0 comments on commit c538edd

Please sign in to comment.