Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PartialMergeJoin bugfixes #6982

Merged
merged 5 commits into from Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbms/src/Columns/ColumnNullable.cpp
Expand Up @@ -168,6 +168,12 @@ void ColumnNullable::insertRangeFromNotNullable(const IColumn & src, size_t star
getNullMapData().resize_fill(getNullMapData().size() + length, 0);
}

void ColumnNullable::insertManyFromNotNullable(const IColumn & src, size_t position, size_t length)
{
for (size_t i = 0; i < length; ++i)
insertFromNotNullable(src, position);
}

void ColumnNullable::popBack(size_t n)
{
getNestedColumn().popBack(n);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnNullable.h
Expand Up @@ -63,6 +63,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>

void insertFromNotNullable(const IColumn & src, size_t n);
void insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length);
void insertManyFromNotNullable(const IColumn & src, size_t position, size_t length);

void insertDefault() override
{
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Columns/IColumn.h
Expand Up @@ -146,6 +146,13 @@ class IColumn : public COW<IColumn>
/// Could be used to concatenate columns.
virtual void insertRangeFrom(const IColumn & src, size_t start, size_t length) = 0;

/// Appends one element from other column with the same type multiple times.
virtual void insertManyFrom(const IColumn & src, size_t position, size_t length)
{
for (size_t i = 0; i < length; ++i)
insertFrom(src, position);
}

/// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented).
/// Is used to optimize some computations (in aggregation, for example).
/// Parameter length could be ignored if column values have fixed size.
Expand All @@ -157,6 +164,13 @@ class IColumn : public COW<IColumn>
/// For example, ColumnNullable(Nested) absolutely ignores values of nested column if it is marked as NULL.
virtual void insertDefault() = 0;

/// Appends "default value" multiple times.
virtual void insertManyDefaults(size_t length)
{
for (size_t i = 0; i < length; ++i)
insertDefault();
}

/** Removes last n elements.
* Is used to support exception-safety of several operations.
* For example, sometimes insertion should be reverted if we catch an exception during operation processing.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Core/Settings.h
Expand Up @@ -288,7 +288,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).") \
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join if possible.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Interpreters/AnalyzedJoin.cpp
Expand Up @@ -262,7 +262,10 @@ NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpressio

JoinPtr makeJoin(std::shared_ptr<AnalyzedJoin> table_join, const Block & right_sample_block)
{
if (table_join->partial_merge_join)
bool is_left_or_inner = isLeft(table_join->kind()) || isInner(table_join->kind());
bool is_asof = (table_join->strictness() == ASTTableJoin::Strictness::Asof);

if (table_join->partial_merge_join && !is_asof && is_left_or_inner)
return std::make_shared<MergeJoin>(table_join, right_sample_block);
return std::make_shared<Join>(table_join, right_sample_block);
}
Expand Down
24 changes: 1 addition & 23 deletions dbms/src/Interpreters/Join.cpp
Expand Up @@ -959,29 +959,7 @@ void Join::joinBlock(Block & block)

void Join::joinTotals(Block & block) const
{
Block totals_without_keys = totals;

if (totals_without_keys)
{
for (const auto & name : key_names_right)
totals_without_keys.erase(totals_without_keys.getPositionByName(name));

for (size_t i = 0; i < totals_without_keys.columns(); ++i)
block.insert(totals_without_keys.safeGetByPosition(i));
}
else
{
/// We will join empty `totals` - from one row with the default values.

for (size_t i = 0; i < sample_block_with_columns_to_add.columns(); ++i)
{
const auto & col = sample_block_with_columns_to_add.getByPosition(i);
block.insert({
col.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst(),
col.type,
col.name});
}
}
JoinCommon::joinTotals(totals, sample_block_with_columns_to_add, key_names_right, block);
}


Expand Down
143 changes: 112 additions & 31 deletions dbms/src/Interpreters/MergeJoin.cpp
Expand Up @@ -17,6 +17,51 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}

namespace
{

template <bool has_nulls>
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos)
{
static constexpr int null_direction_hint = 1;

if constexpr (has_nulls)
{
auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);

if (left_nullable && right_nullable)
{
int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
if (res)
return res;

/// NULL != NULL case
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
}

if (left_nullable && !right_nullable)
{
if (left_column.isNullAt(lhs_pos))
return null_direction_hint;
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}

if (!left_nullable && right_nullable)
{
if (right_column.isNullAt(rhs_pos))
return -null_direction_hint;
return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint);
}
}

/// !left_nullable && !right_nullable
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
}

}

struct MergeJoinEqualRange
{
size_t left_start = 0;
Expand All @@ -42,45 +87,40 @@ class MergeJoinCursor
bool atEnd() const { return impl.pos >= impl.rows; }
void nextN(size_t num) { impl.pos += num; }

int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
void setCompareNullability(const MergeJoinCursor & rhs)
{
int res = 0;
has_nullable_columns = false;

for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
res = impl.sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl.sort_columns[i]), 1);
if (res)
bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]);
bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]);

if (is_left_nullable || is_right_nullable)
{
has_nullable_columns = true;
break;
}
}
return res;
}

bool sameNext(size_t lhs_pos) const
Range getNextEqualRange(MergeJoinCursor & rhs)
{
if (lhs_pos + 1 >= impl.rows)
return false;

for (size_t i = 0; i < impl.sort_columns_size; ++i)
if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0)
return false;
return true;
if (has_nullable_columns)
return getNextEqualRangeImpl<true>(rhs);
return getNextEqualRangeImpl<false>(rhs);
}

size_t getEqualLength()
{
if (atEnd())
return 0;

size_t pos = impl.pos;
while (sameNext(pos))
++pos;
return pos - impl.pos + 1;
}
private:
SortCursorImpl impl;
bool has_nullable_columns = false;

Range getNextEqualRange(MergeJoinCursor & rhs)
template <bool has_nulls>
Range getNextEqualRangeImpl(MergeJoinCursor & rhs)
{
while (!atEnd() && !rhs.atEnd())
{
int cmp = compareAt(rhs, impl.pos, rhs.impl.pos);
int cmp = compareAt<has_nulls>(rhs, impl.pos, rhs.impl.pos);
if (cmp < 0)
impl.next();
if (cmp > 0)
Expand All @@ -97,8 +137,43 @@ class MergeJoinCursor
return Range{impl.pos, rhs.impl.pos, 0, 0};
}

private:
SortCursorImpl impl;
template <bool has_nulls>
int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{
int res = 0;
for (size_t i = 0; i < impl.sort_columns_size; ++i)
{
auto * left_column = impl.sort_columns[i];
auto * right_column = rhs.impl.sort_columns[i];

res = nullableCompareAt<has_nulls>(*left_column, *right_column, lhs_pos, rhs_pos);
if (res)
break;
}
return res;
}

size_t getEqualLength()
{
if (atEnd())
return 0;

size_t pos = impl.pos;
while (sameNext(pos))
++pos;
return pos - impl.pos + 1;
}

bool sameNext(size_t lhs_pos) const
{
if (lhs_pos + 1 >= impl.rows)
return false;

for (size_t i = 0; i < impl.sort_columns_size; ++i)
if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0)
return false;
return true;
}
};

namespace
Expand Down Expand Up @@ -151,9 +226,9 @@ void copyRightRange(const Block & right_block, const Block & right_columns_to_ad
auto * dst_nullable = typeid_cast<ColumnNullable *>(dst_column.get());

if (dst_nullable && !isColumnNullable(*src_column))
dst_nullable->insertRangeFromNotNullable(*src_column, row_position, rows_to_add);
dst_nullable->insertManyFromNotNullable(*src_column, row_position, rows_to_add);
else
dst_column->insertRangeFrom(*src_column, row_position, rows_to_add);
dst_column->insertManyFrom(*src_column, row_position, rows_to_add);
}
}

Expand All @@ -179,8 +254,7 @@ void joinEquals(const Block & left_block, const Block & right_block, const Block
void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
{
for (auto & column : right_columns)
for (size_t i = 0; i < rows_to_add; ++i)
column->insertDefault();
column->insertManyDefaults(rows_to_add);
}

void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
Expand Down Expand Up @@ -232,6 +306,11 @@ void MergeJoin::setTotals(const Block & totals_block)
mergeRightBlocks();
}

void MergeJoin::joinTotals(Block & block) const
{
JoinCommon::joinTotals(totals, right_columns_to_add, table_join->keyNamesRight(), block);
}

void MergeJoin::mergeRightBlocks()
{
const size_t max_merged_block_size = 128 * 1024 * 1024;
Expand Down Expand Up @@ -319,6 +398,7 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
left_cursor.setCompareNullability(right_cursor);

while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
Expand Down Expand Up @@ -351,6 +431,7 @@ void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_bloc
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
left_cursor.setCompareNullability(right_cursor);

while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/MergeJoin.h
Expand Up @@ -22,7 +22,7 @@ class MergeJoin : public IJoin

bool addJoinedBlock(const Block & block) override;
void joinBlock(Block &) override;
void joinTotals(Block &) const override {}
void joinTotals(Block &) const override;
void setTotals(const Block &) override;
size_t getTotalRowCount() const override { return right_blocks_row_count; }

Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Interpreters/join_common.cpp
Expand Up @@ -122,5 +122,30 @@ void createMissedColumns(Block & block)
}
}

void joinTotals(const Block & totals, const Block & columns_to_add, const Names & key_names_right, Block & block)
{
if (Block totals_without_keys = totals)
{
for (const auto & name : key_names_right)
totals_without_keys.erase(totals_without_keys.getPositionByName(name));

for (size_t i = 0; i < totals_without_keys.columns(); ++i)
block.insert(totals_without_keys.safeGetByPosition(i));
}
else
{
/// We will join empty `totals` - from one row with the default values.

for (size_t i = 0; i < columns_to_add.columns(); ++i)
{
const auto & col = columns_to_add.getByPosition(i);
block.insert({
col.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst(),
col.type,
col.name});
}
}
}

}
}
1 change: 1 addition & 0 deletions dbms/src/Interpreters/join_common.h
Expand Up @@ -26,6 +26,7 @@ ColumnRawPtrs extractKeysForJoin(const Names & key_names_right, const Block & ri
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right, const Names & key_names_right);

void createMissedColumns(Block & block);
void joinTotals(const Block & totals, const Block & columns_to_add, const Names & key_names_right, Block & block);

}

Expand Down