Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix quantilesGK bug #58216

Merged
merged 1 commit into from Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 54 additions & 36 deletions src/AggregateFunctions/AggregateFunctionQuantileGK.cpp
Expand Up @@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
Expand All @@ -30,12 +31,12 @@ class ApproxSampler
public:
struct Stats
{
T value; // the sampled value
Int64 g; // the minimum rank jump from the previous value's minimum rank
Int64 delta; // the maximum span of the rank
T value; // The sampled value
Int64 g; // The minimum rank jump from the previous value's minimum rank
Int64 delta; // The maximum span of the rank

Stats() = default;
Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) {}
Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { }
};

struct QueryResult
Expand All @@ -49,20 +50,20 @@ class ApproxSampler

ApproxSampler() = default;

explicit ApproxSampler(
double relative_error_,
size_t compress_threshold_ = default_compress_threshold,
size_t count_ = 0,
bool compressed_ = false)
: relative_error(relative_error_)
, compress_threshold(compress_threshold_)
, count(count_)
, compressed(compressed_)
ApproxSampler(const ApproxSampler & other)
: relative_error(other.relative_error)
, compress_threshold(other.compress_threshold)
, count(other.count)
, compressed(other.compressed)
, sampled(other.sampled.begin(), other.sampled.end())
, backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end())
, head_sampled(other.head_sampled.begin(), other.head_sampled.end())
{
sampled.reserve(compress_threshold);
backup_sampled.reserve(compress_threshold);
}

head_sampled.reserve(default_head_size);
explicit ApproxSampler(double relative_error_)
: relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false)
{
}

bool isCompressed() const { return compressed; }
Expand Down Expand Up @@ -95,9 +96,9 @@ class ApproxSampler
Int64 current_max = std::numeric_limits<Int64>::min();
for (const auto & stats : sampled)
current_max = std::max(stats.delta + stats.g, current_max);
Int64 target_error = current_max/2;
Int64 target_error = current_max / 2;

size_t index= 0;
size_t index = 0;
auto min_rank = sampled[0].g;
for (size_t i = 0; i < size; ++i)
{
Expand All @@ -118,7 +119,6 @@ class ApproxSampler
result[indices[i]] = res.value;
}
}

}

void compress()
Expand Down Expand Up @@ -256,16 +256,27 @@ class ApproxSampler
void read(ReadBuffer & buf)
{
readBinaryLittleEndian(compress_threshold, buf);
if (compress_threshold != default_compress_threshold)
throw Exception(
ErrorCodes::INCORRECT_DATA,
"The compress threshold {} isn't the expected one {}",
compress_threshold,
default_compress_threshold);

readBinaryLittleEndian(relative_error, buf);
readBinaryLittleEndian(count, buf);

size_t sampled_len = 0;
readBinaryLittleEndian(sampled_len, buf);
if (sampled_len > compress_threshold)
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
throw Exception(
ErrorCodes::INCORRECT_DATA, "The number of elements {} for quantileGK exceeds {}", sampled_len, compress_threshold);

sampled.resize(sampled_len);

for (size_t i = 0; i < sampled_len; ++i)
{
auto stats = sampled[i];
auto & stats = sampled[i];
readBinaryLittleEndian(stats.value, buf);
readBinaryLittleEndian(stats.g, buf);
readBinaryLittleEndian(stats.delta, buf);
Expand All @@ -291,7 +302,7 @@ class ApproxSampler
min_rank += curr_sample.g;
}
}
return {sampled.size()-1, 0, sampled.back().value};
return {sampled.size() - 1, 0, sampled.back().value};
}

void withHeadBufferInserted()
Expand Down Expand Up @@ -389,12 +400,11 @@ class ApproxSampler

double relative_error;
size_t compress_threshold;
size_t count = 0;
size_t count;
bool compressed;

PaddedPODArray<Stats> sampled;
PaddedPODArray<Stats> backup_sampled;

PaddedPODArray<T> head_sampled;

static constexpr size_t default_compress_threshold = 10000;
Expand All @@ -406,17 +416,14 @@ class QuantileGK
{
private:
using Data = ApproxSampler<Value>;
mutable Data data;
Data data;

public:
QuantileGK() = default;

explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { }

void add(const Value & x)
{
data.insert(x);
}
void add(const Value & x) { data.insert(x); }

template <typename Weight>
void add(const Value &, const Weight &)
Expand All @@ -429,22 +436,34 @@ class QuantileGK
if (!data.isCompressed())
data.compress();

data.merge(rhs.data);
if (rhs.data.isCompressed())
data.merge(rhs.data);
else
{
/// We can't modify rhs, so copy it and compress
Data rhs_data_copy(rhs.data);
rhs_data_copy.compress();
data.merge(rhs_data_copy);
}
}

void serialize(WriteBuffer & buf) const
{
/// Always compress before serialization
if (!data.isCompressed())
data.compress();

data.write(buf);
if (data.isCompressed())
data.write(buf);
else
{
/// We can't modify rhs, so copy it and compress
Data data_copy(data);
data_copy.compress();
data_copy.write(buf);
}
}

void deserialize(ReadBuffer & buf)
{
data.read(buf);

/// Serialized data is always compressed
data.setCompressed();
}

Expand Down Expand Up @@ -481,7 +500,6 @@ class QuantileGK
}
};


template <typename Value, bool _> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>;
template <typename Value, bool _> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>;

Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/GatherFunctionQuantileVisitor.cpp
Expand Up @@ -30,6 +30,7 @@ static const std::unordered_map<String, String> quantile_fuse_name_mapping =
{"quantileTDigestWeighted", "quantilesTDigestWeighted"},
{"quantileTiming", "quantilesTiming"},
{"quantileTimingWeighted", "quantilesTimingWeighted"},
{"quantileGK", "quantilesGK"},
};

String GatherFunctionQuantileData::toFusedNameOrSelf(const String & func_name)
Expand Down
14 changes: 14 additions & 0 deletions tests/queries/0_stateless/02661_quantile_approx.reference
Expand Up @@ -19,6 +19,20 @@ select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(numbe
[99,199,249,313,776]
select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
[100,200,250,314,777]
SELECT quantileGKMerge(100, 0.5)(x)
FROM
(
SELECT quantileGKState(100, 0.5)(number + 1) AS x
FROM numbers(49999)
);
24902
SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
FROM
(
SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
FROM numbers(49999)
);
[24902,44518,49999]
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select quantileGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
Expand Down
13 changes: 13 additions & 0 deletions tests/queries/0_stateless/02661_quantile_approx.sql
Expand Up @@ -15,6 +15,19 @@ select quantilesGK(100, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number
select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);

SELECT quantileGKMerge(100, 0.5)(x)
FROM
(
SELECT quantileGKState(100, 0.5)(number + 1) AS x
FROM numbers(49999)
);

SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
FROM
(
SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
FROM numbers(49999)
);

select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
Expand Down