Skip to content

Commit

Permalink
Implement mergewith for bytesRange and bytesValue
Browse files Browse the repository at this point in the history
This commit is a follow-up to #119
and introduces mergeWith implementations for BytesRange and BytesValues.

TODO:

- Handling case of merging multiple byte ranges which results in MultiRange as the output
- Merging MultiRange with MultiRange
  • Loading branch information
atanu1991 committed Oct 1, 2021
1 parent 45c25f5 commit a12933b
Show file tree
Hide file tree
Showing 4 changed files with 517 additions and 0 deletions.
268 changes: 268 additions & 0 deletions velox/type/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,4 +779,272 @@ std::unique_ptr<Filter> BigintMultiRange::mergeWith(const Filter* other) const {
VELOX_UNREACHABLE();
}
}

std::unique_ptr<Filter> BytesRange::mergeWith(const Filter* other) const {
switch (other->kind()) {
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return std::make_unique<BytesRange>(*this, false);
case FilterKind::kBytesValues:
case FilterKind::kMultiRange:
return other->mergeWith(this);
case FilterKind::kBytesRange: {
bool bothNullAllowed = nullAllowed_ && other->testNull();

auto otherRange = static_cast<const BytesRange*>(other);

bool upperUnbounded = false, lowerUnbounded = false;
bool upperExclusive = false, lowerExclusive = false;
bool bothUpper = false, bothLower = false;
std::string upper = "", lower = "";

if (this->upperUnbounded_ && otherRange->upperUnbounded_) {
bothUpper = true;
upperUnbounded = true;
lowerExclusive = otherRange->lowerExclusive_;
int cmp = lower_.compare(otherRange->lower_);
if (cmp > 0) {
lowerExclusive = lowerExclusive_;
} else if (cmp == 0) {
lowerExclusive = otherRange->lowerExclusive_ && lowerExclusive_;
}
lower = std::max(lower_, otherRange->lower_);
}
if (this->lowerUnbounded_ && otherRange->lowerUnbounded_) {
bothLower = true;
lowerUnbounded = true;
upperExclusive = otherRange->upperExclusive_;
int cmp = lower_.compare(otherRange->lower_);
if (cmp < 0) {
upperExclusive = upperExclusive_;
} else if (cmp == 0) {
upperExclusive = otherRange->upperExclusive_ && upperExclusive;
}
upper = std::min(upper_, otherRange->upper_);
}
if (bothLower || bothUpper) {
return std::make_unique<BytesRange>(
lower,
lowerUnbounded,
lowerExclusive,
upper,
upperUnbounded,
upperExclusive,
bothNullAllowed);
}

// Will handle the case of same filter being upperUnbounded and
// lowerUnbounded later, as this can result in multiple ranges

bool thisInOther = otherRange->testBytesRange(
this->lowerUnbounded_ ? this->upper_ : this->lower_,
this->upperUnbounded_ ? this->lower_ : this->upper_,
bothNullAllowed);

bool otherInThis = this->testBytesRange(
otherRange->lowerUnbounded_ ? otherRange->upper_ : otherRange->lower_,
otherRange->upperUnbounded_ ? otherRange->lower_ : otherRange->upper_,
bothNullAllowed);

if (!thisInOther && !otherInThis) {
return nullOrFalse(bothNullAllowed);
}

if (this->upperUnbounded_) {
lower = std::max(
lower_, otherRange->lowerUnbounded_ ? lower_ : otherRange->lower_);
upper = otherRange->upper_;
upperExclusive = otherRange->upperExclusive_;
lowerExclusive = this->lowerExclusive_;
if (!otherRange->lowerUnbounded_) {
int cmp = lower_.compare(otherRange->lower_);
if (cmp < 0) {
lowerExclusive = otherRange->lowerExclusive_;
} else if (cmp == 0) {
lowerExclusive = otherRange->lowerExclusive_ && lowerExclusive_;
}
}
} else if (this->lowerUnbounded_) {
lower = otherRange->lower_;
upper = std::min(
upper_, otherRange->upperUnbounded_ ? upper_ : otherRange->upper_);
lowerExclusive = otherRange->lowerExclusive_;
upperExclusive = this->upperExclusive_;

if (!otherRange->upperUnbounded_) {
int cmp = upper_.compare(otherRange->upper_);
if (cmp > 0) {
upperExclusive = otherRange->upperExclusive_;
} else if (cmp == 0) {
upperExclusive = otherRange->upperExclusive_ && upperExclusive_;
}
}
} else if (otherRange->upperUnbounded_ || otherRange->lowerUnbounded_) {
return other->mergeWith(this);
} else {
lower = std::max(lower_, otherRange->lower_);
lowerExclusive = otherRange->lowerExclusive_ && lowerExclusive_;
upper = std::min(upper_, otherRange->upper_);
upperExclusive = upperExclusive_ && otherRange->upperExclusive_;
}

return std::make_unique<BytesRange>(
lower,
lowerUnbounded,
lowerExclusive,
upper,
upperUnbounded,
upperExclusive,
bothNullAllowed);
}

default:
VELOX_UNREACHABLE();
}
}

