Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WITH FILL clarification and cleanup #48395

Merged
merged 7 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 29 additions & 4 deletions src/Interpreters/FillingRow.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Interpreters/FillingRow.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <IO/Operators.h>


namespace DB
Expand Down Expand Up @@ -44,21 +45,27 @@ bool FillingRow::operator==(const FillingRow & other) const
return true;
}

bool FillingRow::operator>=(const FillingRow & other) const
{
return !(*this < other);
}

bool FillingRow::next(const FillingRow & to_row)
{
const size_t row_size = size();
size_t pos = 0;

/// Find position we need to increment for generating next row.
for (size_t s = size(); pos < s; ++pos)
for (; pos < row_size; ++pos)
if (!row[pos].isNull() && !to_row.row[pos].isNull() && !equals(row[pos], to_row.row[pos]))
break;

if (pos == size() || less(to_row.row[pos], row[pos], getDirection(pos)))
if (pos == row_size || less(to_row.row[pos], row[pos], getDirection(pos)))
return false;

/// If we have any 'fill_to' value at position greater than 'pos',
/// we need to generate rows up to 'fill_to' value.
for (size_t i = size() - 1; i > pos; --i)
for (size_t i = row_size - 1; i > pos; --i)
{
if (getFillDescription(i).fill_to.isNull() || row[i].isNull())
continue;
Expand All @@ -84,7 +91,7 @@ bool FillingRow::next(const FillingRow & to_row)
{
bool is_less = false;
size_t i = pos + 1;
for (; i < size(); ++i)
for (; i < row_size; ++i)
{
const auto & fill_from = getFillDescription(i).fill_from;
if (!fill_from.isNull())
Expand All @@ -107,4 +114,22 @@ void FillingRow::initFromDefaults(size_t from_pos)
row[i] = getFillDescription(i).fill_from;
}

String FillingRow::dump() const
{
WriteBufferFromOwnString out;
for (size_t i = 0; i < row.size(); ++i)
{
if (i != 0)
out << ", ";
out << row[i].dump();
}
return out.str();
}

WriteBuffer & operator<<(WriteBuffer & out, const FillingRow & row)
{
out << row.dump();
return out;
}

}
7 changes: 5 additions & 2 deletions src/Interpreters/FillingRow.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once
#include <Core/SortDescription.h>
#include <Core/InterpolateDescription.h>
#include <Columns/IColumn.h>


namespace DB
Expand Down Expand Up @@ -30,13 +28,18 @@ class FillingRow
size_t size() const { return row.size(); }
bool operator<(const FillingRow & other) const;
bool operator==(const FillingRow & other) const;
bool operator>=(const FillingRow & other) const;

int getDirection(size_t index) const { return sort_description[index].direction; }
FillColumnDescription & getFillDescription(size_t index) { return sort_description[index].fill_description; }

String dump() const;

private:
Row row;
SortDescription sort_description;
};

WriteBuffer & operator<<(WriteBuffer & out, const FillingRow & row);

}
183 changes: 125 additions & 58 deletions src/Processors/Transforms/FillingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,29 @@
#include <Functions/FunctionDateOrDateTimeAddInterval.h>
#include <Common/FieldVisitorSum.h>
#include <Common/FieldVisitorToString.h>
#include <Common/logger_useful.h>


