Skip to content

Commit

Permalink
Various improvements to the Server. (#529)
Browse files Browse the repository at this point in the history
* Limit the number of simulataneous query processings.
  Previously, the number of server connections was limited, which often lead to unnecessary blocking.
* First open the socket, then initialize the server (fail fast in case the port is already in use).
* Compute results only once, even if they don't fit in the cache.
* Fixed a dangling reference in the TSV/CSV export.
  • Loading branch information
joka921 committed Jan 3, 2022
1 parent 4c16ab5 commit 49dabf9
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 174 deletions.
3 changes: 1 addition & 2 deletions src/ServerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ int main(int argc, char** argv) {

try {
Server server(port, numThreads, memLimit);
server.initialize(index, text, usePatterns, enablePatternTrick);
server.run();
server.run(index, text, usePatterns, enablePatternTrick);
} catch (const std::exception& e) {
// This code should never be reached as all exceptions should be handled
// within server.run()
Expand Down
75 changes: 46 additions & 29 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ QueryExecutionTree::generateResults(const vector<string>& selectVars,
size_t limit, size_t offset,
char sep) const {
// They may trigger computation (but does not have to).
shared_ptr<const ResultTable> res = getResult();
shared_ptr<const ResultTable> resultTable = getResult();
LOG(DEBUG) << "Resolving strings for finished binary result...\n";
vector<std::optional<pair<size_t, ResultTable::ResultType>>> validIndices;
for (auto var : selectVars) {
Expand All @@ -102,18 +102,19 @@ QueryExecutionTree::generateResults(const vector<string>& selectVars,
auto it = getVariableColumns().find(var);
if (it != getVariableColumns().end()) {
validIndices.push_back(pair<size_t, ResultTable::ResultType>(
it->second, res->getResultType(it->second)));
it->second, resultTable->getResultType(it->second)));
} else {
validIndices.push_back(std::nullopt);
}
}
if (validIndices.size() == 0) {
if (validIndices.empty()) {
return {};
}

const IdTable& data = res->_idTable;
const IdTable& data = resultTable->_idTable;
size_t upperBound = std::min<size_t>(offset + limit, data.size());
return writeTable(data, sep, offset, upperBound, std::move(validIndices));
return writeTable(sep, offset, upperBound, std::move(validIndices),
std::move(resultTable));
}

// ___________________________________________________________________________
Expand All @@ -139,38 +140,45 @@ QueryExecutionTree::selectedVariablesToColumnIndices(

// _____________________________________________________________________________
nlohmann::json QueryExecutionTree::writeResultAsQLeverJson(
const vector<string>& selectVars, size_t limit, size_t offset) const {
const vector<string>& selectVars, size_t limit, size_t offset,
shared_ptr<const ResultTable> resultTable) const {
// They may trigger computation (but does not have to).
shared_ptr<const ResultTable> res = getResult();
if (!resultTable) {
resultTable = getResult();
}
LOG(DEBUG) << "Resolving strings for finished binary result...\n";
ColumnIndicesAndTypes validIndices =
selectedVariablesToColumnIndices(selectVars, *res);
if (validIndices.size() == 0) {
return nlohmann::json(std::vector<std::string>());
selectedVariablesToColumnIndices(selectVars, *resultTable);
if (validIndices.empty()) {
return {std::vector<std::string>()};
}

return writeQLeverJsonTable(res->_idTable, offset, limit, validIndices);
return writeQLeverJsonTable(offset, limit, validIndices,
std::move(resultTable));
}

// _____________________________________________________________________________
nlohmann::json QueryExecutionTree::writeResultAsSparqlJson(
const vector<string>& selectVars, size_t limit, size_t offset) const {
const vector<string>& selectVars, size_t limit, size_t offset,
shared_ptr<const ResultTable> resultTable) const {
using nlohmann::json;

// This might trigger the actual query processing.
shared_ptr<const ResultTable> queryResult = getResult();
if (!resultTable) {
resultTable = getResult();
}
LOG(DEBUG) << "Finished computing the query result in the ID space. "
"Resolving strings in result...\n";
ColumnIndicesAndTypes columns =
selectedVariablesToColumnIndices(selectVars, *queryResult);
selectedVariablesToColumnIndices(selectVars, *resultTable);

std::erase(columns, std::nullopt);

if (columns.empty()) {
return {std::vector<std::string>()};
}

const IdTable& idTable = queryResult->_idTable;
const IdTable& idTable = resultTable->_idTable;

json result;
result["head"]["vars"] = selectVars;
Expand Down Expand Up @@ -222,7 +230,7 @@ nlohmann::json QueryExecutionTree::writeResultAsSparqlJson(
for (const auto& column : columns) {
const auto& currentId = idTable(rowIndex, column->_columnIndex);
const auto& optionalValue =
toStringAndXsdType(currentId, column->_resultType, *queryResult);
toStringAndXsdType(currentId, column->_resultType, *resultTable);
if (!optionalValue.has_value()) {
continue;
}
Expand Down Expand Up @@ -339,9 +347,12 @@ QueryExecutionTree::toStringAndXsdType(Id id, ResultTable::ResultType type,

// __________________________________________________________________________________________________________
nlohmann::json QueryExecutionTree::writeQLeverJsonTable(
const IdTable& data, size_t from, size_t limit,
const ColumnIndicesAndTypes& columns) const {
shared_ptr<const ResultTable> res = getResult();
size_t from, size_t limit, const ColumnIndicesAndTypes& columns,
shared_ptr<const ResultTable> resultTable) const {
if (!resultTable) {
resultTable = getResult();
}
const IdTable& data = resultTable->_idTable;
nlohmann::json json = nlohmann::json::parse("[]");

const auto upperBound = std::min(data.size(), limit + from);
Expand All @@ -356,7 +367,7 @@ nlohmann::json QueryExecutionTree::writeQLeverJsonTable(
}
const auto& currentId = data(rowIndex, opt->_columnIndex);
const auto& optionalStringAndXsdType =
toStringAndXsdType(currentId, opt->_resultType, *res);
toStringAndXsdType(currentId, opt->_resultType, *resultTable);
if (!optionalStringAndXsdType.has_value()) {
row.emplace_back(nullptr);
continue;
Expand All @@ -374,18 +385,23 @@ nlohmann::json QueryExecutionTree::writeQLeverJsonTable(

// _________________________________________________________________________________________________________
ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
const IdTable& data, char sep, size_t from, size_t upperBound,
char sep, size_t from, size_t upperBound,
const vector<std::optional<pair<size_t, ResultTable::ResultType>>>
validIndices) const {
shared_ptr<const ResultTable> res = getResult();
validIndices,
std::shared_ptr<const ResultTable> resultTable) const {
if (!resultTable) {
resultTable = getResult();
}
const auto& idTable = resultTable->_idTable;
// special case : binary export of IdTable
if (sep == 'b') {
for (size_t i = from; i < upperBound; ++i) {
for (size_t j = 0; j < validIndices.size(); ++j) {
if (validIndices[j]) {
const auto& val = *validIndices[j];
co_yield std::string_view{
reinterpret_cast<const char*>(&data(i, val.first)), sizeof(Id)};
reinterpret_cast<const char*>(&idTable(i, val.first)),
sizeof(Id)};
}
}
}
Expand All @@ -399,7 +415,7 @@ ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
switch (val.second) {
case ResultTable::ResultType::KB: {
string entity = _qec->getIndex()
.idToOptionalString(data(i, val.first))
.idToOptionalString(idTable(i, val.first))
.value_or("");
if (ad_utility::startsWith(entity, VALUE_PREFIX)) {
co_yield ad_utility::convertIndexWordToValueLiteral(entity);
Expand All @@ -409,19 +425,20 @@ ad_utility::stream_generator::stream_generator QueryExecutionTree::writeTable(
break;
}
case ResultTable::ResultType::VERBATIM:
co_yield data(i, val.first);
co_yield idTable(i, val.first);
break;
case ResultTable::ResultType::TEXT:
co_yield _qec->getIndex().getTextExcerpt(data(i, val.first));
co_yield _qec->getIndex().getTextExcerpt(idTable(i, val.first));
break;
case ResultTable::ResultType::FLOAT: {
float f;
std::memcpy(&f, &data(i, val.first), sizeof(float));
std::memcpy(&f, &idTable(i, val.first), sizeof(float));
co_yield f;
break;
}
case ResultTable::ResultType::LOCAL_VOCAB: {
co_yield res->idToOptionalString(data(i, val.first)).value_or("");
co_yield resultTable->idToOptionalString(idTable(i, val.first))
.value_or("");
break;
}
default:
Expand Down
20 changes: 11 additions & 9 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,13 @@ class QueryExecutionTree {
const vector<string>& selectVars, size_t limit = MAX_NOF_ROWS_IN_RESULT,
size_t offset = 0, char sep = '\t') const;

nlohmann::json writeResultAsQLeverJson(const vector<string>& selectVars,
size_t limit, size_t offset) const;
nlohmann::json writeResultAsQLeverJson(
const vector<string>& selectVars, size_t limit, size_t offset,
shared_ptr<const ResultTable> resultTable = nullptr) const;

nlohmann::json writeResultAsSparqlJson(const vector<string>& selectVars,
size_t limit, size_t offset) const;
nlohmann::json writeResultAsSparqlJson(
const vector<string>& selectVars, size_t limit, size_t offset,
shared_ptr<const ResultTable> preComputedResult = nullptr) const;

const std::vector<size_t>& resultSortedOn() const {
return _rootOperation->getResultSortedOn();
Expand Down Expand Up @@ -223,15 +225,15 @@ class QueryExecutionTree {
* @return a 2D-Json array corresponding to the IdTable given the arguments
*/
nlohmann::json writeQLeverJsonTable(
const IdTable& data, size_t from, size_t limit,
const ColumnIndicesAndTypes& columns) const;
size_t from, size_t limit, const ColumnIndicesAndTypes& columns,
std::shared_ptr<const ResultTable> resultTable = nullptr) const;

[[nodiscard]] std::optional<std::pair<std::string, const char*>>
toStringAndXsdType(Id id, ResultTable::ResultType type,
const ResultTable& resultTable) const;

ad_utility::stream_generator::stream_generator writeTable(
const IdTable& data, char sep, size_t from, size_t upperBound,
vector<std::optional<pair<size_t, ResultTable::ResultType>>> validIndices)
const;
char sep, size_t from, size_t upperBound,
vector<std::optional<pair<size_t, ResultTable::ResultType>>> validIndices,
shared_ptr<const ResultTable> resultTable = nullptr) const;
};

0 comments on commit 49dabf9

Please sign in to comment.