Skip to content

Commit

Permalink
Internal duckdb#215: QUANTILE Frame Refactor
Browse files Browse the repository at this point in the history
Cleanup up QUANTILE a bit in preparation for composite frames.
This reduces the problem to generalising ReuseIndexes (future work).
  • Loading branch information
Richard Wesley committed Sep 14, 2023
1 parent e1969af commit 336e4c3
Showing 1 changed file with 46 additions and 40 deletions.
86 changes: 46 additions & 40 deletions src/core_functions/aggregate/holistic/quantile.cpp
Expand Up @@ -45,21 +45,25 @@ struct QuantileState {

// Windowed Quantile indirection
vector<idx_t> w;
idx_t pos;
idx_t count;

// Windowed MAD indirection
vector<idx_t> m;

QuantileState() : pos(0) {
QuantileState() : count(0) {
}

~QuantileState() {
}

inline void SetPos(size_t pos_p) {
pos = pos_p;
if (pos >= w.size()) {
w.resize(pos);
inline void SetCount(const FrameBounds *frames, idx_t nframes) {
count = 0;
for (idx_t f = 0; f < nframes; ++f) {
const auto &frame = frames[f];
count += frame.end - frame.start;
}
if (count >= w.size()) {
w.resize(count);
}
}
};
Expand All @@ -81,10 +85,12 @@ struct QuantileIncluded {
const ValidityMask &dmask;
};

void ReuseIndexes(idx_t *index, const FrameBounds &frame, const FrameBounds &prev) {
void ReuseIndexes(idx_t *index, const FrameBounds *frames, const FrameBounds *prevs, idx_t nframes) {
idx_t j = 0;

// Copy overlapping indices
const auto prev = prevs[0];
const auto frame = frames[0];
for (idx_t p = 0; p < (prev.end - prev.start); ++p) {
auto idx = index[p];

Expand Down Expand Up @@ -561,8 +567,8 @@ struct QuantileScalarOperation : public QuantileOperation {
QuantileIncluded included(fmask, dmask);

// Lazily initialise frame state
auto prev_pos = state.pos;
state.SetPos(frame.end - frame.start);
const auto prev_count = state.count;
state.SetCount(frames, nframes);

auto index = state.w.data();
D_ASSERT(index);
Expand All @@ -574,27 +580,27 @@ struct QuantileScalarOperation : public QuantileOperation {
const auto q = bind_data.quantiles[0];

bool replace = false;
if (frame.start == prev.start + 1 && frame.end == prev.end + 1) {
if (nframes == 1 && frame.start == prev.start + 1 && frame.end == prev.end + 1) {
// Fixed frame size
const auto j = ReplaceIndex(index, frame, prev);
// We can only replace if the number of NULLs has not changed
if (included.AllValid() || included(prev.start) == included(prev.end)) {
Interpolator<DISCRETE> interp(q, prev_pos, false);
Interpolator<DISCRETE> interp(q, prev_count, false);
replace = CanReplace(index, data, j, interp.FRN, interp.CRN, included);
if (replace) {
state.pos = prev_pos;
state.count = prev_count;
}
}
} else {
ReuseIndexes(index, frame, prev);
ReuseIndexes(index, frames, prevs, nframes);
}

if (!replace && !included.AllValid()) {
// Remove the NULLs
state.pos = std::partition(index, index + state.pos, included) - index;
state.count = std::partition(index, index + state.count, included) - index;
}
if (state.pos) {
Interpolator<DISCRETE> interp(q, state.pos, false);
if (state.count) {
Interpolator<DISCRETE> interp(q, state.count, false);

using ID = QuantileIndirect<INPUT_TYPE>;
ID indirect(data);
Expand Down Expand Up @@ -727,8 +733,8 @@ struct QuantileListOperation : public QuantileOperation {
auto rdata = FlatVector::GetData<CHILD_TYPE>(result);

// Lazily initialise frame state
auto prev_pos = state.pos;
state.SetPos(frame.end - frame.start);
const auto prev_count = state.count;
state.SetCount(frames, nframes);

auto index = state.w.data();

Expand All @@ -738,20 +744,20 @@ struct QuantileListOperation : public QuantileOperation {
// So if a replaced index in an IQR is located between Q25 and Q50, but has a value below Q25,
// then Q25 must be recomputed, but Q50 and Q75 are unaffected.
// For a single element list, this reduces to the scalar case.
std::pair<idx_t, idx_t> replaceable {state.pos, 0};
if (frame.start == prev.start + 1 && frame.end == prev.end + 1) {
std::pair<idx_t, idx_t> replaceable {state.count, 0};
if (nframes == 1 && frame.start == prev.start + 1 && frame.end == prev.end + 1) {
// Fixed frame size
const auto j = ReplaceIndex(index, frame, prev);
// We can only replace if the number of NULLs has not changed
if (included.AllValid() || included(prev.start) == included(prev.end)) {
for (const auto &q : bind_data.order) {
const auto &quantile = bind_data.quantiles[q];
Interpolator<DISCRETE> interp(quantile, prev_pos, false);
Interpolator<DISCRETE> interp(quantile, prev_count, false);
const auto replace = CanReplace(index, data, j, interp.FRN, interp.CRN, included);
if (replace < 0) {
// Replacement is before this quantile, so the rest will be replaceable too.
replaceable.first = MinValue(replaceable.first, interp.FRN);
replaceable.second = prev_pos;
replaceable.second = prev_count;
break;
} else if (replace > 0) {
// Replacement is after this quantile, so everything before it is replaceable too.
Expand All @@ -760,24 +766,24 @@ struct QuantileListOperation : public QuantileOperation {
}
}
if (replaceable.first < replaceable.second) {
state.pos = prev_pos;
state.count = prev_count;
}
}
} else {
ReuseIndexes(index, frame, prev);
ReuseIndexes(index, frames, prevs, nframes);
}

if (replaceable.first >= replaceable.second && !included.AllValid()) {
// Remove the NULLs
state.pos = std::partition(index, index + state.pos, included) - index;
state.count = std::partition(index, index + state.count, included) - index;
}

if (state.pos) {
if (state.count) {
using ID = QuantileIndirect<INPUT_TYPE>;
ID indirect(data);
for (const auto &q : bind_data.order) {
const auto &quantile = bind_data.quantiles[q];
Interpolator<DISCRETE> interp(quantile, state.pos, false);
Interpolator<DISCRETE> interp(quantile, state.count, false);
if (replaceable.first <= interp.FRN && interp.CRN <= replaceable.second) {
rdata[lentry.offset + q] = interp.template Replace<idx_t, CHILD_TYPE, ID>(index, result, indirect);
} else {
Expand Down Expand Up @@ -1075,15 +1081,15 @@ struct MedianAbsoluteDeviationOperation : public QuantileOperation {
QuantileIncluded included(fmask, dmask);

// Lazily initialise frame state
auto prev_pos = state.pos;
state.SetPos(frame.end - frame.start);
auto prev_count = state.count;
state.SetCount(frames, nframes);

auto index = state.w.data();
D_ASSERT(index);

// We need a second index for the second pass.
if (state.pos > state.m.size()) {
state.m.resize(state.pos);
if (state.count > state.m.size()) {
state.m.resize(state.count);
}

auto index2 = state.m.data();
Expand All @@ -1092,35 +1098,35 @@ struct MedianAbsoluteDeviationOperation : public QuantileOperation {
// The replacement trick does not work on the second index because if
// the median has changed, the previous order is not correct.
// It is probably close, however, and so reuse is helpful.
ReuseIndexes(index2, frame, prev);
std::partition(index2, index2 + state.pos, included);
ReuseIndexes(index2, frames, prevs, nframes);
std::partition(index2, index2 + state.count, included);

// Find the two positions needed for the median
const float q = 0.5;

bool replace = false;
if (frame.start == prev.start + 1 && frame.end == prev.end + 1) {
if (nframes == 1 && frame.start == prev.start + 1 && frame.end == prev.end + 1) {
// Fixed frame size
const auto j = ReplaceIndex(index, frame, prev);
// We can only replace if the number of NULLs has not changed
if (included.AllValid() || included(prev.start) == included(prev.end)) {
Interpolator<false> interp(q, prev_pos, false);
Interpolator<false> interp(q, prev_count, false);
replace = CanReplace(index, data, j, interp.FRN, interp.CRN, included);
if (replace) {
state.pos = prev_pos;
state.count = prev_count;
}
}
} else {
ReuseIndexes(index, frame, prev);
ReuseIndexes(index, frames, prevs, nframes);
}

if (!replace && !included.AllValid()) {
// Remove the NULLs
state.pos = std::partition(index, index + state.pos, included) - index;
state.count = std::partition(index, index + state.count, included) - index;
}

if (state.pos) {
Interpolator<false> interp(q, state.pos, false);
if (state.count) {
Interpolator<false> interp(q, state.count, false);

// Compute or replace median from the first index
using ID = QuantileIndirect<INPUT_TYPE>;
Expand Down

0 comments on commit 336e4c3

Please sign in to comment.