namespace DB
{

constexpr bool debug_logging_enabled = false;

template <typename T>
void logDebug(String key, const T & value, const char * separator = " : ")
{
if constexpr (debug_logging_enabled)
{
WriteBufferFromOwnString ss;
if constexpr (std::is_pointer_v<T>)
ss << *value;
else
ss << value;

LOG_DEBUG(&Poco::Logger::get("FillingTransform"), "{}{}{}", key, separator, ss.str());
}
}

namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
Expand Down Expand Up @@ -233,27 +251,24 @@ FillingTransform::FillingTransform(
interpolate_column_positions.push_back(header_.getPositionByName(name));
}

/// prepare() is overrididen to call transform() after all chunks are processed
/// it can be necessary for suffix generation in case of WITH FILL .. TO is provided
IProcessor::Status FillingTransform::prepare()
{
if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed)
{
should_insert_first = next_row < filling_row || first;
logDebug("prepare()", "all chunks processed");
all_chunks_processed = true;

for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;

if (first || filling_row < next_row)
/// push output data to output port if we can
if (has_output && output.canPush())
{
/// Output if has data.
if (has_output)
{
output.pushData(std::move(output_data));
has_output = false;
}

generate_suffix = true;
return Status::Ready;
output.pushData(std::move(output_data));
has_output = false;
}

/// return Ready to call transform() for generating filling rows after latest chunk was processed
return Status::Ready;
}

return ISimpleTransform::prepare();
Expand Down Expand Up @@ -316,8 +331,10 @@ static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, c
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
{
for (auto * interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
}

for (auto * other_column : other_columns)
other_column->insertDefault();
Expand Down Expand Up @@ -368,62 +385,107 @@ void FillingTransform::initColumns(
initColumnsByPositions(non_const_columns, input_other_columns, output_columns, output_other_columns, other_column_positions);
}

bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns)
{
logDebug("generateSuffixIfNeeded() filling_row", filling_row);
logDebug("generateSuffixIfNeeded() next_row", next_row);
logDebug("generateSuffixIfNeeded() first", first);

/// Determines should we insert filling row before start generating next rows.
bool should_insert_first = next_row < filling_row || first;

for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;

logDebug("generateSuffixIfNeeded() next_row updated", next_row);

if (!first && filling_row >= next_row)
{
logDebug("generateSuffixIfNeeded()", "no need to generate suffix");
return false;
}

Columns input_fill_columns;
Columns input_interpolate_columns;
Columns input_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_other_columns;

initColumns(
input_columns,
input_fill_columns,
input_interpolate_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);

if (first)
filling_row.initFromDefaults();

Block interpolate_block;
if (should_insert_first && filling_row < next_row)
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}

while (filling_row.next(next_row))
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}

return true;
}

void FillingTransform::transform(Chunk & chunk)
{
if (!chunk.hasRows() && !generate_suffix)
logDebug("new chunk rows", chunk.getNumRows());
logDebug("all chunks processed", all_chunks_processed);

/// if got chunk with no rows and it's not for suffix generation, then just skip it
/// Note: ExpressionTransform can return chunk with no rows, see 02579_fill_empty_chunk.sql for example
if (!chunk.hasRows() && !all_chunks_processed)
return;

Columns old_fill_columns;
Columns old_interpolate_columns;
Columns old_other_columns;
Columns input_fill_columns;
Columns input_interpolate_columns;
Columns input_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_other_columns;
MutableColumns result_columns;

Block interpolate_block;

if (generate_suffix)
if (all_chunks_processed)
{
const auto & empty_columns = input.getHeader().getColumns();
initColumns(
empty_columns,
old_fill_columns,
old_interpolate_columns,
old_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);

if (first)
filling_row.initFromDefaults();

if (should_insert_first && filling_row < next_row)
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
chassert(!chunk.hasRows());

interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row))
/// if all chunks are processed, then we may need to generate suffix for the following cases:
/// (1) when all data are processed and WITH FILL .. TO is provided
/// (2) for empty result set when WITH FILL FROM .. TO is provided (see PR #30888)
if (generateSuffixIfNeeded(input.getHeader().getColumns(), result_columns))
{
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate(result_columns, interpolate_block);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
}

size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
return;
}

chassert(chunk.hasRows());

const size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns();
auto input_columns = chunk.detachColumns();
initColumns(
old_columns,
old_fill_columns,
old_interpolate_columns,
old_other_columns,
input_columns,
input_fill_columns,
input_interpolate_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
Expand All @@ -433,7 +495,7 @@ void FillingTransform::transform(Chunk & chunk)
{
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
auto current_value = (*old_fill_columns[i])[0];
auto current_value = (*input_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;

if (!fill_from.isNull() && !equals(current_value, fill_from))
Expand All @@ -453,18 +515,24 @@ void FillingTransform::transform(Chunk & chunk)

for (size_t row_ind = 0; row_ind < num_rows; ++row_ind)
{
should_insert_first = next_row < filling_row;
logDebug("row", row_ind);
logDebug("filling_row", filling_row);
logDebug("next_row", next_row);

bool should_insert_first = next_row < filling_row;
logDebug("should_insert_first", should_insert_first);

for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
auto current_value = (*input_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;

if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
logDebug("next_row updated", next_row);

/// A case, when at previous step row was initialized from defaults 'fill_from' values
/// and probably we need to insert it to block.
Expand All @@ -474,16 +542,15 @@ void FillingTransform::transform(Chunk & chunk)
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}

interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row))
{
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}

copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_interpolate_columns, old_interpolate_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
copyRowFromColumns(res_fill_columns, input_fill_columns, row_ind);
copyRowFromColumns(res_interpolate_columns, input_interpolate_columns, row_ind);
copyRowFromColumns(res_other_columns, input_other_columns, row_ind);
}

saveLastRow(result_columns);
Expand Down