Skip to content

Commit

Permalink
Chunked transfer encoding for TSV and CSV export (#513)
Browse files Browse the repository at this point in the history
- TSV and CSV exports are no longer completely materialized in RAM on the qlever server, but are directly
  sent to the client as a `chunked` http response.
  • Loading branch information
RobinTF committed Dec 6, 2021
1 parent 0646719 commit ba9ecf0
Show file tree
Hide file tree
Showing 12 changed files with 680 additions and 38 deletions.
47 changes: 28 additions & 19 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ void QueryExecutionTree::writeResultToStream(std::ostream& out,
const vector<string>& selectVars,
size_t limit, size_t offset,
char sep) const {
auto generator = generateResults(selectVars, limit, offset, sep);
while (generator.hasNext()) {
out << generator.next();
}
}

// _____________________________________________________________________________
ad_utility::stream_generator::stream_generator
QueryExecutionTree::generateResults(const vector<string>& selectVars,
size_t limit, size_t offset,
char sep) const {
// They may trigger computation (but does not have to).
shared_ptr<const ResultTable> res = getResult();
LOG(DEBUG) << "Resolving strings for finished binary result...\n";
Expand All @@ -108,14 +119,12 @@ void QueryExecutionTree::writeResultToStream(std::ostream& out,
}
}
if (validIndices.size() == 0) {
return;
return {};
}

const IdTable& data = res->_data;
size_t upperBound = std::min<size_t>(offset + limit, data.size());
writeTable(data, sep, offset, upperBound, validIndices, out);

LOG(DEBUG) << "Done creating readable result.\n";
return writeTable(data, sep, offset, upperBound, std::move(validIndices));
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -267,26 +276,25 @@ nlohmann::json QueryExecutionTree::writeJsonTable(
}

// _________________________________________________________________________________________________________
void QueryExecutionTree::writeTable(
ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
const IdTable& data, char sep, size_t from, size_t upperBound,
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>&
validIndices,
std::ostream& out) const {
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>
validIndices) const {
shared_ptr<const ResultTable> res = getResult();

// special case : binary export of IdTable
if (sep == 'b') {
for (size_t i = from; i < upperBound; ++i) {
for (size_t j = 0; j < validIndices.size(); ++j) {
if (validIndices[j]) {
const auto& val = *validIndices[j];
out.write(reinterpret_cast<const char*>(&data(i, val.first)),
sizeof(Id));
co_yield std::string_view{
reinterpret_cast<const char*>(&data(i, val.first)), sizeof(Id)};
}
}
}
return;
co_return;
}

for (size_t i = from; i < upperBound; ++i) {
for (size_t j = 0; j < validIndices.size(); ++j) {
if (validIndices[j]) {
Expand All @@ -297,34 +305,35 @@ void QueryExecutionTree::writeTable(
.idToOptionalString(data(i, val.first))
.value_or("");
if (ad_utility::startsWith(entity, VALUE_PREFIX)) {
out << ad_utility::convertIndexWordToValueLiteral(entity);
co_yield ad_utility::convertIndexWordToValueLiteral(entity);
} else {
out << entity;
co_yield entity;
}
break;
}
case ResultTable::ResultType::VERBATIM:
out << data(i, val.first);
co_yield data(i, val.first);
break;
case ResultTable::ResultType::TEXT:
out << _qec->getIndex().getTextExcerpt(data(i, val.first));
co_yield _qec->getIndex().getTextExcerpt(data(i, val.first));
break;
case ResultTable::ResultType::FLOAT: {
float f;
std::memcpy(&f, &data(i, val.first), sizeof(float));
out << f;
co_yield f;
break;
}
case ResultTable::ResultType::LOCAL_VOCAB: {
out << res->idToOptionalString(data(i, val.first)).value_or("");
co_yield res->idToOptionalString(data(i, val.first)).value_or("");
break;
}
default:
AD_THROW(ad_semsearch::Exception::INVALID_PARAMETER_VALUE,
"Cannot deduce output type.");
}
}
out << (j + 1 < validIndices.size() ? sep : '\n');
co_yield(j + 1 < validIndices.size() ? sep : '\n');
}
}
LOG(DEBUG) << "Done creating readable result.\n";
}
12 changes: 8 additions & 4 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "../util/Conversions.h"
#include "../util/HashSet.h"
#include "../util/streamable_generator.h"
#include "./Operation.h"
#include "./QueryExecutionContext.h"

