Skip to content

Commit

Permalink
Implement mergewith for bytesRange and bytesValue
Browse files Browse the repository at this point in the history
This commit is a follow-up to #119
and introduces a mergeWith implementation for BytesRange and BytesValues.

TODO:
- (New PR) Handling case of merging multiple byte ranges which results in MultiRange as the output
  • Loading branch information
atanu1991 committed Oct 5, 2021
1 parent 743c221 commit 2a55e6c
Show file tree
Hide file tree
Showing 4 changed files with 599 additions and 77 deletions.
380 changes: 326 additions & 54 deletions velox/type/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,60 +472,6 @@ bool MultiRange::testBytesRange(
return false;
}

std::unique_ptr<Filter> MultiRange::mergeWith(const Filter* other) const {
switch (other->kind()) {
// Rules of MultiRange with IsNull/IsNotNull
// 1. MultiRange(nullAllowed=true) AND IS NULL => IS NULL
// 2. MultiRange(nullAllowed=true) AND IS NOT NULL =>
// MultiRange(nullAllowed=false)
// 3. MultiRange(nullAllowed=false) AND IS NULL
// => ALWAYS FALSE
// 4. MultiRange(nullAllowed=false) AND IS NOT NULL
// =>MultiRange(nullAllowed=false)
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return this->clone(/*nullAllowed=*/false);
case FilterKind::kDoubleRange:
case FilterKind::kFloatRange:
case FilterKind::kBytesRange:
case FilterKind::kBytesValues:
// TODO Implement
VELOX_UNREACHABLE();
case FilterKind::kMultiRange: {
const MultiRange* multiRangeOther = static_cast<const MultiRange*>(other);
bool bothNullAllowed = nullAllowed_ && other->testNull();
bool bothNanAllowed = nanAllowed_ && multiRangeOther->nanAllowed_;
std::vector<std::unique_ptr<Filter>> merged;
for (auto const& filter : this->filters()) {
for (auto const& filterOther : multiRangeOther->filters()) {
auto innerMerged = filter->mergeWith(filterOther.get());
switch (innerMerged->kind()) {
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
break;
default:
merged.push_back(std::move(innerMerged));
}
}
}

if (merged.empty()) {
return nullOrFalse(bothNullAllowed);
} else if (merged.size() == 1) {
return merged[0]->clone(bothNullAllowed);
} else {
return std::unique_ptr<Filter>(
new MultiRange(std::move(merged), bothNullAllowed, bothNanAllowed));
}
}
default:
VELOX_UNREACHABLE();
}
}

std::unique_ptr<Filter> IsNull::mergeWith(const Filter* other) const {
VELOX_CHECK(other->isDeterministic());

Expand Down Expand Up @@ -847,4 +793,330 @@ std::unique_ptr<Filter> BigintMultiRange::mergeWith(const Filter* other) const {
VELOX_UNREACHABLE();
}
}

