Skip to content

Commit

Permalink
Refactor copy-errors logic in EvalCtx (#9909)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #9909

Reviewed By: xiaoxmeng

Differential Revision: D57725457

fbshipit-source-id: 44a0a08cd83cef31d7624d7df9704b2f053b4371
  • Loading branch information
mbasmanova authored and facebook-github-bot committed May 23, 2024
1 parent 965ee6c commit 5204da9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 28 deletions.
49 changes: 21 additions & 28 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,17 @@ void EvalCtx::saveAndReset(ContextSaver& saver, const SelectivityVector& rows) {

void EvalCtx::ensureErrorsVectorSize(ErrorVectorPtr& vector, vector_size_t size)
const {
auto oldSize = vector ? vector->size() : 0;
if (!vector) {
vector = std::make_shared<ErrorVector>(
pool(),
OpaqueType::create<void>(),
AlignedBuffer::allocate<bool>(size, pool(), false) /*nulls*/,
size /*length*/,
allocateNulls(size, pool(), bits::kNull),
size,
AlignedBuffer::allocate<ErrorVector::value_type>(
size, pool(), ErrorVector::value_type()),
std::vector<BufferPtr>(0),
SimpleVectorStats<std::shared_ptr<void>>{},
1 /*distinctValueCount*/,
size /*nullCount*/,
false /*isSorted*/,
size /*representedBytes*/);
std::vector<BufferPtr>{});
} else if (vector->size() < size) {
const auto oldSize = vector->size();
vector->resize(size, false);
// Set all new positions to null, including the one to be set.
for (auto i = oldSize; i < size; ++i) {
Expand All @@ -121,7 +116,6 @@ void EvalCtx::addError(
ErrorVectorPtr& errorsPtr) const {
ensureErrorsVectorSize(errorsPtr, index + 1);
if (errorsPtr->isNullAt(index)) {
errorsPtr->setNull(index, false);
errorsPtr->set(index, std::make_shared<std::exception_ptr>(exceptionPtr));
}
}
Expand All @@ -140,31 +134,35 @@ void EvalCtx::addErrors(
return false;
}
if (!fromErrors->isNullAt(row) && toErrors->isNullAt(row)) {
toErrors->set(
row,
std::static_pointer_cast<std::exception_ptr>(
fromErrors->valueAt(row)));
toErrors->set(row, fromErrors->valueAt(row));
}
return true;
});
}

void EvalCtx::copyError(
const ErrorVector& from,
vector_size_t fromIndex,
ErrorVectorPtr& to,
vector_size_t toIndex) const {
const auto fromSize = from.size();
if (fromIndex < fromSize && !from.isNullAt(fromIndex)) {
addError(
toIndex,
*std::static_pointer_cast<std::exception_ptr>(from.valueAt(fromIndex)),
to);
}
}

void EvalCtx::restore(ContextSaver& saver) {
TestValue::adjust("facebook::velox::exec::EvalCtx::restore", this);

peeledFields_ = std::move(saver.peeled);
nullsPruned_ = saver.nullsPruned;
if (errors_) {
int32_t errorSize = errors_->size();
peeledEncoding_->applyToNonNullInnerRows(
*saver.rows, [&](auto outerRow, auto innerRow) {
if (innerRow < errorSize && !errors_->isNullAt(innerRow)) {
addError(
outerRow,
*std::static_pointer_cast<std::exception_ptr>(
errors_->valueAt(innerRow)),
saver.errors);
}
copyError(*errors_, innerRow, saver.errors, outerRow);
});
}
errors_ = std::move(saver.errors);
Expand Down Expand Up @@ -250,12 +248,7 @@ void EvalCtx::addElementErrorsToTopLevel(
const auto* rawElementToTopLevelRows =
elementToTopLevelRows->as<vector_size_t>();
elementRows.applyToSelected([&](auto row) {
if (errors_->isIndexInRange(row) && !errors_->isNullAt(row)) {
addError(
rawElementToTopLevelRows[row],
*std::static_pointer_cast<std::exception_ptr>(errors_->valueAt(row)),
topLevelErrors);
}
copyError(*errors_, row, topLevelErrors, rawElementToTopLevelRows[row]);
});
}

Expand Down
9 changes: 9 additions & 0 deletions velox/expression/EvalCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,15 @@ class EvalCtx {
private:
void ensureErrorsVectorSize(ErrorVectorPtr& errors, vector_size_t size) const;

// Copy error from 'from' at index 'fromIndex' to 'to' at index 'toIndex'.
// No-op if 'from' doesn't have an error at 'fromIndex' or if 'to' already has
// an error at 'toIndex'.
void copyError(
const ErrorVector& from,
vector_size_t fromIndex,
ErrorVectorPtr& to,
vector_size_t toIndex) const;

core::ExecCtx* const execCtx_;
ExprSet* const exprSet_;
const RowVector* row_;
Expand Down

0 comments on commit 5204da9

Please sign in to comment.