Skip to content

Commit

Permalink
Implemented the GroupByScan for the Agnostic+ mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
joka921 committed Oct 5, 2021
1 parent d883b40 commit 8a7441a
Show file tree
Hide file tree
Showing 17 changed files with 314 additions and 353 deletions.
19 changes: 13 additions & 6 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,14 @@ void GroupBy::computeResult(ResultTable* result) {

// Check whether we simply count over a POS index scan. If so, we can use
// the specialized logic in performGroupByOnIndexScan.
auto indexScan = dynamic_cast<const IndexScan*>(_subtree->getRootOperation().get());

if (indexScan && indexScan->getType() == IndexScan::POS_FREE_O && aggregates.size() == 2 && aggregates[0]._inCol == 0 && aggregates[0]._type == ParsedQuery::AggregateType::SAMPLE && aggregates[1]._inCol == 1 && aggregates[1]._type == ParsedQuery::AggregateType::COUNT) {
auto indexScan =
dynamic_cast<const IndexScan*>(_subtree->getRootOperation().get());

if (indexScan && indexScan->getType() == IndexScan::POS_FREE_O &&
aggregates.size() == 2 && aggregates[0]._inCol == 0 &&
aggregates[0]._type == ParsedQuery::AggregateType::SAMPLE &&
aggregates[1]._inCol == 1 &&
aggregates[1]._type == ParsedQuery::AggregateType::COUNT) {
performGroupByOnIndexScan(result, indexScan->getPredicate());
return;
}
Expand All @@ -821,15 +826,17 @@ void GroupBy::computeResult(ResultTable* result) {
}
}

void GroupBy::performGroupByOnIndexScan(ResultTable* resultTable, const string& predicate) {
void GroupBy::performGroupByOnIndexScan(ResultTable* resultTable,
const string& predicate) {
// TODO: support other permutations as well
Id col0Id;
bool idWasFound = getIndex().getVocab().getId(predicate, &col0Id);
if (! idWasFound) {
if (!idWasFound) {
return;
}
const auto& permutation = getIndex().POS();
auto blockGenerator = CompressedRelationMetaData::ScanBlockGenerator(col0Id, permutation, nullptr);
auto blockGenerator = CompressedRelationMetaData::ScanBlockGenerator(
col0Id, permutation, nullptr);
auto result = resultTable->_data.moveToStatic<2>();
Id lastId = ID_NO_VALUE;
size_t count = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/engine/GroupBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class GroupBy : public Operation {
const vector<GroupBy::Aggregate>& aggregates,
IdTable* dynResult, const ResultTable* inTable,
ResultTable* outTable, const Index& index) const;
void performGroupByOnIndexScan(ResultTable* resultTable, const string& predicate);
void performGroupByOnIndexScan(ResultTable* resultTable,
const string& predicate);

FRIEND_TEST(GroupByTest, doGroupBy);
};
132 changes: 63 additions & 69 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include "../util/Cache.h"
#include "../util/CompressionUsingZstd/ZstdWrapper.h"
#include "../util/ConcurrentCache.h"
#include "../util/Generator.h"
#include "../util/ParallelPipeline.h"
#include "../util/TypeTraits.h"
#include "./Permutations.h"
#include "ConstantsIndexBuilding.h"
#include "../util/ParallelPipeline.h"
#include "../util/Generator.h"

using namespace std::chrono_literals;

Expand All @@ -38,13 +38,13 @@ auto blockRangeForCol0Id(Id col0Id, const auto& permutation) {
permutation._meta.blockData().begin(),
permutation._meta.blockData().end(), KeyLhs{col0Id, col0Id},
[](const auto& a, const auto& b) {
return a._col0FirstId < b._col0FirstId &&
a._col0LastId < b._col0LastId;
return a._col0FirstId < b._col0FirstId && a._col0LastId < b._col0LastId;
});
}

template<typename Iterator>
bool isFirstBlockIncomplete(Iterator beginBlock, Iterator endBlock, Id col0Id, const auto& col0MetaData) {
template <typename Iterator>
bool isFirstBlockIncomplete(Iterator beginBlock, Iterator endBlock, Id col0Id,
const auto& col0MetaData) {
// The first block might contain entries that are not part of our
// actual scan result.
bool firstBlockIsIncomplete =
Expand All @@ -66,29 +66,35 @@ bool isFirstBlockIncomplete(Iterator beginBlock, Iterator endBlock, Id col0Id, c
return firstBlockIsIncomplete;
}

std::vector<std::array<Id, 2>> readIncompleteBlock(const auto& permutation, const auto& blockMetaData, const auto& col0MetaData) {
auto cacheKey =
permutation._readableName + std::to_string(blockMetaData._offsetInFile);

auto uncompressedBuffer =
globalBlockCache()
.computeOnce(
cacheKey,
[&]() { return CompressedRelationMetaData::readAndDecompressBlock(blockMetaData, permutation); })
._resultPointer;

// Extract the part of the block that actually belongs to the relation
auto begin = uncompressedBuffer->begin() + col0MetaData._offsetInBlock;
auto end = begin + col0MetaData._numRows;
std::vector<std::array<Id, 2>> firstBlock;
firstBlock.reserve(end - begin);
std::copy(begin, end, std::back_inserter(firstBlock));
return firstBlock;
// Return a copy of the rows of a single block that belong to one relation.
// The decompressed block described by `blockMetaData` is obtained through
// `globalBlockCache()`; from it, `col0MetaData._numRows` rows starting at
// index `col0MetaData._offsetInBlock` are copied into a new vector, which is
// returned by value.
std::vector<std::array<Id, 2>> readIncompleteBlock(const auto& permutation,
const auto& blockMetaData,
const auto& col0MetaData) {
// The cache key is the permutation's readable name concatenated with the
// block's offset in the file.
auto cacheKey =
permutation._readableName + std::to_string(blockMetaData._offsetInFile);

// NOTE(review): presumably `computeOnce` only invokes the lambda when no
// entry for `cacheKey` exists yet — confirm against ConcurrentCache.
auto uncompressedBuffer =
globalBlockCache()
.computeOnce(
cacheKey,
[&]() {
return CompressedRelationMetaData::readAndDecompressBlock(
blockMetaData, permutation);
})
._resultPointer;

// Extract the part of the block that actually belongs to the relation
auto begin = uncompressedBuffer->begin() + col0MetaData._offsetInBlock;
auto end = begin + col0MetaData._numRows;
std::vector<std::array<Id, 2>> firstBlock;
// Reserve the exact size up front so the copy below does not reallocate.
firstBlock.reserve(end - begin);
std::copy(begin, end, std::back_inserter(firstBlock));
return firstBlock;
}

// ____________________________________________________________________________
template <class Permutation>
cppcoro::generator<CompressedRelationMetaData::DecompressedBlock> CompressedRelationMetaData::ScanBlockGenerator(
cppcoro::generator<CompressedRelationMetaData::DecompressedBlock>
CompressedRelationMetaData::ScanBlockGenerator(
Id col0Id, const Permutation& permutation,
ad_utility::SharedConcurrentTimeoutTimer timer) {
if (permutation._meta.col0IdExists(col0Id)) {
Expand All @@ -99,7 +105,8 @@ cppcoro::generator<CompressedRelationMetaData::DecompressedBlock> CompressedRela

// The first block might contain entries that are not part of our
// actual scan result.
bool firstBlockIsIncomplete = isFirstBlockIncomplete(beginBlock, endBlock, col0Id, metaData);
bool firstBlockIsIncomplete =
isFirstBlockIncomplete(beginBlock, endBlock, col0Id, metaData);

// We have at most one block that is incomplete and thus requires trimming.
// Set up a lambda, that reads this block and decompresses it to
Expand All @@ -121,8 +128,9 @@ cppcoro::generator<CompressedRelationMetaData::DecompressedBlock> CompressedRela
&permutation]() -> std::optional<NumRowsAndCompressedBlock> {
if (blockItForReader < endBlock) {
auto numRows = blockItForReader->_numRows;
return std::pair(numRows, CompressedRelationMetaData::readCompressedBlockFromFile(
*blockItForReader++, permutation));
return std::pair(
numRows, CompressedRelationMetaData::readCompressedBlockFromFile(
*blockItForReader++, permutation));
}
return std::nullopt;
};
Expand All @@ -132,9 +140,12 @@ cppcoro::generator<CompressedRelationMetaData::DecompressedBlock> CompressedRela
};

std::vector<std::array<Id, 2>> intermediateResult;
auto returner = [&](DecompressedBlock && result) { intermediateResult = std::move(result); };
auto returner = [&](DecompressedBlock&& result) {
intermediateResult = std::move(result);
};

ad_pipeline::Pipeline p(true, {1, 10, 0}, readBlocks, decompressLambda, returner);
ad_pipeline::Pipeline p(true, {1, 10, 0}, readBlocks, decompressLambda,
returner);

while (auto optionalTask = p.popManually()) {
optionalTask.value()();
Expand All @@ -153,15 +164,15 @@ void CompressedRelationMetaData::scan(
AD_CHECK(result->cols() == 2);
}
if (permutation._meta.col0IdExists(col0Id)) {

const auto& metaData = permutation._meta.getMetaData(col0Id);

// get all the blocks where _col0FirstId <= col0Id <= _col0LastId
auto [beginBlock, endBlock] = blockRangeForCol0Id(col0Id, permutation);

// The first block might contain entries that are not part of our
// actual scan result.
bool firstBlockIsIncomplete = isFirstBlockIncomplete(beginBlock, endBlock, col0Id, metaData);
bool firstBlockIsIncomplete =
isFirstBlockIncomplete(beginBlock, endBlock, col0Id, metaData);

// The total size of the result is now known.
result->resize(metaData.getNofElements());
Expand Down Expand Up @@ -226,44 +237,33 @@ using V = std::vector<std::array<Id, 2>>;
using Timer = const ad_utility::SharedConcurrentTimeoutTimer&;
// Explicit instantiations for all six permutations
template void CompressedRelationMetaData::scan<Permutation::POS_T, IdTable>(
Id key, IdTable* result, const Permutation::POS_T& p,
Timer);
Id key, IdTable* result, const Permutation::POS_T& p, Timer);
template void CompressedRelationMetaData::scan<Permutation::PSO_T, IdTable>(
Id key, IdTable* result, const Permutation::PSO_T& p,
Timer);
Id key, IdTable* result, const Permutation::PSO_T& p, Timer);
template void CompressedRelationMetaData::scan<Permutation::SPO_T, IdTable>(
Id key, IdTable* result, const Permutation::SPO_T& p,
Timer timer);
Id key, IdTable* result, const Permutation::SPO_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::SOP_T, IdTable>(
Id key, IdTable* result, const Permutation::SOP_T& p,
Timer timer);
Id key, IdTable* result, const Permutation::SOP_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::OPS_T, IdTable>(
Id key, IdTable* result, const Permutation::OPS_T& p,
Timer timer);
Id key, IdTable* result, const Permutation::OPS_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::OSP_T, IdTable>(
Id key, IdTable* result, const Permutation::OSP_T& p,
Timer timer);
Id key, IdTable* result, const Permutation::OSP_T& p, Timer timer);

template void CompressedRelationMetaData::scan<Permutation::POS_T, V>(
Id key, V* result, const Permutation::POS_T& p,
Timer timer);
Id key, V* result, const Permutation::POS_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::PSO_T, V>(
Id key, V* result, const Permutation::PSO_T& p,
Timer timer);
Id key, V* result, const Permutation::PSO_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::SPO_T, V>(
Id key, V* result, const Permutation::SPO_T& p,
Timer timer);
Id key, V* result, const Permutation::SPO_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::SOP_T, V>(
Id key, V* result, const Permutation::SOP_T& p,
Timer timer);
Id key, V* result, const Permutation::SOP_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::OPS_T, V>(
Id key, V* result, const Permutation::OPS_T& p,
Timer timer);
Id key, V* result, const Permutation::OPS_T& p, Timer timer);
template void CompressedRelationMetaData::scan<Permutation::OSP_T, V>(
Id key, V* result, const Permutation::OSP_T& p,
Timer timer);
Id key, V* result, const Permutation::OSP_T& p, Timer timer);
// ____________________________________________________________________________
template cppcoro::generator<CompressedRelationMetaData::DecompressedBlock> CompressedRelationMetaData::ScanBlockGenerator<Permutation::POS_T> (
template cppcoro::generator<CompressedRelationMetaData::DecompressedBlock>
CompressedRelationMetaData::ScanBlockGenerator<Permutation::POS_T>(
Id col0Id, const Permutation::POS_T& permutation,
ad_utility::SharedConcurrentTimeoutTimer timer);

Expand Down Expand Up @@ -418,23 +418,17 @@ void CompressedRelationMetaData::scan(

// Explicit instantiations for all six permutations
template void CompressedRelationMetaData::scan<Permutation::POS_T, IdTable>(
Id, Id, IdTable*, const Permutation::POS_T&,
Timer);
Id, Id, IdTable*, const Permutation::POS_T&, Timer);
template void CompressedRelationMetaData::scan<Permutation::PSO_T, IdTable>(
Id, Id, IdTable*, const Permutation::PSO_T&,
Timer);
Id, Id, IdTable*, const Permutation::PSO_T&, Timer);
template void CompressedRelationMetaData::scan<Permutation::SOP_T, IdTable>(
const Id, const Id, IdTable*, const Permutation::SOP_T&,
Timer);
const Id, const Id, IdTable*, const Permutation::SOP_T&, Timer);
template void CompressedRelationMetaData::scan<Permutation::SPO_T, IdTable>(
const Id, const Id, IdTable*, const Permutation::SPO_T&,
Timer);
const Id, const Id, IdTable*, const Permutation::SPO_T&, Timer);
template void CompressedRelationMetaData::scan<Permutation::OPS_T, IdTable>(
const Id, const Id, IdTable*, const Permutation::OPS_T&,
Timer);
const Id, const Id, IdTable*, const Permutation::OPS_T&, Timer);
template void CompressedRelationMetaData::scan<Permutation::OSP_T, IdTable>(
const Id, const Id, IdTable*, const Permutation::OSP_T&,
Timer);
const Id, const Id, IdTable*, const Permutation::OSP_T&, Timer);

