Skip to content

Commit

Permalink
SERVICE clauses now request JSON instead of TSV from the remote endpo…
Browse files Browse the repository at this point in the history
…int (#1361)

The RDF JSON format is a well-specified and lossless serialization of query results. This doesn't hold for th e TSV format, which
might lose information (e.g. about datatypes) and vary between different SPARQL engines.
With this change, more SERVICE queries work as expected, in particular when the result of the SERVICE contains literals that contain escape characters, non-ascii characters, or Literals with datatypes.

Current limitations:
* Blank nodes in the result of the SERVICE query are not yet supported, but as of this commit, this case will cause a human-readable error message that allows the user to work around this limitation.
* We currently have to store and parse the complete JSON response at once, which might take up quite some space. However, A lazy JSON parser as well as a lazy JSON exporter are currently being worked on, s.t. we can also consume the SERVICE response in chunks (as was the case for the old TSV implementation).
  • Loading branch information
UNEXENU committed Jul 4, 2024
1 parent bb7fba2 commit 9e8cc07
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 233 deletions.
190 changes: 108 additions & 82 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,23 @@

#include "engine/CallFixedSize.h"
#include "engine/ExportQueryExecutionTrees.h"
#include "engine/Values.h"
#include "engine/VariableToColumnMap.h"
#include "global/RuntimeParameters.h"
#include "parser/TokenizerCtre.h"
#include "parser/TurtleParser.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/Views.h"
#include "util/http/HttpClient.h"
#include "util/http/HttpUtils.h"

// ____________________________________________________________________________
Service::Service(QueryExecutionContext* qec,
parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction,
GetResultFunction getResultFunction,
std::shared_ptr<QueryExecutionTree> siblingTree)
: Operation{qec},
parsedServiceClause_{std::move(parsedServiceClause)},
getTsvFunction_{std::move(getTsvFunction)},
getResultFunction_{std::move(getResultFunction)},
siblingTree_{std::move(siblingTree)} {}

// ____________________________________________________________________________
Expand Down Expand Up @@ -122,60 +120,101 @@ Result Service::computeResult([[maybe_unused]] bool requestLaziness) {
<< ", target: " << serviceUrl.target() << ")" << std::endl
<< serviceQuery << std::endl;

// Send the query to the remote SPARQL endpoint via a POST request and get the
// result as TSV.
//
// TODO: We ask for the result as TSV because that is a compact and
// easy-to-parse format. It might not be the best choice regarding robustness
// and portability though. In particular, we are not sure how deterministic
// the TSV output is with respect to the precise encoding of literals.
cppcoro::generator<std::span<std::byte>> tsvByteResult =
ad_utility::reChunkAtSeparator(
getTsvFunction_(serviceUrl, cancellationHandle_,
boost::beast::http::verb::post, serviceQuery,
"application/sparql-query",
"text/tab-separated-values"),
static_cast<std::byte>('\n'));

// TODO<GCC12> Use `std::views::transform` instead.
auto tsvResult = [](auto byteResult) -> cppcoro::generator<std::string_view> {
for (std::span<std::byte> bytes : byteResult) {
co_yield std::string_view{reinterpret_cast<const char*>(bytes.data()),
bytes.size()};
cppcoro::generator<std::span<std::byte>> jsonByteResult = getResultFunction_(
serviceUrl, cancellationHandle_, boost::beast::http::verb::post,
serviceQuery, "application/sparql-query",
"application/sparql-results+json");

std::basic_string<char, std::char_traits<char>,
ad_utility::AllocatorWithLimit<char>>
jsonStr(_executionContext->getAllocator());
for (std::span<std::byte> bytes : jsonByteResult) {
jsonStr.append(reinterpret_cast<const char*>(bytes.data()), bytes.size());
checkCancellation();
}

// Parse the received result.
auto throwErrorWithContext = [&jsonStr](std::string_view sv) {
throw std::runtime_error(absl::StrCat(
sv,
" First 100 bytes: ", std::string_view{jsonStr.data()}.substr(0, 100)));
};
std::vector<std::string> resVariables;
std::vector<nlohmann::json> resBindings;
try {
auto jsonResult = nlohmann::json::parse(jsonStr);

if (jsonResult.empty()) {
throw std::runtime_error(absl::StrCat("Response from SPARQL endpoint ",
serviceUrl.host(), " is empty"));
}
}(std::move(tsvByteResult));

// The first line of the TSV result contains the variable names.
auto begin = tsvResult.begin();
if (begin == tsvResult.end()) {
throw std::runtime_error(absl::StrCat("Response from SPARQL endpoint ",
serviceUrl.host(), " is empty"));
resVariables = jsonResult["head"]["vars"].get<std::vector<std::string>>();
resBindings =
jsonResult["results"]["bindings"].get<std::vector<nlohmann::json>>();
} catch (const nlohmann::json::parse_error&) {
throwErrorWithContext("Failed to parse the Service result as JSON.");
} catch (const nlohmann::json::type_error&) {
throwErrorWithContext("JSON result does not have the expected structure.");
}
std::string_view tsvHeaderRow = *begin;
LOG(INFO) << "Header row of TSV result: " << tsvHeaderRow << std::endl;

// Check that the variables in the header row agree with those requested by
// the SERVICE query.
// Check if result header row is expected.
std::string headerRow = absl::StrCat("?", absl::StrJoin(resVariables, " ?"));
std::string expectedHeaderRow = absl::StrJoin(
parsedServiceClause_.visibleVariables_, "\t", Variable::AbslFormatter);
if (tsvHeaderRow != expectedHeaderRow) {
parsedServiceClause_.visibleVariables_, " ", Variable::AbslFormatter);
if (headerRow != expectedHeaderRow) {
throw std::runtime_error(absl::StrCat(
"Header row of TSV result for SERVICE query is \"", tsvHeaderRow,
"Header row of JSON result for SERVICE query is \"", headerRow,
"\", but expected \"", expectedHeaderRow, "\""));
}

// Set basic properties of the result table.
IdTable idTable{getExecutionContext()->getAllocator()};
idTable.setNumColumns(getResultWidth());
LocalVocab localVocab{};
// Fill the result table using the `writeTsvResult` method below.
// Fill the result table using the `writeJsonResult` method below.
size_t resWidth = getResultWidth();
CALL_FIXED_SIZE(resWidth, &Service::writeTsvResult, this,
std::move(tsvResult), &idTable, &localVocab);
CALL_FIXED_SIZE(resWidth, &Service::writeJsonResult, this, resVariables,
resBindings, &idTable, &localVocab);

return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
}

// ____________________________________________________________________________
template <size_t I>
void Service::writeJsonResult(const std::vector<std::string>& vars,
const std::vector<nlohmann::json>& bindings,
IdTable* idTablePtr, LocalVocab* localVocab) {
IdTableStatic<I> idTable = std::move(*idTablePtr).toStatic<I>();
checkCancellation();
std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());

size_t rowIdx = 0;
for (const auto& binding : bindings) {
idTable.emplace_back();
for (size_t colIdx = 0; colIdx < vars.size(); ++colIdx) {
TripleComponent tc = binding.contains(vars[colIdx])
? bindingToTripleComponent(binding[vars[colIdx]])
: TripleComponent::UNDEF();

Id id = std::move(tc).toValueId(getIndex().getVocab(), *localVocab);
idTable(rowIdx, colIdx) = id;
if (id.getDatatype() == Datatype::LocalVocabIndex) {
++numLocalVocabPerColumn[colIdx];
}
}
rowIdx++;
checkCancellation();
}

AD_CORRECTNESS_CHECK(rowIdx == idTable.size());
LOG(INFO) << "Number of rows in result: " << idTable.size() << std::endl;
LOG(INFO) << "Number of entries in local vocabulary per column: "
<< absl::StrJoin(numLocalVocabPerColumn, ", ") << std::endl;
*idTablePtr = std::move(idTable).toDynamic();
checkCancellation();
}

// ____________________________________________________________________________
std::optional<std::string> Service::getSiblingValuesClause() const {
if (siblingTree_ == nullptr) {
Expand Down Expand Up @@ -237,48 +276,35 @@ std::optional<std::string> Service::getSiblingValuesClause() const {
}

// ____________________________________________________________________________
template <size_t I>
void Service::writeTsvResult(cppcoro::generator<std::string_view> tsvResult,
IdTable* idTablePtr, LocalVocab* localVocab) {
IdTableStatic<I> idTable = std::move(*idTablePtr).toStatic<I>();
checkCancellation();
size_t rowIdx = 0;
std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());
std::string lastLine;
const size_t numVariables = parsedServiceClause_.visibleVariables_.size();
for (std::string_view line : tsvResult) {
// Print first line.
if (rowIdx == 0) {
LOG(INFO) << "First non-header row of TSV result: " << line << std::endl;
}
std::vector<std::string_view> valueStrings = absl::StrSplit(line, "\t");
if (valueStrings.size() != numVariables) {
throw std::runtime_error(absl::StrCat(
"Number of columns in row ", rowIdx + 1, " of TSV result is ",
valueStrings.size(), " but number of variables in header row is ",
numVariables, ". Line: ", line));
}
idTable.emplace_back();
for (size_t colIdx = 0; colIdx < valueStrings.size(); colIdx++) {
TripleComponent tc = TurtleStringParser<TokenizerCtre>::parseTripleObject(
valueStrings[colIdx]);
Id id = std::move(tc).toValueId(getIndex().getVocab(), *localVocab);
idTable(rowIdx, colIdx) = id;
if (id.getDatatype() == Datatype::LocalVocabIndex) {
++numLocalVocabPerColumn[colIdx];
}
}
rowIdx++;
checkCancellation();
lastLine = line;
TripleComponent Service::bindingToTripleComponent(const nlohmann::json& cell) {
if (!cell.contains("type") || !cell.contains("value")) {
throw std::runtime_error("Missing type or value field in binding.");
}
if (idTable.size() > 1) {
LOG(INFO) << "Last non-header row of TSV result: " << lastLine << std::endl;

const auto type = cell["type"].get<std::string_view>();
const auto value = cell["value"].get<std::string_view>();

TripleComponent tc;
if (type == "literal") {
if (cell.contains("datatype")) {
tc = TurtleParser<TokenizerCtre>::literalAndDatatypeToTripleComponent(
value, TripleComponent::Iri::fromIrirefWithoutBrackets(
cell["datatype"].get<std::string_view>()));
} else if (cell.contains("xml:lang")) {
tc = TripleComponent::Literal::literalWithoutQuotes(
value, cell["xml:lang"].get<std::string>());
} else {
tc = TripleComponent::Literal::literalWithoutQuotes(value);
}
} else if (type == "uri") {
tc = TripleComponent::Iri::fromIrirefWithoutBrackets(value);
} else if (type == "bnode") {
throw std::runtime_error(
"Blank nodes in the result of a SERVICE are currently not supported. "
"For now, consider filtering them out using the ISBLANK function or "
"converting them via the STR function.");
} else {
throw std::runtime_error(absl::StrCat("Type ", type, " is undefined."));
}
AD_CORRECTNESS_CHECK(rowIdx == idTable.size());
LOG(INFO) << "Number of rows in result: " << idTable.size() << std::endl;
LOG(INFO) << "Number of entries in local vocabulary per column: "
<< absl::StrJoin(numLocalVocabPerColumn, ", ") << std::endl;
*idTablePtr = std::move(idTable).toDynamic();
checkCancellation();
return tc;
}
44 changes: 22 additions & 22 deletions src/engine/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,37 @@
#include "util/http/HttpClient.h"

// The SERVICE operation. Sends a query to the remote endpoint specified by the
// service IRI, gets the result as TSV, parses it, and writes it into a result
// service IRI, gets the result as JSON, parses it, and writes it into a result
// table.
//
// TODO: The current implementation works, but is preliminary in several
// respects:
//
// 1. Reading the result as TSV has potential problems (see comment in
// `computeResult` for details).
//
// 2. There should be a timeout.
// 1. There should be a timeout.
//
// 3. A variable in place of the IRI is not yet supported (see comment in
// 2. A variable in place of the IRI is not yet supported (see comment in
// `computeResult` for details).
//
// 4. The SERVICE is currently executed *after* the query planning. The
// 3. The SERVICE is currently executed *after* the query planning. The
// estimates of the result size, cost, and multiplicities are therefore dummy
// values.
//
class Service : public Operation {
public:
// The type of the function used to obtain the results, see below.
using GetTsvFunction = std::function<cppcoro::generator<std::span<std::byte>>(
const ad_utility::httpUtils::Url&,
ad_utility::SharedCancellationHandle handle,
const boost::beast::http::verb&, std::string_view, std::string_view,
std::string_view)>;
using GetResultFunction =
std::function<cppcoro::generator<std::span<std::byte>>(
const ad_utility::httpUtils::Url&,
ad_utility::SharedCancellationHandle handle,
const boost::beast::http::verb&, std::string_view, std::string_view,
std::string_view)>;

private:
// The parsed SERVICE clause.
parsedQuery::Service parsedServiceClause_;

// The function used to obtain the result from the remote endpoint.
GetTsvFunction getTsvFunction_;
GetResultFunction getResultFunction_;

// The siblingTree, used for SERVICE clause optimization.
std::shared_ptr<QueryExecutionTree> siblingTree_;
Expand All @@ -57,7 +55,7 @@ class Service : public Operation {
// but in our tests (`ServiceTest`) we use a mock function that does not
// require a running `HttpServer`.
Service(QueryExecutionContext* qec, parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction = sendHttpOrHttpsRequest,
GetResultFunction getResultFunction = sendHttpOrHttpsRequest,
std::shared_ptr<QueryExecutionTree> siblingTree = nullptr);

// Set the siblingTree (subTree that will later be joined with the Result of
Expand Down Expand Up @@ -91,24 +89,26 @@ class Service : public Operation {
// A SERVICE clause has no children.
vector<QueryExecutionTree*> getChildren() override { return {}; }

// Convert the given binding to TripleComponent.
static TripleComponent bindingToTripleComponent(const nlohmann::json& cell);

private:
// The string returned by this function is used as cache key.
std::string getCacheKeyImpl() const override;

// Compute the result using `getTsvFunction_`.
// Compute the result using `getResultFunction_`.
Result computeResult([[maybe_unused]] bool requestLaziness) override;

// Get a VALUES clause that contains the values of the siblingTree's result.
std::optional<std::string> getSiblingValuesClause() const;

// Write the given TSV result to the given result object. The `I` is the width
// of the result table.
// Write the given JSON result to the given result object. The `I` is the
// width of the result table.
//
// NOTE: This is similar to `Values::writeValues`, except that we have to
// parse TSV here and not a VALUES clause. Note that the only reason that
// `tsvResult` is not `const` here is because the method iterates over the
// input range and thus changes it.
// parse JSON here and not a VALUES clause.
template <size_t I>
void writeTsvResult(cppcoro::generator<std::string_view> tsvResult,
IdTable* idTable, LocalVocab* localVocab);
void writeJsonResult(const std::vector<std::string>& vars,
const std::vector<nlohmann::json>& bindings,
IdTable* idTable, LocalVocab* localVocab);
};
Loading

0 comments on commit 9e8cc07

Please sign in to comment.