Skip to content

Commit

Permalink
Parallelization of the Pattern Trick Using OpenMP taskloops.
Browse files Browse the repository at this point in the history
The degree of parallelism is configurable at compiletime
Since it is task-based it also performs well under heavy load.
  • Loading branch information
joka921 committed Nov 27, 2020
1 parent 4910a63 commit 183f996
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 41 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RUN cmake -DCMAKE_BUILD_TYPE=Release -DLOGLEVEL=DEBUG -DUSE_PARALLEL=true .. &&

FROM base as runtime
WORKDIR /app
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y wget python3-yaml unzip curl bzip2 pkg-config libicu-dev python3-icu libgomp1

ARG UID=1000
Expand Down
123 changes: 84 additions & 39 deletions src/engine/CountAvailablePredicates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,68 +236,113 @@ void CountAvailablePredicates::computePatternTrick(
LOG(DEBUG) << "For " << input.size() << " entities in column "
<< subjectColumn << std::endl;

ad_utility::HashMap<Id, size_t> predicateCounts;
ad_utility::HashMap<size_t, size_t> patternCounts;
size_t inputIdx = 0;
class MergeableId : public ad_utility::HashMap<Id, size_t> {
public:
MergeableId& operator%=(const MergeableId& rhs) {
for (const auto& [key, value] : rhs) {
(*this)[key] += value;
}
return *this;
}
};

class MergeableSizeT : public ad_utility::HashMap<size_t, size_t> {
public:
MergeableSizeT& operator%=(const MergeableSizeT& rhs) {
for (const auto& [key, value] : rhs) {
(*this)[key] += value;
}
return *this;
}
};
MergeableId predicateCounts;
MergeableSizeT patternCounts;

#pragma omp declare reduction(MergeHashmapsId:MergeableId : omp_out %= omp_in)
#pragma omp declare reduction(MergeHashmapsSizeT:MergeableSizeT \
: omp_out %= omp_in)

// These variables are used to gather additional statistics
size_t numEntitiesWithPatterns = 0;
// the number of distinct predicates in patterns
size_t numPatternPredicates = 0;
// the number of predicates counted without patterns
size_t numListPredicates = 0;
Id lastSubject = ID_NO_VALUE;
while (inputIdx < input.size()) {
// Skip over elements with the same subject (don't count them twice)
Id subject = input(inputIdx, subjectColumn);
if (subject == lastSubject) {
inputIdx++;
continue;
}
lastSubject = subject;
if (subject < hasPattern.size() && hasPattern[subject] != NO_PATTERN) {
// The subject matches a pattern
patternCounts[hasPattern[subject]]++;
numEntitiesWithPatterns++;
} else if (subject < hasPredicate.size()) {
// The subject does not match a pattern
size_t numPredicates;
Id* predicateData;
std::tie(predicateData, numPredicates) = hasPredicate[subject];
numListPredicates += numPredicates;
if (numPredicates > 0) {
for (size_t i = 0; i < numPredicates; i++) {
predicateCounts[predicateData[i]]++;

if (input.size() > 0) { // avoid strange OpenMP segfaults
#pragma omp parallel
#pragma omp single
#pragma omp taskloop grainsize(500000) default(none) reduction(MergeHashmapsId:predicateCounts) reduction(MergeHashmapsSizeT : patternCounts) \
reduction(+ : numEntitiesWithPatterns) reduction(+: numPatternPredicates) reduction(+: numListPredicates) shared(input, subjectColumn, hasPattern, hasPredicate)
for (size_t inputIdx = 0; inputIdx < input.size(); ++inputIdx) {
// Skip over elements with the same subject (don't count them twice)
Id subject = input(inputIdx, subjectColumn);
if (inputIdx > 0 && subject == input(inputIdx - 1, subjectColumn)) {
continue;
}

if (subject < hasPattern.size() && hasPattern[subject] != NO_PATTERN) {
// The subject matches a pattern
patternCounts[hasPattern[subject]]++;
numEntitiesWithPatterns++;
} else if (subject < hasPredicate.size()) {
// The subject does not match a pattern
size_t numPredicates;
Id* predicateData;
std::tie(predicateData, numPredicates) = hasPredicate[subject];
numListPredicates += numPredicates;
if (numPredicates > 0) {
for (size_t i = 0; i < numPredicates; i++) {
predicateCounts[predicateData[i]]++;
}
} else {
LOG(TRACE) << "No pattern or has-relation entry found for entity "
<< std::to_string(subject) << std::endl;
}
} else {
LOG(TRACE) << "No pattern or has-relation entry found for entity "
<< std::to_string(subject) << std::endl;
LOG(TRACE) << "Subject " << subject
<< " does not appear to be an entity "
"(its id is to high)."
<< std::endl;
}
} else {
LOG(TRACE) << "Subject " << subject
<< " does not appear to be an entity "
"(its id is to high)."
<< std::endl;
}
inputIdx++;
}
LOG(DEBUG) << "Using " << patternCounts.size()
<< " patterns for computing the result." << std::endl;
// the number of predicates counted with patterns
size_t numPredicatesSubsumedInPatterns = 0;
// resolve the patterns to predicate counts
for (const auto& it : patternCounts) {
std::pair<Id*, size_t> pattern = patterns[it.first];
numPatternPredicates += pattern.second;
for (size_t i = 0; i < pattern.second; i++) {
predicateCounts[pattern.first[i]] += it.second;
numPredicatesSubsumedInPatterns += it.second;

LOG(DEBUG) << "Converting PatternMap to vector" << std::endl;
// flatten into a vector, to make iterable
std::vector<std::pair<size_t, size_t>> patternVec;
patternVec.reserve(patternCounts.size());
for (const auto& p : patternCounts) {
patternVec.push_back(p);
}

LOG(DEBUG) << "Start convertin patterns" << std::endl;
if (patternVec.begin() != patternVec.end()) { // avoid segfaults with OpenMP
#pragma omp parallel
#pragma omp single
#pragma omp taskloop grainsize(100000) default(none) reduction(MergeHashmapsId:predicateCounts) reduction(+ : numPredicatesSubsumedInPatterns) \
reduction(+ : numEntitiesWithPatterns) reduction(+: numPatternPredicates) reduction(+: numListPredicates) shared( patternVec, patterns)
for (auto it = patternVec.begin(); it != patternVec.end(); ++it) {
std::pair<Id*, size_t> pattern = patterns[it->first];
numPatternPredicates += pattern.second;
for (size_t i = 0; i < pattern.second; i++) {
predicateCounts[pattern.first[i]] += it->second;
numPredicatesSubsumedInPatterns += it->second;
}
}
}
LOG(DEBUG) << "Finished converting patterns" << std::endl;
// write the predicate counts to the result
result.reserve(predicateCounts.size());
for (const auto& it : predicateCounts) {
result.push_back({it.first, static_cast<Id>(it.second)});
}
LOG(DEBUG) << "Finished writing results" << std::endl;

// Print interesting statistics about the pattern trick
double ratioHasPatterns =
Expand Down
6 changes: 4 additions & 2 deletions src/util/Log.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,13 @@ class Log {
static void imbue(const std::locale& locale) { std::cout.imbue(locale); }

static string getTimeStamp() {
using namespace std::chrono;
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);

auto ms = duration_cast<milliseconds>(now.time_since_epoch()) % 1000;
std::stringstream ss;
ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %X");
ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %X") << '.'
<< ms.count();
return ss.str();
}

Expand Down

0 comments on commit 183f996

Please sign in to comment.