LIMIT BY clause was implemented #293

Merged: 14 commits, Dec 31, 2016
41 changes: 41 additions & 0 deletions dbms/include/DB/DataStreams/LimitByBlockInputStream.h
@@ -0,0 +1,41 @@
#pragma once

#include <DB/DataStreams/IProfilingBlockInputStream.h>

#include <DB/Common/HashTable/HashMap.h>
#include <DB/Common/SipHash.h>
#include <DB/Common/UInt128.h>

namespace DB
{

/** Implements the LIMIT BY clause, which can be used to obtain a "top N by subgroup".
*
* For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7),
* the query SELECT Num FROM T LIMIT 2 BY Num
* will give you the following result: (Num: 1 1 3 3 4 4 5 7 7).
*/
class LimitByBlockInputStream : public IProfilingBlockInputStream
Review comment (Member): Missing comment with detailed description.

{
public:
LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_);

String getName() const override { return "LimitBy"; }

String getID() const override;

protected:
Block readImpl() override;

private:
ConstColumnPlainPtrs getKeyColumns(Block & block) const;

private:
using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>;
Review comment (Member): Missing comment.


const Names columns_names;
const size_t group_size;
MapHashed keys_counts;
};

}
2 changes: 2 additions & 0 deletions dbms/include/DB/Interpreters/ExpressionAnalyzer.h
@@ -230,6 +230,8 @@ class ExpressionAnalyzer : private boost::noncopyable
/// Remove duplicate elements from ORDER BY.
void optimizeOrderBy();

void optimizeLimitBy();

/// remove Function_if AST if condition is constant
void optimizeIfWithConstantCondition();
void optimizeIfWithConstantConditionImpl(ASTPtr & current_ast, Aliases & aliases) const;
1 change: 1 addition & 0 deletions dbms/include/DB/Interpreters/InterpreterSelectQuery.h
@@ -138,6 +138,7 @@ class InterpreterSelectQuery : public IInterpreter
void executeMergeSorted();
void executePreLimit();
void executeUnion();
void executeLimitBy();
void executeLimit();
void executeProjection(ExpressionActionsPtr expression);
void executeDistinct(bool before_order, Names columns);
2 changes: 2 additions & 0 deletions dbms/include/DB/Parsers/ASTSelectQuery.h
@@ -57,6 +57,8 @@ class ASTSelectQuery : public ASTQueryWithOutput
bool group_by_with_totals = false;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_value;
ASTPtr limit_by_expression_list;
ASTPtr limit_offset;
ASTPtr limit_length;
ASTPtr settings;
83 changes: 83 additions & 0 deletions dbms/src/DataStreams/LimitByBlockInputStream.cpp
@@ -0,0 +1,83 @@
#include <DB/DataStreams/LimitByBlockInputStream.h>

namespace DB
{

LimitByBlockInputStream::LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_)
: columns_names(columns_)
, group_size(group_size_)
{
children.push_back(input_);
}

String LimitByBlockInputStream::getID() const
Review comment (Member): The purpose of the getID method is to allow merging common elements of several execution pipelines; see glueBlockInputStreams. So getID must return different values for different computations, and as written it is missing the column names and the limit value. That mechanism is experimental and not used in production, but despite this it is better to implement getID correctly.

{
std::stringstream res;
res << "LimitBy(" << this << ")";
return res.str();
}
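
Following the review note above, here is a sketch of what a more specific getID might look like, folding in the limit value and the key column names so that logically different LimitBy nodes never share an ID. This is an illustration of the suggestion, not the code that was merged:

String LimitByBlockInputStream::getID() const
{
std::stringstream res;
res << "LimitBy(" << children.back()->getID() << ", " << group_size;
for (const auto & name : columns_names)
res << ", " << name; /// Key column names distinguish different LIMIT BY keys.
res << ")";
return res.str();
}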

Block LimitByBlockInputStream::readImpl()
{
/// Execute until the end of the stream or until
/// a block with some new records is obtained.
while (true)
{
Block block = children[0]->read();
if (!block)
return Block();

const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block));
const size_t rows = block.rows();
IColumn::Filter filter(rows);
size_t inserted_count = 0;

for (size_t i = 0; i < rows; ++i)
{
UInt128 key;
SipHash hash;

for (auto & column : column_ptrs)
column->updateHashWithValue(i, hash);

hash.get128(key.first, key.second);

if (keys_counts[key]++ < group_size)
{
inserted_count++;
filter[i] = 1;
}
else
filter[i] = 0;
}

/// Just move on to the next block if there are no new records in the current one.
if (!inserted_count)
continue;

size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, inserted_count);

return block;
}
}

ConstColumnPlainPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const
Review comment (Member): This method will show up in the profiler in the case of very small blocks. OK for now.

Review comment: Woo, this is a really nice feature for me. Thanks, guys!

{
ConstColumnPlainPtrs column_ptrs;
column_ptrs.reserve(columns_names.size());

for (const auto & name : columns_names)
{
auto & column = block.getByName(name).column;

/// Ignore all constant columns.
if (!column->isConst())
column_ptrs.emplace_back(column.get());
}

return column_ptrs;
}

}
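
To make the control flow above concrete, here is a minimal standalone sketch of the same "keep the first group_size rows per key" filter, run on the example data from the header comment. It substitutes a plain std::map keyed by the value itself for the real implementation's SipHash-based UInt128 key over all non-constant key columns:

#include <iostream>
#include <map>
#include <vector>

int main()
{
const std::vector<int> nums = {1, 1, 3, 3, 3, 4, 4, 5, 7, 7, 7, 7};
const size_t group_size = 2; /// LIMIT 2 BY Num

std::map<int, size_t> keys_counts; /// Stands in for HashMap<UInt128, UInt64, UInt128TrivialHash>.
for (int num : nums)
if (keys_counts[num]++ < group_size)
std::cout << num << ' '; /// Prints: 1 1 3 3 4 4 5 7 7

std::cout << '\n';
return 0;
}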
28 changes: 28 additions & 0 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
@@ -176,6 +176,9 @@ void ExpressionAnalyzer::init()
/// Remove duplicate elements from ORDER BY.
optimizeOrderBy();

/// Remove duplicate elements from the LIMIT BY clause.
optimizeLimitBy();

/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();

@@ -1169,6 +1172,31 @@ void ExpressionAnalyzer::optimizeOrderBy()
}


void ExpressionAnalyzer::optimizeLimitBy()
{
if (!(select_query && select_query->limit_by_expression_list))
return;

std::set<String> elems_set;

ASTs & elems = select_query->limit_by_expression_list->children;
ASTs unique_elems;
unique_elems.reserve(elems.size());

for (const auto & elem : elems)
{
if (const auto id = typeid_cast<const ASTIdentifier*>(elem.get()))
{
if (elems_set.emplace(id->getColumnName()).second)
unique_elems.emplace_back(elem);
}
}

if (unique_elems.size() < elems.size())
elems = unique_elems;
}
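
The pass above rewrites, for example, LIMIT 2 BY a, a, b as if it were LIMIT 2 BY a, b, keeping the first occurrence of each column. A minimal standalone sketch of the same first-occurrence deduplication, with column names as plain strings rather than AST identifiers:

#include <set>
#include <string>
#include <vector>

std::vector<std::string> dedupPreservingOrder(const std::vector<std::string> & elems)
{
std::set<std::string> seen;
std::vector<std::string> unique_elems;
unique_elems.reserve(elems.size());

for (const auto & elem : elems)
if (seen.emplace(elem).second) /// .second is true only on first insertion.
unique_elems.push_back(elem);

return unique_elems; /// {"a", "a", "b"} -> {"a", "b"}
}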


void ExpressionAnalyzer::makeSetsForIndex()
{
if (storage && ast && storage->supportsIndexForIn())
82 changes: 61 additions & 21 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -3,6 +3,7 @@
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/LimitByBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
@@ -190,19 +191,25 @@ void InterpreterSelectQuery::initQueryAnalyzer()

InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_first_select_inside_union_all(query.isUnionAllHead()),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input_);
}

InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, ASTPtr query_ptr_, const Context & context_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(QueryProcessingStage::Complete), subquery_depth(0),
is_first_select_inside_union_all(false), only_analyze(true),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(QueryProcessingStage::Complete)
, subquery_depth(0)
, is_first_select_inside_union_all(false), only_analyze(true)
, log(&Logger::get("InterpreterSelectQuery"))
{
init({});
}
@@ -217,10 +224,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_,
const Names & required_column_names_,
const NamesAndTypesList & table_column_names_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_), table_column_names(table_column_names_),
is_first_select_inside_union_all(query.isUnionAllHead()),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, table_column_names(table_column_names_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input_, required_column_names_);
}
@@ -305,7 +316,7 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, St
DataTypes InterpreterSelectQuery::getReturnTypes()
{
DataTypes res;
NamesAndTypesList columns = query_analyzer->getSelectSampleBlock().getColumnsList();
const NamesAndTypesList & columns = query_analyzer->getSelectSampleBlock().getColumnsList();
for (auto & column : columns)
res.push_back(column.type);

@@ -553,8 +564,7 @@ void InterpreterSelectQuery::executeSingleQuery()
* but there is an ORDER or LIMIT,
* then perform preliminary sorting and a LIMIT on the remote server.
*/
if (!second_stage
&& !need_aggregate && !has_having)
if (!second_stage && !need_aggregate && !has_having)
{
if (has_order_by)
executeOrder();
@@ -619,21 +629,28 @@ void InterpreterSelectQuery::executeSingleQuery()
/** Optimization: if there are several sources and there is a LIMIT, then first apply a preliminary LIMIT
* that restricts the number of records in each source to offset + limit.
*/
if (query.limit_length && hasMoreThanOneStream() && !query.distinct)
if (query.limit_length && hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list)
executePreLimit();

if (need_second_distinct_pass)
union_within_single_query = true;

/// To execute LIMIT BY we should merge all streams together.
if (query.limit_by_expression_list && hasMoreThanOneStream())
Review comment (Member): Do not understand.

Reply (Contributor, Author): We have to merge all streams together, or the condition streams.size() == 1 on line 645 will be false and executeLimit won't be called.

union_within_single_query = true;

if (union_within_single_query || stream_with_non_joined_data)
executeUnion();

if (streams.size() == 1)
{
/// If there was more than one source, then DISTINCT needs to be performed again after merging them.
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (need_second_distinct_pass)
executeDistinct(false, Names());

executeLimitBy();
executeLimit();
}
}
@@ -770,7 +787,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);

/** Optimization: if DISTINCT, WHERE, GROUP, HAVING, and ORDER are not specified, but LIMIT is, and limit + offset < max_block_size,
/** Optimization: if DISTINCT, WHERE, GROUP, HAVING, ORDER, and LIMIT BY are not specified, but LIMIT is, and limit + offset < max_block_size,
* then use limit + offset as the block size (so as not to read more from the table than requested),
* and also set the number of threads to 1.
*/
@@ -780,6 +797,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
&& !query.group_expression_list
&& !query.having_expression
&& !query.order_expression_list
&& !query.limit_by_expression_list
&& query.limit_length
&& !query_analyzer->hasAggregation()
&& limit_length + limit_offset < settings.max_block_size)
@@ -1024,9 +1042,9 @@ static SortDescription getSortDescription(ASTSelectQuery & query)

static size_t getLimitForSorting(ASTSelectQuery & query)
{
/// If there is a LIMIT and no DISTINCT, partial sorting can be done.
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
size_t limit = 0;
if (!query.distinct)
if (!query.distinct && !query.limit_by_expression_list)
{
size_t limit_length = 0;
size_t limit_offset = 0;
@@ -1156,7 +1174,7 @@ void InterpreterSelectQuery::executePreLimit()
{
transformStreams([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0);
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, false);
});

if (hasMoreThanOneStream())
@@ -1165,6 +1183,28 @@
}


void InterpreterSelectQuery::executeLimitBy()
{
if (!query.limit_by_value)
return;

Names columns;
size_t value = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_by_value).value);

for (const auto & elem : query.limit_by_expression_list->children)
{
columns.emplace_back(elem->getAliasOrColumnName());
}

transformStreams([&](auto & stream)
{
stream = std::make_shared<LimitByBlockInputStream>(
stream, value, columns
);
});
}
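
For orientation, a rough sketch (as C++ comments; the stream layout is illustrative, not taken from the PR) of where this lands in the pipeline for SELECT Num FROM T LIMIT 2 BY Num LIMIT 10 when reading from several streams:

/// source_1 ──┐
/// source_2 ──┼──> Union ──> LimitBy(2, {Num}) ──> Limit(10)
/// source_3 ──┘
///
/// LIMIT BY has to run after the Union: if each source stream kept its own
/// per-key counts, every stream could emit group_size rows for the same key.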


void InterpreterSelectQuery::executeLimit()
{
size_t limit_length = 0;
10 changes: 10 additions & 0 deletions dbms/src/Parsers/ASTSelectQuery.cpp
@@ -202,6 +202,8 @@ ASTPtr ASTSelectQuery::cloneImpl(bool traverse_union_all) const
CLONE(group_expression_list)
CLONE(having_expression)
CLONE(order_expression_list)
CLONE(limit_by_value)
CLONE(limit_by_expression_list)
CLONE(limit_offset)
CLONE(limit_length)
CLONE(settings)
@@ -287,6 +289,14 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
: typeid_cast<const ASTExpressionList &>(*order_expression_list).formatImplMultiline(s, state, frame);
}

if (limit_by_value)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");
limit_by_value->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " BY " << (s.hilite ? hilite_none : "");
s.one_line
? limit_by_expression_list->formatImpl(s, state, frame)
: typeid_cast<const ASTExpressionList &>(*limit_by_expression_list).formatImplMultiline(s, state, frame);
}

if (limit_length)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");