namespace {
// Decides whether the upper bound of an intersection of two ranges is
// exclusive.
//
// `left` / `right` are the two candidate upper bounds and `leftExclusive` /
// `rightExclusive` their exclusivity flags. The flag of the lexicographically
// smaller bound is returned; when both bounds compare equal the result is
// exclusive if either input is.
bool mergeUpperExclusive(
    const std::string& left,
    bool leftExclusive,
    const std::string& right,
    bool rightExclusive) {
  const int order = left.compare(right);
  if (order == 0) {
    return leftExclusive || rightExclusive;
  }
  return order < 0 ? leftExclusive : rightExclusive;
}

// Decides whether the lower bound of an intersection of two ranges is
// exclusive.
//
// `left` / `right` are the two candidate lower bounds and `leftExclusive` /
// `rightExclusive` their exclusivity flags. The flag of the lexicographically
// greater bound is returned; when both bounds compare equal the result is
// exclusive if either input is.
bool mergeLowerExclusive(
    const std::string& left,
    bool leftExclusive,
    const std::string& right,
    bool rightExclusive) {
  const int order = left.compare(right);
  if (order < 0) {
    return rightExclusive;
  }
  if (order > 0) {
    return leftExclusive;
  }
  return leftExclusive || rightExclusive;
}

} // namespace
std::unique_ptr<Filter> BytesRange::mergeWith(const Filter* other) const {
switch (other->kind()) {
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return std::make_unique<BytesRange>(*this, false);
case FilterKind::kBytesValues:
case FilterKind::kMultiRange:
return other->mergeWith(this);
case FilterKind::kBytesRange: {
bool bothNullAllowed = nullAllowed_ && other->testNull();

auto otherRange = static_cast<const BytesRange*>(other);

bool upperUnbounded = false, lowerUnbounded = false;
bool upperExclusive = false, lowerExclusive = false;
bool bothUpper = false, bothLower = false;
std::string upper = "", lower = "";

// TODO:
// Handle the case of same filter being upperUnbounded and
// lowerUnbounded at the same time. The output of this merge
// can result in multiple ranges.

assert(!(this->upperUnbounded_ && this->lowerUnbounded_));
assert(!(otherRange->upperUnbounded_ && otherRange->lowerUnbounded_));

if (this->upperUnbounded_ && otherRange->upperUnbounded_) {
bothUpper = true;
upperUnbounded = true;
lower = std::max(lower_, otherRange->lower_);
lowerExclusive = mergeLowerExclusive(
lower_,
lowerExclusive_,
otherRange->lower_,
otherRange->lowerExclusive_);
}
if (this->lowerUnbounded_ && otherRange->lowerUnbounded_) {
bothLower = true;
lowerUnbounded = true;
upper = std::min(upper_, otherRange->upper_);
upperExclusive = mergeUpperExclusive(
upper_,
upperExclusive_,
otherRange->upper_,
otherRange->upperExclusive_);
}
if (bothLower || bothUpper) {
return std::make_unique<BytesRange>(
lower,
lowerUnbounded,
lowerExclusive,
upper,
upperUnbounded,
upperExclusive,
bothNullAllowed);
}

bool thisInOther = otherRange->testBytesRange(
this->lowerUnbounded_ ? this->upper_ : this->lower_,
this->upperUnbounded_ ? this->lower_ : this->upper_,
bothNullAllowed);

bool otherInThis = this->testBytesRange(
otherRange->lowerUnbounded_ ? otherRange->upper_ : otherRange->lower_,
otherRange->upperUnbounded_ ? otherRange->lower_ : otherRange->upper_,
bothNullAllowed);

if (!thisInOther && !otherInThis) {
return nullOrFalse(bothNullAllowed);
}

if (this->upperUnbounded_) {
lower = std::max(
lower_, otherRange->lowerUnbounded_ ? lower_ : otherRange->lower_);
upper = otherRange->upper_;
upperExclusive = otherRange->upperExclusive_;
lowerExclusive = this->lowerExclusive_;
if (!otherRange->lowerUnbounded_) {
lowerExclusive = mergeLowerExclusive(
lower_,
lowerExclusive_,
otherRange->lower_,
otherRange->lowerExclusive_);
}
} else if (this->lowerUnbounded_) {
lower = otherRange->lower_;
upper = std::min(
upper_, otherRange->upperUnbounded_ ? upper_ : otherRange->upper_);
lowerExclusive = otherRange->lowerExclusive_;
upperExclusive = this->upperExclusive_;

if (!otherRange->upperUnbounded_) {
upperExclusive = mergeUpperExclusive(
upper_,
upperExclusive_,
otherRange->upper_,
otherRange->upperExclusive_);
}
} else if (otherRange->upperUnbounded_ || otherRange->lowerUnbounded_) {
return other->mergeWith(this);
} else {
lower = std::max(lower_, otherRange->lower_);
upper = std::min(upper_, otherRange->upper_);
lowerExclusive = mergeLowerExclusive(
lower_,
lowerExclusive_,
otherRange->lower_,
otherRange->lowerExclusive_);
upperExclusive = mergeUpperExclusive(
upper_,
upperExclusive_,
otherRange->upper_,
otherRange->upperExclusive_);
}

return std::make_unique<BytesRange>(
lower,
lowerUnbounded,
lowerExclusive,
upper,
upperUnbounded,
upperExclusive,
bothNullAllowed);
}

default:
VELOX_UNREACHABLE();
}
}

std::unique_ptr<Filter> BytesValues::mergeWith(const Filter* other) const {
switch (other->kind()) {
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
case FilterKind::kMultiRange:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return std::make_unique<BytesValues>(*this, false);
case FilterKind::kBytesValues: {
bool bothNullAllowed = nullAllowed_ && other->testNull();
auto otherBytesValues = static_cast<const BytesValues*>(other);

if (this->upper_.compare(otherBytesValues->lower_) < 0 ||
otherBytesValues->upper_.compare(this->lower_) < 0) {
return nullOrFalse(bothNullAllowed);
}
const BytesValues* smallerFilter = this;
const BytesValues* largerFilter = otherBytesValues;
if (this->values().size() > otherBytesValues->values().size()) {
smallerFilter = otherBytesValues;
largerFilter = this;
}

std::vector<std::string> newValues;
newValues.reserve(smallerFilter->values().size());

for (const auto& value : smallerFilter->values()) {
if (largerFilter->values_.contains(value)) {
newValues.emplace_back(value);
}
}

if (newValues.empty()) {
return nullOrFalse(bothNullAllowed);
}

return std::make_unique<BytesValues>(
std::move(newValues), bothNullAllowed);
}
case FilterKind::kBytesRange: {
auto otherBytesRange = static_cast<const BytesRange*>(other);
bool bothNullAllowed = nullAllowed_ && other->testNull();

if ((!otherBytesRange->isLowerUnbounded() &&
this->upper_.compare(otherBytesRange->lower()) < 0) ||
(!otherBytesRange->isUpperUnbounded() &&
this->lower_.compare(otherBytesRange->upper()) > 0)) {
return nullOrFalse(bothNullAllowed);
}

std::vector<std::string> newValues;
newValues.reserve(this->values().size());
for (const auto& value : this->values()) {
if (otherBytesRange->testBytes(value.data(), value.length())) {
newValues.emplace_back(value);
}
}

if (newValues.empty()) {
return nullOrFalse(bothNullAllowed);
}

return std::make_unique<BytesValues>(
std::move(newValues), bothNullAllowed);
}

default:
VELOX_UNREACHABLE();
}
}