Expand Down Expand Up @@ -90,6 +91,10 @@ class QueryExecutionTree {
size_t limit = MAX_NOF_ROWS_IN_RESULT,
size_t offset = 0, char sep = '\t') const;

ad_utility::stream_generator::stream_generator generateResults(
const vector<string>& selectVars, size_t limit = MAX_NOF_ROWS_IN_RESULT,
size_t offset = 0, char sep = '\t') const;

nlohmann::json writeResultAsJson(const vector<string>& selectVars,
size_t limit, size_t offset) const;

Expand Down Expand Up @@ -200,9 +205,8 @@ class QueryExecutionTree {
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>&
validIndices) const;

void writeTable(
ad_utility::stream_generator::stream_generator writeTable(
const IdTable& data, char sep, size_t from, size_t upperBound,
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>&
validIndices,
std::ostream& out) const;
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>
validIndices) const;
};
20 changes: 8 additions & 12 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,12 @@ json Server::composeResponseJson(const ParsedQuery& query,
}

// _____________________________________________________________________________
string Server::composeResponseSepValues(const ParsedQuery& query,
const QueryExecutionTree& qet,
char sep) {
std::ostringstream os;
ad_utility::stream_generator::stream_generator Server::composeResponseSepValues(
const ParsedQuery& query, const QueryExecutionTree& qet, char sep) {
size_t limit = query._limit.value_or(MAX_NOF_ROWS_IN_RESULT);
size_t offset = query._offset.value_or(0);
qet.writeResultToStream(os, query._selectClause._selectedVariables, limit,
offset, sep);

return os.str();
return qet.generateResults(query._selectClause._selectedVariables, limit,
offset, sep);
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -289,14 +285,14 @@ boost::asio::awaitable<void> Server::processQuery(

if (containsParam("action", "csv_export")) {
// CSV export
auto responseString = composeResponseSepValues(pq, qet, ',');
auto response = createOkResponse(std::move(responseString), request,
auto responseGenerator = composeResponseSepValues(pq, qet, ',');
auto response = createOkResponse(std::move(responseGenerator), request,
ad_utility::MediaType::csv);
co_await send(std::move(response));
} else if (containsParam("action", "tsv_export")) {
// TSV export
auto responseString = composeResponseSepValues(pq, qet, '\t');
auto response = createOkResponse(std::move(responseString), request,
auto responseGenerator = composeResponseSepValues(pq, qet, '\t');
auto response = createOkResponse(std::move(responseGenerator), request,
ad_utility::MediaType::tsv);
co_await send(std::move(response));
} else {
Expand Down
7 changes: 4 additions & 3 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "../parser/SparqlParser.h"
#include "../util/AllocatorWithLimit.h"
#include "../util/HttpServer/HttpServer.h"
#include "../util/HttpServer/streamable_body.h"
#include "../util/Socket.h"
#include "../util/Timer.h"
#include "./QueryExecutionContext.h"
Expand Down Expand Up @@ -98,9 +99,9 @@ class Server {
ad_utility::Timer& requestTimer,
size_t sendMax = MAX_NOF_ROWS_IN_RESULT);

static string composeResponseSepValues(const ParsedQuery& query,
const QueryExecutionTree& qet,
char sep);
static ad_utility::stream_generator::stream_generator
composeResponseSepValues(const ParsedQuery& query,
const QueryExecutionTree& qet, char sep);

static json composeResponseJson(const string& query, const std::exception& e,
ad_utility::Timer& requestTimer);
Expand Down
19 changes: 19 additions & 0 deletions src/util/HttpServer/HttpUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

#include "../StringUtils.h"
#include "../TypeTraits.h"
#include "../streamable_generator.h"
#include "./MediaTypes.h"
#include "./UrlParser.h"
#include "./beast.h"
#include "./streamable_body.h"

/// Several utilities for using/customizing the HttpServer template from
/// HttpServer.h
Expand Down Expand Up @@ -71,6 +73,23 @@ static auto createOkResponse(std::string text, const HttpRequest auto& request,
request, mimeType);
}

/// Create a HttpResponse from a stream_generator with status 200 OK.
static auto createOkResponse(
ad_utility::stream_generator::stream_generator&& generator,
const HttpRequest auto& request, MediaType mimeType) {
http::response<ad_utility::httpUtils::httpStreams::streamable_body> response{
http::status::ok, request.version()};
response.set(http::field::content_type, toString(mimeType));
response.keep_alive(request.keep_alive());
response.body() = std::move(generator);
// Set Content-Length and Transfer-Encoding.
// Because ad_utility::httpUtils::httpStreams::streamable_body::size
// is not defined, Content-Length will be cleared and Transfer-Encoding
// will be set to chunked
response.prepare_payload();
return response;
}

/// Create a HttpResponse from a string with status 200 OK and mime type
/// "application/json". Otherwise behaves the same as
/// createHttpResponseFromString.
Expand Down
122 changes: 122 additions & 0 deletions src/util/HttpServer/streamable_body.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2021, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Robin Textor-Falconi (textorr@informatik.uni-freiburg.de)

#pragma once

#include <exception>

#include "../Log.h"
#include "../streamable_generator.h"
#include "./beast.h"

namespace ad_utility::httpUtils::httpStreams {

/**
* A message body represented by a stream_generator. This allows to use a
* generator function to dynamically create a response.
* Example usage:
* http::response<streamable_body> response;
* // generatorFunction returns a ad_utility::stream_generator::stream_generator
* response.body() = generatorFunction();
* response.prepare_payload();
*/
struct streamable_body {
// Algorithm for retrieving buffers when serializing.
class writer;

// The type of the message::body member.
// This determines which type response<streamable_body>::body() returns
using value_type = ad_utility::stream_generator::stream_generator;
};

/**
* Algorithm for retrieving buffers when serializing.
*
* Objects of this type are created during serialization
* to extract the buffers representing the body.
*/
class streamable_body::writer {
value_type& _body;

public:
// The type of buffer sequence returned by `get`.
using const_buffers_type = boost::asio::const_buffer;

/**
* `h` holds the headers of the message we are
* serializing, while `b` holds the body.
*
* The BodyWriter concept allows the writer to choose
* whether to take the message by const reference or
* non-const reference. Depending on the choice, a
* serializer constructed using that body type will
* require the same const or non-const reference to
* construct.
*
* Readers which accept const messages usually allow
* the same body to be serialized by multiple threads
* concurrently, while readers accepting non-const
* messages may only be serialized by one thread at
* a time.
*
* We need the non-const case here, because the one-shot stream_generator
* conceptually can't allow const access.
*/
template <bool isRequest, class Fields>
writer([[maybe_unused]] boost::beast::http::header<isRequest, Fields>& h,
value_type& b)
: _body{b} {}

/**
* This is called before the body is serialized and
* gives the writer a chance to do something that might
* need to return an error code.
*/
void init(boost::system::error_code& ec) noexcept {
// Set the error code to "no error" (default value).
ec = {};
}

/**
* This function is called zero or more times to
* retrieve buffers. A return value of `boost::none`
* means there are no more buffers. Otherwise,
* the contained pair will have the next buffer
* to serialize, and a `bool` indicating whether
* or not there may be additional buffers.
*
* Our strategy is to iterate over the generator to get the data step by step.
*/
boost::optional<std::pair<const_buffers_type, bool>> get(
boost::system::error_code& ec) {
// Return the buffer to the caller.
//
// The second element of the pair indicates whether or
// not there is more data. As long as there is some
// unread bytes, there will be more data. Otherwise,
// we set this bool to `false` so we will not be called
// again.
//
try {
std::string_view view = _body.next();
ec = {};
// we can safely pass away the data() pointer because
// it's just referencing the memory inside the generator's promise
// it won't be modified until the next call to _body.next()
return {{
const_buffers_type{view.data(), view.size()},
_body.hasNext() // `true` if there are more buffers.
}};
} catch (const std::exception& e) {
ec = {EPIPE, boost::system::generic_category()};
LOG(ERROR) << "Failed to generate response:\n" << e.what() << '\n';
return boost::none;
}
}
};

static_assert(boost::beast::http::is_body<streamable_body>::value,
"Body type requirements not met");

} // namespace ad_utility::httpUtils::httpStreams

0 comments on commit ba9ecf0

Please sign in to comment.