Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FINAL SAMPLE #7907

Merged
merged 6 commits into from Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 30 additions & 13 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Expand Up @@ -388,18 +388,18 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);

RelativeSize size_of_universum = 0;
DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;
DataTypePtr sampling_column_type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;

if (typeid_cast<const DataTypeUInt64 *>(type.get()))
if (typeid_cast<const DataTypeUInt64 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt32 *>(type.get()))
else if (typeid_cast<const DataTypeUInt32 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt16 *>(type.get()))
else if (typeid_cast<const DataTypeUInt16 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt8 *>(type.get()))
else if (typeid_cast<const DataTypeUInt8 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
else
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.",
throw Exception("Invalid sampling column type in storage parameters: " + sampling_column_type->getName() + ". Must be unsigned integer type.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

if (settings.parallel_replicas_count > 1)
Expand Down Expand Up @@ -453,13 +453,25 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;

/// If SAMPLE and FINAL are used together, there is no need to calculate the sampling expression twice.
/// It has already been calculated for FINAL, because the sampling key is a part of the primary key.
/// So we assume that the calculated column is already available.
ASTPtr sampling_key_ast = data.getSamplingKeyAST();
if (select.final())
{
sampling_key_ast = std::make_shared<ASTIdentifier>(data.sampling_expr_column_name);

/// Note: this modifies available_real_columns, but it is not used afterwards.
available_real_columns.emplace_back(data.sampling_expr_column_name, std::move(sampling_column_type));
}

if (has_lower_limit)
{
if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.getSamplingKeyAST());
args->children.push_back(sampling_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(lower));

lower_function = std::make_shared<ASTFunction>();
Expand All @@ -476,7 +488,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(data.getSamplingKeyAST());
args->children.push_back(sampling_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(upper));

upper_function = std::make_shared<ASTFunction>();
Expand All @@ -503,11 +515,16 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
auto syntax_result = SyntaxAnalyzer(context).analyze(query, available_real_columns);
filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);

/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
if (!select.final())
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if FINAL is used, because those columns have already been added as part of the primary key.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
column_names_to_read.end());
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions dbms/tests/queries/0_stateless/01034_sample_final.reference
@@ -0,0 +1,8 @@
count
1000000
count final
666667
count sample
557632
count sample final
371758
14 changes: 14 additions & 0 deletions dbms/tests/queries/0_stateless/01034_sample_final.sql
@@ -0,0 +1,14 @@
-- Test for combining SAMPLE with FINAL on a CollapsingMergeTree table.
-- Prints a label followed by count() for four variants of the same query:
-- plain, FINAL only, SAMPLE only, and FINAL together with SAMPLE.

-- Start from a clean state in case a previous run left the table behind.
drop table if exists sample_final;
-- The sampling key intHash32(UserID) is also one of the ORDER BY key expressions.
create table sample_final (CounterID UInt32, EventDate Date, EventTime DateTime, UserID UInt64, Sign Int8) engine = CollapsingMergeTree(Sign) order by (CounterID, EventDate, intHash32(UserID), EventTime) sample by intHash32(UserID);
-- Insert 1,000,000 rows generated from numbers(); Sign is -1 when number % 3 = 1 and 1 otherwise.
insert into sample_final select number / (8192 * 4), toDate('2019-01-01'), toDateTime('2019-01-01 00:00:01') + number, number / (8192 * 2), number % 3 = 1 ? -1 : 1 from numbers(1000000);

-- Total number of stored rows.
select 'count';
select count() from sample_final;
-- Row count with the FINAL modifier only.
select 'count final';
select count() from sample_final final;
-- Row count with a 1/2 sample only.
select 'count sample';
select count() from sample_final sample 1/2;
-- Row count with FINAL and a 1/2 sample in the same query.
select 'count sample final';
select count() from sample_final final sample 1/2;

-- Clean up.
drop table if exists sample_final;