// Combines this MultiRange with `other` and returns the merged filter.
//
// Rules of MultiRange with IsNull/IsNotNull
// 1. MultiRange(nullAllowed=true) AND IS NULL => IS NULL
// 2. MultiRange(nullAllowed=true) AND IS NOT NULL =>
// MultiRange(nullAllowed=false)
// 3. MultiRange(nullAllowed=false) AND IS NULL
// => ALWAYS FALSE
// 4. MultiRange(nullAllowed=false) AND IS NOT NULL
// =>MultiRange(nullAllowed=false)
//
// AlwaysTrue, AlwaysFalse and IsNull are handled by delegating to
// `other->mergeWith(this)`. For BytesValues, BytesRange and MultiRange
// operands every member filter of this MultiRange is merged with every filter
// of `other` (a non-MultiRange `other` counts as a single filter). Pairwise
// results are collected as follows:
//   - AlwaysFalse / IsNull results are dropped;
//   - BytesValues results are folded into one aggregated BytesValues filter;
//   - anything else is kept as a member of the resulting MultiRange.
// DoubleRange/FloatRange operands are not implemented and hit
// VELOX_UNREACHABLE, as does any other filter kind.
std::unique_ptr<Filter> MultiRange::mergeWith(const Filter* other) const {
  switch (other->kind()) {
    case FilterKind::kAlwaysTrue:
    case FilterKind::kAlwaysFalse:
    case FilterKind::kIsNull:
      return other->mergeWith(this);
    case FilterKind::kIsNotNull:
      return this->clone(/*nullAllowed=*/false);
    case FilterKind::kDoubleRange:
    case FilterKind::kFloatRange:
      // TODO: Implement
      VELOX_UNREACHABLE();
    case FilterKind::kBytesValues:
    case FilterKind::kBytesRange:
    case FilterKind::kMultiRange: {
      bool bothNullAllowed = nullAllowed_ && other->testNull();
      bool bothNanAllowed = nanAllowed_;

      std::vector<const Filter*> otherFilters;
      std::vector<std::unique_ptr<Filter>> merged;
      std::vector<std::string> byteValues;

      // Flatten `other` into a list of filters so a single loop handles both
      // the MultiRange and the single-filter case.
      if (other->kind() == FilterKind::kMultiRange) {
        auto multiRangeOther = static_cast<const MultiRange*>(other);
        for (auto const& filterOther : multiRangeOther->filters()) {
          otherFilters.emplace_back(filterOther.get());
        }
        bothNanAllowed = bothNanAllowed && multiRangeOther->nanAllowed();
      } else {
        otherFilters.emplace_back(other);
      }

      merged.reserve(this->filters().size() + otherFilters.size());

      for (auto const& filter : this->filters()) {
        for (auto const& filterOther : otherFilters) {
          auto innerMerged = filter->mergeWith(filterOther);

          switch (innerMerged->kind()) {
            case FilterKind::kAlwaysFalse:
            case FilterKind::kIsNull:
              // Contributes no values to the result.
              continue;
            case FilterKind::kBytesValues: {
              auto mergedBytesValues =
                  static_cast<const BytesValues*>(innerMerged.get());
              byteValues.reserve(
                  byteValues.size() + mergedBytesValues->values().size());
              for (const auto& value : mergedBytesValues->values()) {
                byteValues.emplace_back(value);
              }
              // The values now live in `byteValues`. Do not fall through into
              // `default`, which would also add this filter to `merged` and
              // duplicate it next to the aggregated BytesValues built below.
              break;
            }
            default:
              // Move ownership into the vector; handing over a released raw
              // pointer would leak the filter if the vector growth throws.
              merged.push_back(std::move(innerMerged));
          }
        }
      }

      // All point values collected from the pairwise merges become a single
      // BytesValues filter.
      if (!byteValues.empty()) {
        merged.emplace_back(std::make_unique<BytesValues>(
            std::move(byteValues), bothNullAllowed));
      }

      if (merged.empty()) {
        return nullOrFalse(bothNullAllowed);
      } else if (merged.size() == 1) {
        // A single surviving filter does not need a MultiRange wrapper.
        return merged.front()->clone(bothNullAllowed);
      } else {
        return std::make_unique<MultiRange>(
            std::move(merged), bothNullAllowed, bothNanAllowed);
      }
    }
    default:
      VELOX_UNREACHABLE();
  }
}

} // namespace facebook::velox::common
Loading

0 comments on commit 2a55e6c

Please sign in to comment.