Skip to content

Commit

Permalink
Revert "Actually use curl for aql."
Browse files Browse the repository at this point in the history
This reverts commit 1b94d76.
  • Loading branch information
maierlars committed Apr 26, 2024
1 parent 2acf1fc commit 7fd90b7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 38 deletions.
74 changes: 39 additions & 35 deletions arangod/Aql/Executor/RemoteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "Cluster/ServerState.h"
#include "Logger/LogMacros.h"
#include "Network/ConnectionPool.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Rest/CommonDefines.h"
Expand Down Expand Up @@ -359,62 +358,67 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}
network::RequestOptions options;
options.database = _query.vocbase().name();
options.timeout = kDefaultTimeOutSecs;

TRI_IF_FAILURE("RemoteExecutor::impatienceTimeout") {
// Vastly lower the request timeout. This should guarantee
// a network timeout triggered and not continue with the query.
options.timeout = std::chrono::seconds(2);
arangodb::network::EndpointSpec spec;
auto res = network::resolveDestination(nf, _server, spec).get();
if (res != TRI_ERROR_NO_ERROR) {
return Result(res);
}
TRI_ASSERT(!spec.endpoint.empty());

network::Headers header;
auto req = fuerte::createRequest(type, fuerte::ContentType::VPack);
req->header.database = _query.vocbase().name();
req->header.path = absl::StrCat(urlPart, "/", _queryId);
req->addVPack(std::move(body));

// Later, we probably want to set these sensibly:
req->timeout(kDefaultTimeOutSecs);
if (!_distributeId.empty()) {
header.emplace(StaticStrings::AqlShardIdHeader, _distributeId);
req->header.addMeta(StaticStrings::AqlShardIdHeader, _distributeId);
}

network::addSourceHeader(nullptr, *req);

LOG_TOPIC("2713c", DEBUG, Logger::COMMUNICATION)
<< "request to '" << _server << "' '" << fuerte::to_string(type) << " "
<< req->header.path << "'";

bool isFromPool;
network::ConnectionPtr conn =
pool->leaseConnection(spec.endpoint, isFromPool);

_requestInFlight = true;
auto ticket = generateRequestTicket();
TRI_IF_FAILURE("RemoteExecutor::impatienceTimeout") {
// Vastly lower the request timeout. This should guarantee
// a network timeout triggered and not continue with the query.
req->timeout(std::chrono::seconds(2));
}

_engine->getQuery().incHttpRequests(unsigned(1));

network::sendRequest(pool, _server, type,
absl::StrCat(urlPart, "/", _queryId), std::move(body),
options, header)
.thenFinal([this, ticket, sqs = _engine->sharedState()](
futures::Try<network::Response> resp) {
conn->sendRequest(
std::move(req),
[this, ticket, spec, sqs = _engine->sharedState()](
fuerte::Error err, std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
// `this` is only valid as long as sharedState is valid.
// So we must execute this under sharedState's mutex.
sqs->executeAndWakeup([&] {
auto result = basics::catchToResultT([&]() { return &resp.get(); });

std::lock_guard<std::mutex> guard(_communicationMutex);

if (_lastTicket == ticket) {
ScopeGuard inFlightGuard(
[&]() noexcept { _requestInFlight = false; });

if (result.fail()) {
_lastError = result.result();
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
auto err = result.get()->error;
auto res = result.get()->stealResponse();
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
network::EndpointSpec spec;
spec.serverId = _server;
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
_lastResponse = std::move(res);
}

_requestInFlight = false;
return true;
}
return false;
});
});

_engine->getQuery().incHttpRequests(unsigned(1));

return {};
}

Expand Down
3 changes: 0 additions & 3 deletions arangod/Network/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,6 @@ ErrorCode fuerteToArangoErrorCode(fuerte::Error err) {
LOG_TOPIC_IF("abcdf", ERR, Logger::COMMUNICATION,
err != fuerte::Error::NoError)
<< "communication error: '" << fuerte::to_string(err) << "'";
if (err == fuerte::Error::ConnectionClosed) {
CrashHandler::logBacktrace();
}
return toArangoErrorCodeInternal(err);
}

Expand Down

0 comments on commit 7fd90b7

Please sign in to comment.