Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic support of SERVICE clause #793

Merged
merged 5 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_library(engine
Union.cpp Union.h
MultiColumnJoin.cpp MultiColumnJoin.h
TransitivePath.cpp TransitivePath.h
Service.cpp Service.h
Values.cpp Values.h
Bind.cpp Bind.h
idTable/IdTable.h
Expand All @@ -39,5 +40,4 @@ add_library(engine
../util/Parameters.h RuntimeInformation.cpp CheckUsePatternTrick.cpp CheckUsePatternTrick.h
VariableToColumnMap.cpp ExportQueryExecutionTrees.cpp )


target_link_libraries(engine index parser sparqlExpressions http SortPerformanceEstimator absl::flat_hash_set ${ICU_LIBRARIES} boost_iostreams)
2 changes: 2 additions & 0 deletions src/engine/CheckUsePatternTrick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ bool isVariableContainedInGraphPatternOperation(
});
} else if constexpr (std::is_same_v<T, p::Values>) {
return ad_utility::contains(arg._inlineValues._variables, variable);
} else if constexpr (std::is_same_v<T, p::Service>) {
return ad_utility::contains(arg.visibleVariables_, variable);
} else {
static_assert(std::is_same_v<T, p::TransPath>);
// The `TransPath` is set up later in the query planning, when this
Expand Down
11 changes: 11 additions & 0 deletions src/engine/LocalVocab.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ LocalVocabIndex LocalVocab::getIndexAndAddIfNotContained(std::string&& word) {
return getIndexAndAddIfNotContainedImpl(std::move(word));
}

// _____________________________________________________________________________
// Look up `word` in the local vocabulary without modifying it. Returns the
// index stored for `word`, or `std::nullopt` if `word` is not contained.
std::optional<LocalVocabIndex> LocalVocab::getIndexOrNullopt(
    const std::string& word) const {
  // A single `find` serves both as the containment check and the lookup.
  if (const auto it = wordsToIndexesMap_.find(word);
      it != wordsToIndexesMap_.end()) {
    return it->second;
  }
  return std::nullopt;
}

// _____________________________________________________________________________
const std::string& LocalVocab::getWord(LocalVocabIndex localVocabIndex) const {
if (localVocabIndex.get() >= indexesToWordsMap_.size()) {
Expand Down
5 changes: 5 additions & 0 deletions src/engine/LocalVocab.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class LocalVocab {
LocalVocabIndex getIndexAndAddIfNotContained(const std::string& word);
LocalVocabIndex getIndexAndAddIfNotContained(std::string&& word);

// Get the index of a word in the local vocabulary, or std::nullopt if it is
// not contained. This is useful for testing.
std::optional<LocalVocabIndex> getIndexOrNullopt(
const std::string& word) const;

// The number of words in the vocabulary.
size_t size() const { return indexesToWordsMap_.size(); }

Expand Down
4 changes: 4 additions & 0 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "engine/NeutralElementOperation.h"
#include "engine/OptionalJoin.h"
#include "engine/OrderBy.h"
#include "engine/Service.h"
#include "engine/Sort.h"
#include "engine/TextOperationWithFilter.h"
#include "engine/TransitivePath.h"
Expand Down Expand Up @@ -191,6 +192,8 @@ void QueryExecutionTree::setOperation(std::shared_ptr<Op> operation) {
_type = DISTINCT;
} else if constexpr (std::is_same_v<Op, Values>) {
_type = VALUES;
} else if constexpr (std::is_same_v<Op, Service>) {
_type = SERVICE;
} else if constexpr (std::is_same_v<Op, TransitivePath>) {
_type = TRANSITIVE_PATH;
} else if constexpr (std::is_same_v<Op, OrderBy>) {
Expand Down Expand Up @@ -230,6 +233,7 @@ template void QueryExecutionTree::setOperation(std::shared_ptr<Bind>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Sort>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Distinct>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Values>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Service>);
template void QueryExecutionTree::setOperation(std::shared_ptr<TransitivePath>);
template void QueryExecutionTree::setOperation(std::shared_ptr<OrderBy>);
template void QueryExecutionTree::setOperation(std::shared_ptr<GroupBy>);
Expand Down
1 change: 1 addition & 0 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class QueryExecutionTree {
MULTICOLUMN_JOIN,
TRANSITIVE_PATH,
VALUES,
SERVICE,
BIND,
MINUS,
NEUTRAL_ELEMENT,
Expand Down
5 changes: 4 additions & 1 deletion src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <engine/OptionalJoin.h>
#include <engine/OrderBy.h>
#include <engine/QueryPlanner.h>
#include <engine/Service.h>
#include <engine/Sort.h>
#include <engine/TextOperationWithFilter.h>
#include <engine/TextOperationWithoutFilter.h>
Expand Down Expand Up @@ -445,7 +446,9 @@ std::vector<QueryPlanner::SubtreePlan> QueryPlanner::optimize(
SubtreePlan valuesPlan =
makeSubtreePlan<Values>(_qec, arg._inlineValues);
joinCandidates(std::vector{std::move(valuesPlan)});

} else if constexpr (std::is_same_v<T, p::Service>) {
SubtreePlan servicePlan = makeSubtreePlan<Service>(_qec, arg);
joinCandidates(std::vector{std::move(servicePlan)});
} else if constexpr (std::is_same_v<T, p::Bind>) {
// The logic of the BIND operation is implemented in the joinCandidates
// lambda. Reason: BIND does not add a new join operation like for the
Expand Down
192 changes: 192 additions & 0 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2022 - 2023, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Hannah Bast (bast@cs.uni-freiburg.de)

#include "engine/Service.h"

#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "engine/CallFixedSize.h"
#include "engine/Values.h"
#include "engine/VariableToColumnMap.h"
#include "parser/TokenizerCtre.h"
#include "parser/TurtleParser.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/http/HttpClient.h"
#include "util/http/HttpUtils.h"

// ____________________________________________________________________________
// Construct the operation from an already parsed SERVICE clause and the
// function that is used to fetch the TSV result from the remote endpoint.
// Both by-value arguments are moved into the corresponding members.
Service::Service(QueryExecutionContext* qec,
                 parsedQuery::Service parsedServiceClause,
                 GetTsvFunction getTsvFunction)
    : Operation(qec),
      parsedServiceClause_(std::move(parsedServiceClause)),
      getTsvFunction_(std::move(getTsvFunction)) {}

// ____________________________________________________________________________
std::string Service::asStringImpl(size_t indent) const {
std::ostringstream os;
for (size_t i = 0; i < indent; ++i) {
os << " ";
}
// TODO: This duplicates code in GraphPatternOperation.cpp .
hannahbast marked this conversation as resolved.
Show resolved Hide resolved
os << "SERVICE " << parsedServiceClause_.serviceIri_.toSparql() << " {\n"
<< parsedServiceClause_.prologue_ << "\n"
<< parsedServiceClause_.graphPatternAsString_ << "\n}\n";
return std::move(os).str();
}

// ____________________________________________________________________________
// Short human-readable description of this operation, which names the IRI of
// the remote service in SPARQL syntax.
std::string Service::getDescriptor() const {
  const auto serviceIri = parsedServiceClause_.serviceIri_.toSparql();
  return absl::StrCat("Service with IRI ", serviceIri);
}

// ____________________________________________________________________________
// The result has exactly one column per variable that is visible in the
// SERVICE clause.
size_t Service::getResultWidth() const {
  const auto& visibleVariables = parsedServiceClause_.visibleVariables_;
  return visibleVariables.size();
}

// ____________________________________________________________________________
// Map each visible variable of the SERVICE clause to a column index. The
// columns are numbered in the order in which the variables appear in
// `visibleVariables_`.
VariableToColumnMap Service::computeVariableToColumnMap() const {
  VariableToColumnMap variableToColumn;
  size_t nextColumn = 0;
  for (const auto& variable : parsedServiceClause_.visibleVariables_) {
    variableToColumn[variable] = nextColumn;
    ++nextColumn;
  }
  return variableToColumn;
}

// ____________________________________________________________________________
// Dummy multiplicity, independent of the column.
float Service::getMultiplicity([[maybe_unused]] size_t col) {
  // TODO: No information about the multiplicities is available at query
  // planning time, so every column is reported with multiplicity `1`.
  return 1.0f;
}

// ____________________________________________________________________________
// Dummy estimate for the number of rows in the result.
size_t Service::getSizeEstimate() {
  // TODO: No information about the result size is available at query planning
  // time, so a fixed value of `100'000` is reported.
  constexpr size_t dummySizeEstimate = 100'000;
  return dummySizeEstimate;
}

// ____________________________________________________________________________
// Dummy cost estimate, derived from the (dummy) size estimate.
size_t Service::getCostEstimate() {
  // TODO: No information about the cost is available at query planning time,
  // so ten times the estimated size is reported.
  const size_t estimatedSize = getSizeEstimate();
  return 10 * estimatedSize;
}

// ____________________________________________________________________________
// Compute the result of the SERVICE clause: build a SELECT query from the
// clause, send it to the remote endpoint via `getTsvFunction_`, validate the
// header row of the TSV response, and write the remaining rows into `result`.
//
// Throws `std::runtime_error` if the response is empty or if its header row
// does not match the visible variables of the SERVICE clause.
void Service::computeResult(ResultTable* result) {
  // Get the URL of the SPARQL endpoint. The service IRI must be enclosed in
  // angle brackets, which are stripped before the URL is constructed.
  std::string_view serviceIriString = parsedServiceClause_.serviceIri_.iri();
  AD_CONTRACT_CHECK(serviceIriString.starts_with("<") &&
                    serviceIriString.ends_with(">"));
  serviceIriString.remove_prefix(1);
  serviceIriString.remove_suffix(1);
  ad_utility::httpUtils::Url serviceUrl{serviceIriString};

  // Construct the query to be sent to the SPARQL endpoint: the prologue of the
  // clause, followed by a SELECT of all visible variables over the graph
  // pattern of the clause.
  std::string variablesForSelectClause = absl::StrJoin(
      parsedServiceClause_.visibleVariables_, " ", Variable::AbslFormatter);
  std::string serviceQuery = absl::StrCat(
      parsedServiceClause_.prologue_, "\nSELECT ", variablesForSelectClause,
      " WHERE ", parsedServiceClause_.graphPatternAsString_);
  LOG(INFO) << "Sending SERVICE query to remote endpoint "
            << "(protocol: " << serviceUrl.protocolAsString()
            << ", host: " << serviceUrl.host()
            << ", port: " << serviceUrl.port()
            << ", target: " << serviceUrl.target() << ")" << std::endl
            << serviceQuery << std::endl;

  // Send the query to the remote SPARQL endpoint via a POST request and get the
  // result as TSV.
  //
  // TODO: We should support a timeout here.
  //
  // TODO: We ask for the result as TSV because that is a compact and
  // easy-to-parse format. It might not be the best choice regarding robustness
  // and portability though. In particular, we are not sure how deterministic
  // the TSV output is with respect to the precise encoding of literals.
  std::istringstream tsvResult =
      getTsvFunction_(serviceUrl, boost::beast::http::verb::post, serviceQuery,
                      "application/sparql-query", "text/tab-separated-values");

  // The first line of the TSV result contains the variable names.
  std::string tsvHeaderRow;
  if (!std::getline(tsvResult, tsvHeaderRow)) {
    throw std::runtime_error(absl::StrCat("Response from SPARQL endpoint ",
                                          serviceUrl.host(), " is empty"));
  }
  LOG(INFO) << "Header row of TSV result: " << tsvHeaderRow << std::endl;

  // Check that the variables in the header row agree with those requested by
  // the SERVICE query (same variables, same order, separated by tabs).
  std::string expectedHeaderRow = absl::StrJoin(
      parsedServiceClause_.visibleVariables_, "\t", Variable::AbslFormatter);
  if (tsvHeaderRow != expectedHeaderRow) {
    throw std::runtime_error(absl::StrCat(
        "Header row of TSV result for SERVICE query is \"", tsvHeaderRow,
        "\", but expected \"", expectedHeaderRow, "\""));
  }

  // Set basic properties of the result table (the `_resultTypes` don't matter,
  // as long as they have the right size, see `ResultTypes.h`). The width is the
  // number of visible variables, see `getResultWidth`.
  const size_t resWidth = getResultWidth();
  result->_sortedBy = resultSortedOn();
  result->_idTable.setNumColumns(resWidth);
  result->_resultTypes.resize(resWidth, ResultTable::ResultType::KB);

  // Fill the result table using the `writeTsvResult` method below. The stream
  // is moved because the header row has already been consumed from it.
  CALL_FIXED_SIZE(resWidth, &Service::writeTsvResult, this,
                  std::move(tsvResult), result);
}

// ____________________________________________________________________________
// Parse the remaining (non-header) rows of `tsvResult` and append them to the
// `IdTable` of `result`. `I` is the width of the result table.
//
// Each row is split at tab characters, and each value is parsed as a triple
// object and converted to an `Id` (using the vocabulary of the index and the
// local vocabulary of `result`).
//
// Throws `std::runtime_error` if a row does not have exactly one value per
// visible variable of the SERVICE clause.
template <size_t I>
void Service::writeTsvResult(std::istringstream tsvResult,
                             ResultTable* result) {
  IdTableStatic<I> idTable = std::move(result->_idTable).toStatic<I>();
  size_t rowIdx = 0;
  // Per column, the number of entries that ended up in the local vocabulary
  // (only used for logging).
  std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());
  std::string line;
  std::string lastLine;
  const size_t numVariables = parsedServiceClause_.visibleVariables_.size();
  // Before each read, remember the previously read line, so that after the
  // loop `lastLine` holds the last row that was successfully read.
  while (lastLine = std::move(line), std::getline(tsvResult, line)) {
    // Print first line.
    if (rowIdx == 0) {
      LOG(INFO) << "First non-header row of TSV result: " << line << std::endl;
    }
    std::vector<std::string_view> valueStrings = absl::StrSplit(line, "\t");
    if (valueStrings.size() != numVariables) {
      throw std::runtime_error(absl::StrCat(
          "Number of columns in row ", rowIdx + 1, " of TSV result is ",
          valueStrings.size(), " but number of variables in header row is ",
          numVariables));
    }
    idTable.emplace_back();
    for (size_t colIdx = 0; colIdx < valueStrings.size(); colIdx++) {
      TripleComponent tc = TurtleStringParser<TokenizerCtre>::parseTripleObject(
          valueStrings[colIdx]);
      Id id = std::move(tc).toValueId(getIndex().getVocab(),
                                      result->localVocabNonConst());
      idTable(rowIdx, colIdx) = id;
      if (id.getDatatype() == Datatype::LocalVocabIndex) {
        ++numLocalVocabPerColumn[colIdx];
      }
    }
    rowIdx++;
  }
  // With at most one row, the last row is either absent or identical to the
  // first row, which was already logged above.
  if (idTable.size() > 1) {
    LOG(INFO) << "Last non-header row of TSV result: " << lastLine << std::endl;
  }
  AD_CORRECTNESS_CHECK(rowIdx == idTable.size());
  LOG(INFO) << "Number of rows in result: " << idTable.size() << std::endl;
  LOG(INFO) << "Number of entries in local vocabulary per column: "
            << absl::StrJoin(numLocalVocabPerColumn, ", ") << std::endl;
  result->_idTable = std::move(idTable).toDynamic();
}
91 changes: 91 additions & 0 deletions src/engine/Service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 - 2023, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Hannah Bast (bast@cs.uni-freiburg.de)

#pragma once

#include <functional>

#include "engine/Operation.h"
#include "engine/Values.h"
#include "parser/ParsedQuery.h"
#include "util/http/HttpClient.h"

// The SERVICE operation. Sends a query to the remote endpoint specified by the
// service IRI, gets the result as TSV, parses it, and writes it into a result
// table.
//
// TODO: The current implementation works, but is preliminary in several
// respects:
//
// 1. Reading the result as TSV has potential problems (see comment in
// `computeResult` for details).
//
// 2. There should be a timeout.
//
// 3. A variable in place of the IRI is not yet supported (`computeResult`
// requires a fixed IRI that is enclosed in angle brackets).
//
// 4. The SERVICE is currently executed *after* the query planning. The
// estimates of the result size, cost, and multiplicities are therefore dummy
// values.
//
class Service : public Operation {
public:
// The type of the function used to obtain the results, see below.
// `computeResult` calls it with the URL of the endpoint, the HTTP verb (POST),
// the query string, and the two media types "application/sparql-query" and
// "text/tab-separated-values" (presumably content type and accepted response
// type, in this order -- TODO confirm against `sendHttpOrHttpsRequest`). The
// returned stream is read line by line as TSV.
using GetTsvFunction = std::function<std::istringstream(
ad_utility::httpUtils::Url, const boost::beast::http::verb&,
std::string_view, std::string_view, std::string_view)>;

private:
// The parsed SERVICE clause (service IRI, prologue, graph pattern as string,
// and visible variables).
parsedQuery::Service parsedServiceClause_;

// The function used to obtain the result from the remote endpoint.
GetTsvFunction getTsvFunction_;

public:
// Construct from parsed Service clause.
//
// NOTE: The third argument is the function used to obtain the result from the
// remote endpoint. The default is to use `httpUtils::sendHttpOrHttpsRequest`,
// but in our tests (`ServiceTest`) we use a mock function that does not
// require a running `HttpServer`.
Service(QueryExecutionContext* qec, parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction = sendHttpOrHttpsRequest);

// Methods inherited from base class `Operation`.
std::string getDescriptor() const override;
// One column per visible variable of the SERVICE clause.
size_t getResultWidth() const override;
// Always empty, that is, the result is not reported as sorted on any column.
std::vector<size_t> resultSortedOn() const override { return {}; }
// The following three estimates are dummy values, because nothing is known
// about the remote result at query planning time.
float getMultiplicity(size_t col) override;
size_t getSizeEstimate() override;
size_t getCostEstimate() override;
// Maps each visible variable to its index in the list of visible variables.
VariableToColumnMap computeVariableToColumnMap() const override;

// Not relevant for SERVICE.
void setTextLimit([[maybe_unused]] size_t limit) override {}

// We know nothing about the result at query planning time.
bool knownEmptyResult() override { return false; }

// A SERVICE clause has no children.
vector<QueryExecutionTree*> getChildren() override { return {}; }

private:
// The string returned by this function is used as cache key.
std::string asStringImpl(size_t indent = 0) const override;

// Compute the result using `getTsvFunction_`. Throws `std::runtime_error` if
// the response is empty or its header row does not match the visible
// variables.
void computeResult(ResultTable* result) override;

// Write the given TSV result to the given result object. The `I` is the width
// of the result table.
//
// NOTE: This is similar to `Values::writeValues`, except that we have to
// parse TSV here and not a VALUES clause. Note that the only reason that
// `tsvResult` is not `const` here is because the method iterates over the
// `std::istringstream` and thus changes it.
template <size_t I>
void writeTsvResult(std::istringstream tsvResult, ResultTable* result);
};
3 changes: 3 additions & 0 deletions src/parser/GraphPatternOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ void GraphPatternOperation::toString(std::ostringstream& os,
} else if constexpr (std::is_same_v<T, Values>) {
os << "VALUES (" << arg._inlineValues.variablesToString() << ") "
<< arg._inlineValues.valuesToString();
} else if constexpr (std::is_same_v<T, Service>) {
os << "SERVICE " << arg.serviceIri_.toSparql() << " { "
<< arg.graphPatternAsString_ << " }";
} else if constexpr (std::is_same_v<T, BasicGraphPattern>) {
for (size_t i = 0; i + 1 < arg._triples.size(); ++i) {
os << "\n";
Expand Down
Loading