Skip to content

Commit

Permalink
Fix bugs and segfaults
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed May 21, 2024
1 parent ee15c4a commit 4bf7c9c
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 44 deletions.
5 changes: 4 additions & 1 deletion src/engine/CartesianProductJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ class CartesianProductJoin : public Operation {
bool knownEmptyResult() override;

// The Cartesian product join can efficiently evaluate a limited result.
[[nodiscard]] bool supportsLimit() const override { return true; }
[[nodiscard]] bool supportsLimit(
[[maybe_unused]] bool lazyResult) const override {
return true;
}

protected:
// Don't promise any sorting of the result.
Expand Down
4 changes: 2 additions & 2 deletions src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class IndexScan : public Operation {
}

// Currently only the full scans support a limit clause.
[[nodiscard]] bool supportsLimit() const override {
return getResultWidth() == 3;
[[nodiscard]] bool supportsLimit(bool lazyResult) const override {
return !lazyResult && getResultWidth() == 3;
}

Permutation::Enum permutation() const { return permutation_; }
Expand Down
11 changes: 6 additions & 5 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ std::shared_ptr<const Result> Operation::getResult(
// Apply LIMIT and OFFSET, but only if the call to `computeResult` did not
// already perform it. An example for an operation that directly computes
// the Limit is a full index scan with three variables.
if (!supportsLimit()) {
if (!supportsLimit(!result.isDataEvaluated())) {
runtimeInfo().addLimitOffsetRow(_limit, std::chrono::milliseconds{0},
true);
result.applyLimitOffset(_limit,
Expand Down Expand Up @@ -214,9 +214,10 @@ std::shared_ptr<const Result> Operation::getResult(
return nullptr;
}

if (result._resultPointer->resultTable()->isDataEvaluated()) {
updateRuntimeInformationOnSuccess(result, timer.msecs());
}
updateRuntimeInformationOnSuccess(
result, result._resultPointer->resultTable()->isDataEvaluated()
? timer.msecs()
: result._resultPointer->runtimeInfo().totalTime_);

if (result._resultPointer->resultTable()->isDataEvaluated()) {
auto resultNumRows =
Expand Down Expand Up @@ -330,7 +331,7 @@ void Operation::updateRuntimeInformationOnSuccess(

// ____________________________________________________________________________________________________________________
void Operation::updateRuntimeInformationOnSuccess(
const ConcurrentLruCache ::ResultAndCacheStatus& resultAndCacheStatus,
const ConcurrentLruCache::ResultAndCacheStatus& resultAndCacheStatus,
Milliseconds duration) {
updateRuntimeInformationOnSuccess(
*resultAndCacheStatus._resultPointer->resultTable(),
Expand Down
5 changes: 4 additions & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ class Operation {

// True iff this operation directly implement a `OFFSET` and `LIMIT` clause on
// its result.
[[nodiscard]] virtual bool supportsLimit() const { return false; }
[[nodiscard]] virtual bool supportsLimit(
[[maybe_unused]] bool lazyResult) const {
return false;
}

public:
virtual float getMultiplicity(size_t col) = 0;
Expand Down
53 changes: 29 additions & 24 deletions src/engine/Result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ auto Result::getMergedLocalVocab(const Result& resultTable1,
LocalVocab Result::getCopyOfLocalVocab() const { return localVocab().clone(); }

// _____________________________________________________________________________
void Result::validateIdTable(const IdTable& idTable) const {
AD_CONTRACT_CHECK(std::ranges::all_of(sortedBy_, [&idTable](size_t numCols) {
void Result::validateIdTable(const IdTable& idTable,
const std::vector<ColumnIndex>& sortedBy) {
AD_CONTRACT_CHECK(std::ranges::all_of(sortedBy, [&idTable](size_t numCols) {
return numCols < idTable.numColumns();
}));

[[maybe_unused]] auto compareRowsByJoinColumns = [this](const auto& row1,
const auto& row2) {
for (size_t col : sortedBy_) {
if (row1[col] != row2[col]) {
return row1[col] < row2[col];
}
}
return false;
};
[[maybe_unused]] auto compareRowsByJoinColumns =
[&sortedBy](const auto& row1, const auto& row2) {
for (size_t col : sortedBy) {
if (row1[col] != row2[col]) {
return row1[col] < row2[col];
}
}
return false;
};
AD_EXPENSIVE_CHECK(std::ranges::is_sorted(idTable, compareRowsByJoinColumns));
}

Expand All @@ -61,7 +62,7 @@ Result::Result(IdTable idTable, std::vector<ColumnIndex> sortedBy,
sortedBy_{std::move(sortedBy)},
localVocab_{std::move(localVocab.localVocab_)} {
AD_CONTRACT_CHECK(localVocab_ != nullptr);
validateIdTable(std::get<IdTable>(data_));
validateIdTable(std::get<IdTable>(data_), sortedBy_);
}

// _____________________________________________________________________________
Expand All @@ -75,13 +76,13 @@ Result::Result(cppcoro::generator<IdTable> idTables,
std::vector<ColumnIndex> sortedBy,
SharedLocalVocabWrapper localVocab)
: data_{ad_utility::CacheableGenerator{
[this, idTables = std::move(
idTables)]() mutable -> cppcoro::generator<IdTable> {
[](auto idTables,
auto sortedBy) mutable -> cppcoro::generator<IdTable> {

Check warning on line 80 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L80

Added line #L80 was not covered by tests
for (IdTable& idTable : idTables) {
validateIdTable(idTable);
validateIdTable(idTable, sortedBy);
co_yield std::move(idTable);
}
}()}},
}(std::move(idTables), sortedBy)}},

Check warning on line 85 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L82-L85

Added lines #L82 - L85 were not covered by tests
sortedBy_{std::move(sortedBy)},
localVocab_{std::move(localVocab.localVocab_)} {
AD_CONTRACT_CHECK(localVocab_ != nullptr);
Expand Down Expand Up @@ -123,6 +124,7 @@ void Result::applyLimitOffset(
// Apply the OFFSET clause. If the offset is `0` or the offset is larger
// than the size of the `IdTable`, then this has no effect and runtime
// `O(1)` (see the docs for `std::shift_left`).
AD_CONTRACT_CHECK(limitTimeCallback);
AD_CONTRACT_CHECK(
!std::holds_alternative<cppcoro::generator<const IdTable>>(data_));
using Gen = ad_utility::CacheableGenerator<IdTable>;
Expand All @@ -140,12 +142,13 @@ void Result::applyLimitOffset(
}

Check warning on line 142 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L141-L142

Added lines #L141 - L142 were not covered by tests
for (auto&& idTable : original) {
ad_utility::timer::Timer limitTimer{ad_utility::timer::Timer::Started};
size_t originalSize = idTable.numRows();
modifyIdTable(idTable, limitOffset);
uint64_t offsetDelta = limitOffset.actualOffset(idTable.numRows());
uint64_t offsetDelta = limitOffset.actualOffset(originalSize);
limitOffset._offset -= offsetDelta;

Check warning on line 148 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L144-L148

Added lines #L144 - L148 were not covered by tests
if (limitOffset._limit.has_value()) {
limitOffset._limit.value() -=
limitOffset.actualSize(idTable.numRows() - offsetDelta);
limitOffset.actualSize(originalSize - offsetDelta);
}
limitTimeCallback(limitTimer.msecs());

Check warning on line 153 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L150-L153

Added lines #L150 - L153 were not covered by tests
if (limitOffset._offset == 0) {
Expand Down Expand Up @@ -175,11 +178,13 @@ void Result::enforceLimitOffset(const LimitOffsetClause& limitOffset) {
auto generator =
[](cppcoro::generator<IdTable> original,
LimitOffsetClause limitOffset) -> cppcoro::generator<IdTable> {
size_t elementCount = 0;

Check warning on line 181 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L178-L181

Added lines #L178 - L181 were not covered by tests
for (auto&& idTable : original) {
AD_CONTRACT_CHECK(idTable.numRows() ==
limitOffset.actualSize(idTable.numRows()));
elementCount += idTable.numRows();
AD_CONTRACT_CHECK(elementCount <= limitOffset.actualSize(elementCount));
co_yield std::move(idTable);
}
AD_CONTRACT_CHECK(elementCount == limitOffset.actualSize(elementCount));
}(std::move(std::get<Gen>(data_)).extractGenerator(), limitOffset);
data_.emplace<Gen>(std::move(generator));
} else {
Expand Down Expand Up @@ -378,9 +383,9 @@ Result Result::createResultWithFallback(
std::chrono::duration_cast<std::chrono::milliseconds>(stop - start));
}
};
return Result{generator(std::move(original), std::move(fallback),
std::move(onIteration)),
original->sortedBy_, original->localVocab_};
return Result{
generator(original, std::move(fallback), std::move(onIteration)),
original->sortedBy_, original->localVocab_};
}

Check warning on line 389 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L382-L389

Added lines #L382 - L389 were not covered by tests

// _____________________________________________________________________________
Expand All @@ -399,6 +404,6 @@ Result Result::createResultAsMasterConsumer(
co_yield idTable;
}
};
return Result{generator(std::move(original), std::move(onIteration)),
return Result{generator(original, std::move(onIteration)),
original->sortedBy_, original->localVocab_};
}

Check warning on line 409 in src/engine/Result.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Result.cpp#L402-L409

Added lines #L402 - L409 were not covered by tests
3 changes: 2 additions & 1 deletion src/engine/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class Result {
Result(cppcoro::generator<const IdTable> idTables,
std::vector<ColumnIndex> sortedBy, LocalVocabPtr localVocab);

void validateIdTable(const IdTable& idTable) const;
static void validateIdTable(const IdTable& idTable,
const std::vector<ColumnIndex>& sortedBy);

public:
// Construct from the given arguments (see above) and check the following
Expand Down
22 changes: 13 additions & 9 deletions src/util/CacheableGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Chair of Algorithms and Data Structures.
// Author: Robin Textor-Falconi <textorr@informatik.uni-freiburg.de>

#ifndef REUSABLEGENERATOR_H
#define REUSABLEGENERATOR_H
#ifndef CACHEABLEGENERATOR_H
#define CACHEABLEGENERATOR_H

#include <chrono>
#include <optional>
Expand Down Expand Up @@ -67,6 +67,10 @@ class CacheableGenerator {
}
return;
}
if (generatorIterator_.has_value() &&
generatorIterator_.value() == generator_.end()) {
return;
}
if (masterState_ == MasterIteratorState::MASTER_STARTED) {
if (!isMaster) {
conditionVariable_.wait(lock, [this, index]() {
Expand Down Expand Up @@ -118,7 +122,7 @@ class CacheableGenerator {
public:
bool isDone(size_t index) noexcept {
std::shared_lock lock{mutex_};
return index == cachedValues_.size() && generatorIterator_.has_value() &&
return index >= cachedValues_.size() && generatorIterator_.has_value() &&
generatorIterator_.value() == generator_.end();
}

Expand Down Expand Up @@ -201,7 +205,7 @@ class CacheableGenerator {

public:
explicit Iterator(std::weak_ptr<ComputationStorage> storage, bool isMaster)
: storage_{storage,
: storage_{std::move(storage),
[isMaster](auto&& storage) {
if (isMaster) {
auto pointer = storage.lock();
Expand All @@ -214,7 +218,7 @@ class CacheableGenerator {
}

friend bool operator==(const Iterator& it, IteratorSentinel) noexcept {
return !it.storage()->isDone(it.currentIndex_);
return it.storage()->isDone(it.currentIndex_);
}

friend bool operator!=(const Iterator& it, IteratorSentinel s) noexcept {
Expand All @@ -238,14 +242,14 @@ class CacheableGenerator {
// Need to provide post-increment operator to implement the 'Range' concept.
void operator++(int) { (void)operator++(); }

Reference operator*() const noexcept {
Reference operator*() const {
return storage()->getCachedValue(currentIndex_);
}

Pointer operator->() const noexcept { return std::addressof(operator*()); }
Pointer operator->() const { return std::addressof(operator*()); }
};

Iterator begin(bool isMaster = false) const noexcept {
Iterator begin(bool isMaster = false) const {
return Iterator{computationStorage_, isMaster};
}

Expand Down Expand Up @@ -277,4 +281,4 @@ class CacheableGenerator {
};
}; // namespace ad_utility

#endif // REUSABLEGENERATOR_H
#endif // CACHEABLEGENERATOR_H
4 changes: 3 additions & 1 deletion test/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class ValuesForTesting : public Operation {
}
return {std::move(table), resultSortedOn(), localVocab_.clone()};
}
bool supportsLimit() const override { return supportsLimit_; }
bool supportsLimit([[maybe_unused]] bool lazyResult) const override {
return supportsLimit_;
}

private:
// ___________________________________________________________________________
Expand Down

0 comments on commit 4bf7c9c

Please sign in to comment.