std::unique_ptr<Filter> BytesValues::mergeWith(const Filter* other) const {
switch (other->kind()) {
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
case FilterKind::kMultiRange:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return std::make_unique<BytesValues>(*this, false);
case FilterKind::kBytesValues: {
bool bothNullAllowed = nullAllowed_ && other->testNull();
auto otherBytesValues = static_cast<const BytesValues*>(other);

if (this->upper_.compare(otherBytesValues->lower_) < 0 ||
otherBytesValues->upper_.compare(this->lower_) < 0) {
return nullOrFalse(bothNullAllowed);
}
const BytesValues* smallerFilter = this;
const BytesValues* largerFilter = otherBytesValues;
if (this->values().size() > otherBytesValues->values().size()) {
smallerFilter = otherBytesValues;
largerFilter = this;
}

std::vector<std::string> newValues;
newValues.reserve(smallerFilter->values().size());

for (const auto& value : smallerFilter->values()) {
if (largerFilter->values_.contains(value)) {
newValues.emplace_back(value);
}
}

if (newValues.empty()) {
return nullOrFalse(bothNullAllowed);
}

return std::make_unique<BytesValues>(
std::move(newValues), bothNullAllowed);
}
case FilterKind::kBytesRange: {
auto otherBytesRange = static_cast<const BytesRange*>(other);
bool bothNullAllowed = nullAllowed_ && other->testNull();

if ((!otherBytesRange->isLowerUnbounded() &&
this->upper_.compare(otherBytesRange->lower()) < 0) ||
(!otherBytesRange->isUpperUnbounded() &&
this->lower_.compare(otherBytesRange->upper()) > 0)) {
return nullOrFalse(bothNullAllowed);
}

std::vector<std::string> newValues;
newValues.reserve(this->values().size());
for (const auto& value : this->values()) {
if (otherBytesRange->testBytes(value.data(), value.length())) {
newValues.emplace_back(value);
}
}

if (newValues.empty()) {
return nullOrFalse(bothNullAllowed);
}

return std::make_unique<BytesValues>(
std::move(newValues), bothNullAllowed);
}

default:
VELOX_UNREACHABLE();
}
}

std::unique_ptr<Filter> MultiRange::mergeWith(const Filter* other) const {
switch (other->kind()) {
case FilterKind::kAlwaysTrue:
case FilterKind::kAlwaysFalse:
case FilterKind::kIsNull:
return other->mergeWith(this);
case FilterKind::kIsNotNull:
return this->clone();
case FilterKind::kBytesValues:
case FilterKind::kBytesRange: {
const Filter* otherFilter;
if (other->kind() == FilterKind::kBytesValues) {
otherFilter = static_cast<const BytesValues*>(other);
} else {
otherFilter = static_cast<const BytesRange*>(other);
}

bool bothNullAllowed = this->testNull() && other->testNull();
std::vector<std::unique_ptr<Filter>> newValues;
std::vector<std::string> byteValues;

newValues.reserve(this->filters().size());

for (const auto& filter : this->filters()) {
assert(
filter->kind() == FilterKind::kBytesRange ||
filter->kind() == FilterKind::kBytesValues);
auto merged = otherFilter->mergeWith(filter.get());
switch (merged->kind()) {
case FilterKind::kBytesValues: {
auto mergedBytesValues =
static_cast<const BytesValues*>(merged.get());
byteValues.reserve(
byteValues.size() + mergedBytesValues->values().size());
for (const auto& value : mergedBytesValues->values()) {
byteValues.push_back(value);
}
}
case FilterKind::kBytesRange: {
auto mergedBytesRange =
static_cast<const BytesRange*>(merged.get());
newValues.emplace_back(merged.release());
}
default:
continue;
}
}

if (!byteValues.empty()) {
newValues.emplace_back(std::make_unique<BytesValues>(
std::move(byteValues), bothNullAllowed));
}

if (newValues.empty()) {
return nullOrFalse(bothNullAllowed);
}

if (newValues.size() == 1) {
return std::move(newValues.front());
}

return std::make_unique<MultiRange>(
std::move(newValues), bothNullAllowed, this->nanAllowed());
}
default:
VELOX_UNREACHABLE();
}
}

} // namespace facebook::velox::common
45 changes: 45 additions & 0 deletions velox/type/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,18 @@ class BytesRange final : public AbstractRange {
!lowerExclusive_ && !upperExclusive_ && !lowerUnbounded_ &&
!upperUnbounded_ && lower_ == upper_) {}

// Copies the bounds, the bound flags and the single-value flag from 'other',
// but takes the null-handling flag from 'nullAllowed' instead of from 'other'.
BytesRange(const BytesRange& other, bool nullAllowed)
: AbstractRange(
other.lowerUnbounded_,
other.lowerExclusive_,
other.upperUnbounded_,
other.upperExclusive_,
nullAllowed,
FilterKind::kBytesRange),
lower_(other.lower_),
upper_(other.upper_),
singleValue_(other.singleValue_) {}

