Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for JSON, CSV and TSV when using CONSTRUCT queries #536

Merged
merged 19 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 63 additions & 25 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ void QueryExecutionTree::setVariableColumns(
}

// _____________________________________________________________________________
template <QueryExecutionTree::ExportSubFormat format>
ad_utility::stream_generator::stream_generator
QueryExecutionTree::generateResults(const vector<string>& selectVars,
size_t limit, size_t offset,
char sep) const {
size_t limit, size_t offset) const {
// This may trigger computation (but does not have to).
shared_ptr<const ResultTable> resultTable = getResult();
LOG(DEBUG) << "Resolving strings for finished binary result...\n";
Expand All @@ -115,10 +115,24 @@ QueryExecutionTree::generateResults(const vector<string>& selectVars,

const IdTable& data = resultTable->_idTable;
size_t upperBound = std::min<size_t>(offset + limit, data.size());
return writeTable(sep, offset, upperBound, std::move(validIndices),
std::move(resultTable));
return writeTable<format>(offset, upperBound, std::move(validIndices),
std::move(resultTable));
}

// Explicit instantiations of `generateResults` for every `ExportSubFormat`
// value (CSV, TSV, BINARY). The template is defined in this source file, so
// code in other translation units that calls `generateResults<format>` relies
// on these instantiations being present here.

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::generateResults<QueryExecutionTree::CSV>(
const vector<string>& selectVars, size_t limit, size_t offset) const;

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::generateResults<QueryExecutionTree::TSV>(
const vector<string>& selectVars, size_t limit, size_t offset) const;

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::generateResults<QueryExecutionTree::BINARY>(
const vector<string>& selectVars, size_t limit, size_t offset) const;

// ___________________________________________________________________________
QueryExecutionTree::ColumnIndicesAndTypes
QueryExecutionTree::selectedVariablesToColumnIndices(
Expand Down Expand Up @@ -385,9 +399,10 @@ nlohmann::json QueryExecutionTree::writeQLeverJsonTable(
return json;
}

// _________________________________________________________________________________________________________
// _____________________________________________________________________________
template <QueryExecutionTree::ExportSubFormat format>
ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
char sep, size_t from, size_t upperBound,
size_t from, size_t upperBound,
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>
validIndices,
std::shared_ptr<const ResultTable> resultTable) const {
Expand All @@ -396,7 +411,7 @@ ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
}
const auto& idTable = resultTable->_idTable;
// special case : binary export of IdTable
if (sep == 'b') {
if constexpr (format == BINARY) {
for (size_t i = from; i < upperBound; ++i) {
for (size_t j = 0; j < validIndices.size(); ++j) {
if (validIndices[j]) {
Expand All @@ -410,6 +425,8 @@ ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
co_return;
}

constexpr char sep = format == TSV ? '\t' : ',';

for (size_t i = from; i < upperBound; ++i) {
for (size_t j = 0; j < validIndices.size(); ++j) {
if (validIndices[j]) {
Expand Down Expand Up @@ -456,22 +473,23 @@ ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(

// _____________________________________________________________________________

cppcoro::generator<QueryExecutionTree::RdfTriple>
cppcoro::generator<QueryExecutionTree::StringTriple>
QueryExecutionTree::generateRdfGraph(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples, size_t limit,
size_t offset, std::shared_ptr<const ResultTable> res) const {
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const {
size_t upperBound = std::min<size_t>(offset + limit, res->_idTable.size());
auto variableColumns = getVariableColumns();
for (size_t i = offset; i < upperBound; i++) {
Context context{i, *res, variableColumns, _qec->getIndex()};
for (const auto& triple : constructTriples) {
auto subject = triple[0].evaluate(context, SUBJECT);
auto verb = triple[1].evaluate(context, VERB);
auto predicate = triple[1].evaluate(context, PREDICATE);
auto object = triple[2].evaluate(context, OBJECT);
if (!subject.has_value() || !verb.has_value() || !object.has_value()) {
if (!subject.has_value() || !predicate.has_value() ||
!object.has_value()) {
continue;
}
co_yield {std::move(subject.value()), std::move(verb.value()),
co_yield {std::move(subject.value()), std::move(predicate.value()),
std::move(object.value())};
}
}
Expand All @@ -480,50 +498,70 @@ QueryExecutionTree::generateRdfGraph(
// _____________________________________________________________________________
ad_utility::stream_generator::stream_generator
QueryExecutionTree::writeRdfGraphTurtle(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples, size_t limit,
size_t offset, std::shared_ptr<const ResultTable> res) const {
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const {
auto generator = generateRdfGraph(constructTriples, limit, offset, res);
for (const auto& triple : generator) {
co_yield triple._subject;
co_yield ' ';
co_yield triple._verb;
co_yield triple._predicate;
co_yield ' ';
co_yield triple._object;
co_yield " .\n";
}
}

// _____________________________________________________________________________
template <QueryExecutionTree::ExportSubFormat format>
ad_utility::stream_generator::stream_generator
QueryExecutionTree::writeRdfGraphSeparatedValues(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples, size_t limit,
size_t offset, std::shared_ptr<const ResultTable> res, char sep) const {
if (sep == 'b') {
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const {
if constexpr (format == BINARY) {
throw std::runtime_error{
"Binary export is not supported for CONSTRUCT queries"};
}
const auto& escapeFunction =
sep == '\t' ? RdfEscaping::escapeForTsv : RdfEscaping::escapeForCsv;
constexpr auto& escapeFunction =
format == TSV ? RdfEscaping::escapeForTsv : RdfEscaping::escapeForCsv;
constexpr char sep = format == TSV ? '\t' : ',';
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
auto generator = generateRdfGraph(constructTriples, limit, offset, res);
for (auto& triple : generator) {
co_yield escapeFunction(std::move(triple._subject));
co_yield sep;
co_yield escapeFunction(std::move(triple._verb));
co_yield escapeFunction(std::move(triple._predicate));
co_yield sep;
co_yield escapeFunction(std::move(triple._object));
co_yield "\n";
}
}

// Explicit instantiations of `writeRdfGraphSeparatedValues` for every
// `ExportSubFormat` value. The template is defined in this source file, so
// callers in other translation units rely on these instantiations. The BINARY
// variant is instantiated as well, although its body only throws a
// `std::runtime_error` because binary export is not supported for CONSTRUCT
// queries.

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::writeRdfGraphSeparatedValues<QueryExecutionTree::CSV>(
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::writeRdfGraphSeparatedValues<QueryExecutionTree::TSV>(
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

template ad_utility::stream_generator::stream_generator
QueryExecutionTree::writeRdfGraphSeparatedValues<QueryExecutionTree::BINARY>(
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

// _____________________________________________________________________________
nlohmann::json QueryExecutionTree::writeRdfGraphJson(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples, size_t limit,
size_t offset, std::shared_ptr<const ResultTable> res) const {
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const {
auto generator =
generateRdfGraph(constructTriples, limit, offset, std::move(res));
std::vector<std::array<std::string, 3>> jsonArray;
for (auto& triple : generator) {
jsonArray.push_back({std::move(triple._subject), std::move(triple._verb),
jsonArray.push_back({std::move(triple._subject),
std::move(triple._predicate),
std::move(triple._object)});
}
return jsonArray;
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
34 changes: 18 additions & 16 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <unordered_set>

#include "../parser/data/Context.h"
#include "../parser/data/Types.h"
#include "../parser/data/VarOrTerm.h"
#include "../util/Conversions.h"
#include "../util/Generator.h"
Expand Down Expand Up @@ -52,6 +53,8 @@ class QueryExecutionTree {
MINUS = 20
};

// Selects the output flavour of the separated-values export functions; it is
// passed to them as a template argument. CSV and TSV choose ',' and '\t' as
// the column separator, BINARY requests the binary export of the IdTable.
enum ExportSubFormat { CSV, TSV, BINARY };
RobinTF marked this conversation as resolved.
Show resolved Hide resolved

void setOperation(OperationType type, std::shared_ptr<Operation> op);

string asString(size_t indent = 0);
Expand Down Expand Up @@ -106,26 +109,25 @@ class QueryExecutionTree {
const std::vector<string>& selectVariables,
const ResultTable& resultTable) const;

template <ExportSubFormat format>
ad_utility::stream_generator::stream_generator generateResults(
const vector<string>& selectVars, size_t limit = MAX_NOF_ROWS_IN_RESULT,
size_t offset = 0, char sep = '\t') const;
size_t offset = 0) const;

// Generate an RDF graph in turtle format for a CONSTRUCT query.
ad_utility::stream_generator::stream_generator writeRdfGraphTurtle(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples,
size_t limit, size_t offset,
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

// Generate an RDF graph in csv/tsv format for a CONSTRUCT query.
template <ExportSubFormat format>
ad_utility::stream_generator::stream_generator writeRdfGraphSeparatedValues(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples,
size_t limit, size_t offset, std::shared_ptr<const ResultTable> res,
char sep = '\t') const;
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

// Generate an RDF graph in json format for a CONSTRUCT query.
nlohmann::json writeRdfGraphJson(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples,
size_t limit, size_t offset,
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;

nlohmann::json writeResultAsQLeverJson(
Expand Down Expand Up @@ -233,13 +235,13 @@ class QueryExecutionTree {
// used inside of coroutines when using std::array<std::string, 3> instead
// see https://gcc.gnu.org/bugzilla/show_bug.cgi?id=103909 for more
// information
struct RdfTriple {
struct StringTriple {
std::string _subject;
std::string _verb;
std::string _predicate;
std::string _object;
RdfTriple(std::string subject, std::string verb, std::string object)
StringTriple(std::string subject, std::string predicate, std::string object)
: _subject{std::move(subject)},
_verb{std::move(verb)},
_predicate{std::move(predicate)},
_object{std::move(object)} {}
};

Expand All @@ -260,14 +262,14 @@ class QueryExecutionTree {
toStringAndXsdType(Id id, ResultTable::ResultType type,
const ResultTable& resultTable) const;

template <ExportSubFormat format>
ad_utility::stream_generator::stream_generator writeTable(
char sep, size_t from, size_t upperBound,
size_t from, size_t upperBound,
vector<std::optional<pair<size_t, ResultTable::ResultType>>> validIndices,
shared_ptr<const ResultTable> resultTable = nullptr) const;

// Generate an RDF graph for a CONSTRUCT query.
cppcoro::generator<RdfTriple> generateRdfGraph(
const std::vector<std::array<VarOrTerm, 3>>& constructTriples,
size_t limit, size_t offset,
cppcoro::generator<StringTriple> generateRdfGraph(
const Types::Triples& constructTriples, size_t limit, size_t offset,
std::shared_ptr<const ResultTable> res) const;
};
40 changes: 19 additions & 21 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ Awaitable<json> Server::composeResponseQleverJson(
j["selected"] =
query.hasSelectClause()
? query.selectClause()._selectedVariables
: std::vector<std::string>{"?subject", "?verb", "?object"};
: std::vector<std::string>{"?subject", "?predicate", "?object"};

j["runtimeInformation"] = RuntimeInformation::ordered_json(
qet.getRootOperation()->getRuntimeInfo());
Expand Down Expand Up @@ -208,6 +208,11 @@ Awaitable<json> Server::composeResponseQleverJson(
Awaitable<json> Server::composeResponseSparqlJson(
const ParsedQuery& query, const QueryExecutionTree& qet,
ad_utility::Timer& requestTimer, size_t maxSend) const {
if (!query.hasSelectClause()) {
throw std::runtime_error{
"SPARQL-compliant JSON format is not supported for anything other than "
"SELECT queries"};
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
}
auto compute = [&, maxSend] {
shared_ptr<const ResultTable> resultTable = qet.getResult();
requestTimer.stop();
Expand All @@ -216,35 +221,27 @@ Awaitable<json> Server::composeResponseSparqlJson(
std::min(query._limit.value_or(MAX_NOF_ROWS_IN_RESULT), maxSend);
size_t offset = query._offset.value_or(0);
requestTimer.cont();
if (query.hasSelectClause()) {
j = qet.writeResultAsSparqlJson(query.selectClause()._selectedVariables,
limit, offset, std::move(resultTable));
} else if (query.hasConstructClause()) {
j["results"] = qet.writeRdfGraphJson(query.constructClause(), limit,
offset, std::move(resultTable));
} else {
AD_CHECK(false);
}
j = qet.writeResultAsSparqlJson(query.selectClause()._selectedVariables,
limit, offset, std::move(resultTable));
requestTimer.stop();
return j;
};
return computeInNewThread(compute);
}

// _____________________________________________________________________________
template <QueryExecutionTree::ExportSubFormat format>
Awaitable<ad_utility::stream_generator::stream_generator>
Server::composeResponseSepValues(const ParsedQuery& query,
const QueryExecutionTree& qet,
char sep) const {
auto compute = [&, sep] {
const QueryExecutionTree& qet) const {
auto compute = [&] {
size_t limit = query._limit.value_or(MAX_NOF_ROWS_IN_RESULT);
size_t offset = query._offset.value_or(0);
return query.hasSelectClause()
? qet.generateResults(query.selectClause()._selectedVariables,
limit, offset, sep)
: qet.writeRdfGraphSeparatedValues(query.constructClause(),
limit, offset,
qet.getResult(), sep);
? qet.generateResults<format>(
query.selectClause()._selectedVariables, limit, offset)
: qet.writeRdfGraphSeparatedValues<format>(
query.constructClause(), limit, offset, qet.getResult());
};
return computeInNewThread(compute);
}
Expand Down Expand Up @@ -425,21 +422,22 @@ boost::asio::awaitable<void> Server::processQuery(
switch (mediaType.value()) {
case ad_utility::MediaType::csv: {
auto responseGenerator =
co_await composeResponseSepValues(pq, qet, ',');
co_await composeResponseSepValues<QueryExecutionTree::CSV>(pq, qet);
auto response = createOkResponse(std::move(responseGenerator), request,
ad_utility::MediaType::csv);
co_await send(std::move(response));
} break;
case ad_utility::MediaType::tsv: {
auto responseGenerator =
co_await composeResponseSepValues(pq, qet, '\t');
co_await composeResponseSepValues<QueryExecutionTree::TSV>(pq, qet);
auto response = createOkResponse(std::move(responseGenerator), request,
ad_utility::MediaType::tsv);
co_await send(std::move(response));
} break;
case ad_utility::MediaType::octetStream: {
auto responseGenerator =
co_await composeResponseSepValues(pq, qet, 'b');
co_await composeResponseSepValues<QueryExecutionTree::BINARY>(pq,
qet);
auto response = createOkResponse(std::move(responseGenerator), request,
ad_utility::MediaType::octetStream);
co_await send(std::move(response));
Expand Down
3 changes: 2 additions & 1 deletion src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ class Server {
ad_utility::Timer& requestTimer,
size_t maxSend = MAX_NOF_ROWS_IN_RESULT) const;

template <QueryExecutionTree::ExportSubFormat format>
Awaitable<ad_utility::stream_generator::stream_generator>
composeResponseSepValues(const ParsedQuery& query,
const QueryExecutionTree& qet, char sep) const;
const QueryExecutionTree& qet) const;

static json composeExceptionJson(const string& query, const std::exception& e,
ad_utility::Timer& requestTimer);
Expand Down
3 changes: 2 additions & 1 deletion src/parser/ParsedQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "../util/HashMap.h"
#include "../util/StringUtils.h"
#include "ParseException.h"
#include "data/Types.h"
#include "data/VarOrTerm.h"

using std::string;
Expand Down Expand Up @@ -313,7 +314,7 @@ class ParsedQuery {
bool _distinct = false;
};

using ConstructClause = std::vector<std::array<VarOrTerm, 3>>;
using ConstructClause = Types::Triples;

ParsedQuery() = default;

Expand Down