// ___________________________________________________________________________
void CompressedRelationWriter::addRelation(
Expand Down
16 changes: 8 additions & 8 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
#define QLEVER_COMPRESSEDRELATION_H

#include <algorithm>
#include "../util/Generator.h"
#include <vector>

#include "../global/Id.h"
#include "../util/BufferedVector.h"
#include "../util/File.h"
#include "../util/Generator.h"
#include "../util/Serializer/SerializeVector.h"
#include "../util/Serializer/Serializer.h"
#include "../util/Timer.h"
Expand Down Expand Up @@ -93,8 +93,8 @@ struct CompressedRelationMetaData {
// The IdTable is a rather expensive type, so we don't include it here,
// but we also cannot forward declare it because it is actually an alias.
template <class Permutation, typename IdTableImpl>
static void scan(Id col0Id, IdTableImpl* result,
const Permutation& permutation,
static void scan(
Id col0Id, IdTableImpl* result, const Permutation& permutation,
const ad_utility::SharedConcurrentTimeoutTimer& timer = nullptr);

/**
Expand All @@ -112,12 +112,12 @@ struct CompressedRelationMetaData {
* members of Index class).
*/
template <class PermutationInfo, typename IdTableImpl>
static void scan(Id count, Id col1Id, IdTableImpl* result,
const PermutationInfo& permutation,
const ad_utility::SharedConcurrentTimeoutTimer& timer = nullptr);

static void scan(
Id count, Id col1Id, IdTableImpl* result,
const PermutationInfo& permutation,
const ad_utility::SharedConcurrentTimeoutTimer& timer = nullptr);

// ____________________________________________________________________________
// ____________________________________________________________________________
template <class Permutation>
static cppcoro::generator<DecompressedBlock> ScanBlockGenerator(
Id col0Id, const Permutation& permutation,
Expand Down

0 comments on commit 8a7441a

Please sign in to comment.