Skip to content

Commit

Permalink
Read from followers2: APM-296 (#16499)
Browse files Browse the repository at this point in the history
* More cases to handle outgoing HTTP header.
* More cases for outgoing dirty reads header.
* Change outgoing HTTP header.
* Apply suggestions from code review

Co-authored-by: Jan <jsteemann@users.noreply.github.com>
  • Loading branch information
neunhoef and jsteemann committed Jul 6, 2022
1 parent 3bdaefe commit 5af87c7
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ devel
not-yet-officially committed data. Use at your own risk if performance
is more important than correctness or if you know that data does not
change.
The responses which can contain dirty reads will have set the HTTP header
"x-arango-potential-dirty-read" set to "true".

* Changed HTTP response code for error number 1521 from 500 to 400.

Expand Down
18 changes: 16 additions & 2 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ Query::Query(QueryId id, std::shared_ptr<transaction::Context> ctx,
transaction::V8Context::isEmbedded()),
_registeredInV8Context(false),
_queryKilled(false),
_queryHashCalculated(false) {
_queryHashCalculated(false),
_allowDirtyReads(false) {
if (!_transactionContext) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL, "failed to create query transaction context");
Expand Down Expand Up @@ -384,6 +385,11 @@ std::unique_ptr<ExecutionPlan> Query::preparePlan() {
_trx->addHint(
transaction::Hints::Hint::FROM_TOPLEVEL_AQL); // only used on toplevel

// We need to preserve the information about dirty reads, since the
// transaction who knows might be gone before we have produced the
// result:
_allowDirtyReads = _trx->state()->options().allowDirtyReads;

// As soon as we start to instantiate the plan we have to clean it
// up before killing the unique_ptr

Expand Down Expand Up @@ -454,6 +460,9 @@ ExecutionState Query::execute(QueryResult& queryResult) {
queryResult.data = cacheEntry->_queryResult;
queryResult.extra = cacheEntry->_stats;
queryResult.cached = true;
// Note: cached queries were never done with dirty reads,
// so we can always hand out the result here without extra
// HTTP header.
return ExecutionState::DONE;
}
// if no permissions, fall through to regular querying
Expand Down Expand Up @@ -545,8 +554,12 @@ ExecutionState Query::execute(QueryResult& queryResult) {
// must close result array here because it must be passed as a closed
// array to the query cache
queryResult.data->close();
queryResult.allowDirtyReads = _allowDirtyReads;

if (useQueryCache && _warnings.empty()) {
if (useQueryCache && !_allowDirtyReads && _warnings.empty()) {
// Cannot cache dirty reads! Yes, the query cache is not used in
// the cluster anyway, but we leave this condition in here for
// a future in which the query cache could be used in the cluster!
std::unordered_map<std::string, std::string> dataSources =
_queryDataSources;

Expand Down Expand Up @@ -788,6 +801,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate) {
queryResult.v8Data = resArray;
queryResult.context = _trx->transactionContext();
queryResult.extra = std::make_shared<VPackBuilder>();
queryResult.allowDirtyReads = _allowDirtyReads;

if (useQueryCache && _warnings.empty()) {
auto dataSources = _queryDataSources;
Expand Down
7 changes: 7 additions & 0 deletions arangod/Aql/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class Query : public QueryContext, public std::enable_shared_from_this<Query> {
// so user actually has a chance to kill it here.
void debugKillQuery() override;

bool allowDirtyReads() const noexcept { return _allowDirtyReads; }

protected:
/// @brief initializes the query
void init(bool createProfile);
Expand Down Expand Up @@ -356,6 +358,11 @@ class Query : public QueryContext, public std::enable_shared_from_this<Query> {
// retrigger a kill.
bool _wasDebugKilled{false};
#endif

bool _allowDirtyReads; // this is set from the information in the
// transaction, it is valid and remains valid
// once `preparePlan` has run and can be queried
// until the query object is gone!
};

} // namespace aql
Expand Down
5 changes: 4 additions & 1 deletion arangod/Aql/QueryCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ QueryStreamCursor::QueryStreamCursor(std::shared_ptr<arangodb::aql::Query> q,
: Cursor(TRI_NewServerSpecificTick(), batchSize, ttl, /*hasCount*/ false),
_query(std::move(q)),
_queryResultPos(0),
_finalization(false) {
_finalization(false),
_allowDirtyReads(false) {
_query->prepareQuery(SerializationFormat::SHADOWROWS);
_allowDirtyReads = _query->allowDirtyReads(); // is set by prepareQuery!
TRI_IF_FAILURE("QueryStreamCursor::directKillAfterPrepare") {
debugKillQuery();
}

// In all the following ASSERTs it is valid (though unlikely) that the query
// is already killed In the cluster this kill operation will trigger cleanup
// side-effects, such as changing the STATE and commiting / aborting the
Expand Down
16 changes: 16 additions & 0 deletions arangod/Aql/QueryCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class QueryResultCursor final : public arangodb::Cursor {
/// If no extras are set this will return a NONE slice.
arangodb::velocypack::Slice extra() const;

/// @brief Remember, if dirty reads were allowed:
bool allowDirtyReads() const override final {
return _result.allowDirtyReads;
}

private:
DatabaseGuard _guard;
aql::QueryResult _result;
Expand Down Expand Up @@ -110,6 +115,14 @@ class QueryStreamCursor final : public arangodb::Cursor {

std::shared_ptr<transaction::Context> context() const override final;

// The following method returns, if the transaction the query is using
// allows dirty reads (reads from followers).
virtual bool allowDirtyReads() const override final {
// We got this information from the query directly in the constructor,
// when `prepareQuery` has been called:
return _allowDirtyReads;
}

private:
// Writes from _queryResults to builder. Removes copied blocks from
// _queryResults and sets _queryResultPos appropriately. Relies on the caller
Expand All @@ -134,6 +147,9 @@ class QueryStreamCursor final : public arangodb::Cursor {
transaction::Methods::StatusChangeCallback _stateChangeCb;

bool _finalization;

bool _allowDirtyReads; // keep this information when the query is already
// gone.
};

} // namespace aql
Expand Down
13 changes: 10 additions & 3 deletions arangod/Aql/QueryResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ struct QueryResult {
QueryResult(QueryResult&& other) = default;
QueryResult& operator=(QueryResult&& other) = default;

QueryResult() : result(), cached(false) {}
QueryResult() : result(), cached(false), allowDirtyReads(false) {}

explicit QueryResult(Result const& res) : result(res), cached(false) {}
explicit QueryResult(Result const& res)
: result(res), cached(false), allowDirtyReads(false) {}

explicit QueryResult(Result&& res) : result(std::move(res)), cached(false) {}
explicit QueryResult(Result&& res)
: result(std::move(res)), cached(false), allowDirtyReads(false) {}

virtual ~QueryResult() = default;

Expand Down Expand Up @@ -85,6 +87,11 @@ struct QueryResult {
public:
Result result;
bool cached;
bool allowDirtyReads; // indicate that query was done with dirty reads,
// we need to preserve this here, since query results
// can live longer than their query objects and the
// transaction therein and we might still need the
// information to produce the outgoing HTTP header!
std::unordered_set<std::string> bindParameters;
std::vector<std::string> collectionNames;
std::shared_ptr<arangodb::velocypack::Builder> data;
Expand Down
5 changes: 4 additions & 1 deletion arangod/RestHandler/RestBaseHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ using namespace arangodb::rest;

RestBaseHandler::RestBaseHandler(ArangodServer& server, GeneralRequest* request,
GeneralResponse* response)
: RestHandler(server, request, response) {}
: RestHandler(server, request, response), _potentialDirtyReads(false) {}

////////////////////////////////////////////////////////////////////////////////
/// @brief parses the body as VelocyPack
Expand Down Expand Up @@ -94,6 +94,9 @@ void RestBaseHandler::generateResult(
rest::ResponseCode code, Payload&& payload,
std::shared_ptr<transaction::Context> context) {
resetResponse(code);
if (_potentialDirtyReads) {
_response->setHeaderNC(StaticStrings::PotentialDirtyRead, "true");
}
writeResult(std::forward<Payload>(payload), *(context->getVPackOptions()));
}

Expand Down
8 changes: 8 additions & 0 deletions arangod/RestHandler/RestBaseHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,13 @@ class RestBaseHandler : public rest::RestHandler {

template<typename Payload>
void writeResult(Payload&&, arangodb::velocypack::Options const& options);

/// @brief configure if outgoing responses will have the potential
/// dirty reads header set:
void setOutgoingDirtyReadsHeader(bool flag) { _potentialDirtyReads = flag; }

/// @brief Flag, if the outgoing response should have an HTTP header
/// indicating potential dirty reads:
bool _potentialDirtyReads;
};
} // namespace arangodb
9 changes: 9 additions & 0 deletions arangod/RestHandler/RestCursorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Context.h"
#include "Utils/Cursor.h"
#include "Utils/CursorRepository.h"
Expand Down Expand Up @@ -314,6 +315,10 @@ RestStatus RestCursorHandler::handleQueryResult() {
// result is smaller than batchSize and will be returned directly. no need
// to create a cursor

if (_queryResult.allowDirtyReads) {
setOutgoingDirtyReadsHeader(true);
}

VPackOptions options = VPackOptions::Defaults;
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
Expand Down Expand Up @@ -598,6 +603,10 @@ RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code) {
return RestStatus::WAITING;
}

if (_cursor->allowDirtyReads()) {
setOutgoingDirtyReadsHeader(true);
}

if (r.ok()) {
builder.add(StaticStrings::Error, VPackValue(false));
builder.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
Expand Down
8 changes: 8 additions & 0 deletions arangod/RestHandler/RestDocumentHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ RestStatus RestDocumentHandler::readSingleDocument(bool generateBody) {
return RestStatus::DONE;
}

if (_activeTrx->state()->options().allowDirtyReads) {
setOutgoingDirtyReadsHeader(true);
}

return waitForFuture(
_activeTrx->documentAsync(collection, search, options)
.thenValue([=, this,
Expand Down Expand Up @@ -854,6 +858,10 @@ RestStatus RestDocumentHandler::readManyDocuments() {
return RestStatus::DONE;
}

if (_activeTrx->state()->options().allowDirtyReads) {
setOutgoingDirtyReadsHeader(true);
}

return waitForFuture(
_activeTrx->documentAsync(cname, search, opOptions)
.thenValue([=, this](OperationResult opRes) {
Expand Down
8 changes: 7 additions & 1 deletion arangod/RestHandler/RestEdgesHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "Aql/QueryString.h"
#include "Aql/Variable.h"
#include "Basics/StringUtils.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Helpers.h"
#include "Transaction/StandaloneContext.h"
#include "Utils/CollectionNameResolver.h"
Expand Down Expand Up @@ -191,7 +192,12 @@ bool RestEdgesHandler::readEdges() {

auto queryResult = ::queryEdges(_vocbase, collectionName, direction,
startVertex, allowDirtyReads);

if (queryResult.allowDirtyReads) {
// Note that this is not necessarily the same as `allowDirtyReads` above!
// This is, because this particular query could be in the context of a
// transcation doing dirty reads!
setOutgoingDirtyReadsHeader(true);
}
if (queryResult.result.fail()) {
if (queryResult.result.is(TRI_ERROR_REQUEST_CANCELED) ||
(queryResult.result.is(TRI_ERROR_QUERY_KILLED))) {
Expand Down
1 change: 1 addition & 0 deletions arangod/RestHandler/RestTransactionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ void RestTransactionHandler::executeBegin() {
// a new transaction. Otherwise, we use the default given by the
// existing transaction.
allowDirtyReads = true;
setOutgoingDirtyReadsHeader(true);
}

// start
Expand Down
3 changes: 3 additions & 0 deletions arangod/RestHandler/RestVocbaseBaseHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ void RestVocbaseBaseHandler::generateDocument(VPackSlice const& input,
if (!rev.empty()) {
_response->setHeaderNC(StaticStrings::Etag, "\"" + rev + "\"");
}
if (_potentialDirtyReads) {
_response->setHeaderNC(StaticStrings::PotentialDirtyRead, "true");
}

try {
_response->setContentType(_request->contentTypeResponse());
Expand Down
2 changes: 2 additions & 0 deletions arangod/Utils/Cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class Cursor {
virtual void setWakeupHandler(std::function<bool()> const& cb) {}
virtual void resetWakeupHandler() {}

virtual bool allowDirtyReads() const { return false; }

protected:
CursorId const _id;
size_t const _batchSize;
Expand Down

0 comments on commit 5af87c7

Please sign in to comment.