// Returns a newly allocated copy of this filter, made with the copy
// constructor.
std::unique_ptr<Filter> clone() const final {
return std::make_unique<BytesRange>(*this);
}
Expand All @@ -1040,6 +1052,8 @@ class BytesRange final : public AbstractRange {
return !singleValue_ || lower_.size() == length;
}

std::unique_ptr<Filter> mergeWith(const Filter* other) const final;

__m256si test8xLength(__m256si lengths) const final {
using V32 = simd::Vectors<int32_t>;
VELOX_DCHECK(singleValue_);
Expand All @@ -1050,6 +1064,14 @@ class BytesRange final : public AbstractRange {
return singleValue_;
}

// Returns the upperUnbounded_ flag, i.e. whether the range has no upper limit.
bool isUpperUnbounded() const {
return upperUnbounded_;
}

// Returns the lowerUnbounded_ flag, i.e. whether the range has no lower limit.
bool isLowerUnbounded() const {
return lowerUnbounded_;
}

// Returns a reference to the stored lower bound.
// NOTE(review): presumably only meaningful when isLowerUnbounded() is false -
// confirm against the constructor.
const std::string& lower() const {
return lower_;
}
Expand Down Expand Up @@ -1083,6 +1105,13 @@ class BytesValues final : public Filter {
upper_ = *std::max_element(values_.begin(), values_.end());
}

// Copies the value set, its cached lower/upper values and the lengths from
// 'other', but takes the null-handling flag from 'nullAllowed'.
// NOTE(review): the first Filter argument (true) is presumably the
// "deterministic" flag - confirm against the Filter constructor.
BytesValues(const BytesValues& other, bool nullAllowed)
: Filter(true, nullAllowed, FilterKind::kBytesValues),
lower_(other.lower_),
upper_(other.upper_),
values_(other.values_),
lengths_(other.lengths_) {}

// Returns a newly allocated copy of this filter, made with the copy
// constructor.
std::unique_ptr<Filter> clone() const final {
return std::make_unique<BytesValues>(*this);
}
Expand All @@ -1101,6 +1130,12 @@ class BytesValues final : public Filter {
std::optional<std::string_view> max,
bool hasNull) const final;

std::unique_ptr<Filter> mergeWith(const Filter* other) const final;

// Returns a read-only reference to the set of values held by this filter.
const folly::F14FastSet<std::string>& values() const {
return values_;
}

private:
std::string lower_;
std::string upper_;
Expand Down Expand Up @@ -1169,6 +1204,16 @@ class MultiRange final : public Filter {

std::unique_ptr<Filter> clone() const final;

// Returns a read-only reference to the filters contained in this MultiRange.
const std::vector<std::unique_ptr<Filter>>& filters() const {
return filters_;
}

std::unique_ptr<Filter> mergeWith(const Filter* other) const final;

// Returns the nanAllowed_ flag this filter was built with.
// NOTE(review): presumably controls whether NaN passes testDouble/testFloat -
// confirm against their definitions.
bool nanAllowed() const {
return nanAllowed_;
}

bool testDouble(double value) const final;

bool testFloat(float value) const final;
Expand Down
14 changes: 14 additions & 0 deletions velox/type/tests/FilterBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ inline std::unique_ptr<common::BytesRange> greaterThanOrEqual(
min, false, false, "", true, true, nullAllowed);
}

// Builds a BytesRange filter with no lower bound and an exclusive upper bound
// at 'max', i.e. one intended to accept strings strictly less than 'max'.
//
// @param max Upper bound; 'max' itself is excluded from the range.
// @param nullAllowed Null-handling flag passed through to the filter.
inline std::unique_ptr<common::BytesRange> lessThan(
    const std::string& max,
    bool nullAllowed = false) {
  return std::make_unique<common::BytesRange>(
      "",
      /*lowerUnbounded=*/true,
      /*lowerExclusive=*/false,
      max,
      /*upperUnbounded=*/false,
      /*upperExclusive=*/true,
      nullAllowed);
}

// Builds a BytesRange filter with an exclusive lower bound at 'min' and no
// upper bound, i.e. one intended to accept strings strictly greater than
// 'min'.
//
// @param min Lower bound; 'min' itself is excluded from the range.
// @param nullAllowed Null-handling flag passed through to the filter.
inline std::unique_ptr<common::BytesRange> greaterThan(
    const std::string& min,
    bool nullAllowed = false) {
  return std::make_unique<common::BytesRange>(
      min,
      /*lowerUnbounded=*/false,
      /*lowerExclusive=*/true,
      "",
      /*upperUnbounded=*/true,
      /*upperExclusive=*/false,
      nullAllowed);
}

inline std::unique_ptr<common::Filter> in(
const std::vector<int64_t>& values,
bool nullAllowed = false) {
Expand Down
Loading

0 comments on commit a12933b

Please sign in to comment.