Skip to content

Commit

Permalink
send query string to DBServers
Browse files Browse the repository at this point in the history
  • Loading branch information
ObiWahn committed Aug 25, 2017
1 parent 279e9bd commit 667bc35
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions arangod/Aql/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "ExecutionEngine.h"

#include "Aql/AqlQueryResultCache.h"
#include "Aql/BasicBlocks.h"
#include "Aql/CalculationBlock.h"
#include "Aql/ClusterBlocks.h"
Expand Down Expand Up @@ -57,7 +58,7 @@ using namespace arangodb::aql;

// @brief Local struct to create the
// information required to build traverser engines
// on DB servers.
// on DB servers.
struct TraverserEngineShardLists {
explicit TraverserEngineShardLists(size_t length) {
// Make sure they all have a fixed size.
Expand Down Expand Up @@ -257,7 +258,7 @@ struct Instanciator final : public WalkerWorker<ExecutionNode> {
}

engine->addBlock(eb.get());

if (!en->hasParent()) {
// yes. found a new root!
root = eb.get();
Expand Down Expand Up @@ -448,7 +449,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
bool populated;
// in the original plan that needs this engine
};

void includedShards(std::unordered_set<std::string> const& allowed) {
_includedShards = allowed;
}
Expand Down Expand Up @@ -513,7 +514,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
~CoordinatorInstanciator() {}

/// @brief generatePlanForOneShard
void generatePlanForOneShard(VPackBuilder& builder, size_t nr,
void generatePlanForOneShard(VPackBuilder& builder, std::string& fakeQueryString, size_t nr,
EngineInfo* info, QueryId& connectedId,
std::string const& shardId, bool verbose) {
// copy the relevant fragment of the plan for each shard
Expand Down Expand Up @@ -547,21 +548,21 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
plan.root(previous);
plan.setVarUsageComputed();
fakeQueryString = arangodb::aql::cache::fakeQueryString(&plan);
return plan.root()->toVelocyPack(builder, verbose);
}

/// @brief distributePlanToShard, send a single plan to one shard
void distributePlanToShard(arangodb::CoordTransactionID& coordTransactionID,
EngineInfo* info,
QueryId& connectedId, std::string const& shardId,
VPackSlice const& planSlice) {
VPackSlice const& planSlice, std::string const& fakeQueryString) {
Collection* collection = info->getCollection();
// create a JSON representation of the plan
VPackBuilder result;
result.openObject();

result.add("plan", VPackValue(VPackValueType::Object));

VPackBuilder tmp;
query->ast()->variables()->toVelocyPack(tmp);
result.add("initialize", VPackValue(false));
Expand All @@ -576,7 +577,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// mop: this is currently only working for satellites and hardcoded to their structure
for (auto auxiliaryCollection: info->getAuxiliaryCollections()) {
TRI_ASSERT(auxiliaryCollection->isSatellite());

// add the collection
result.openObject();
auto auxiliaryShards = auxiliaryCollection->shardIds();
Expand All @@ -598,7 +599,11 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
result.add(VPackValue("options"));
// the toVelocyPack will open & close the "options" object
query->queryOptions().toVelocyPack(result, true);


if(!fakeQueryString.empty()){
result.add("fakeQueryString", VPackValue(fakeQueryString));
}

result.close();

TRI_ASSERT(result.isClosed());
Expand Down Expand Up @@ -675,8 +680,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
}
}
size_t numShards = shardIds->size();

size_t numShards = shardIds->size();
//LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n";

if (nrok != static_cast<int>(numShards)) {
Expand Down Expand Up @@ -710,11 +715,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// inject the current shard id into the collection
VPackBuilder b;
collection->setCurrentShard(shardId);
generatePlanForOneShard(b, nr++, info, connectedId, shardId, true);
std::string fakeQueryString;
generatePlanForOneShard(b, fakeQueryString, nr++, info, connectedId, shardId, true);

distributePlanToShard(coordTransactionID, info,
connectedId, shardId,
b.slice());
b.slice(), fakeQueryString);
}
collection->resetCurrentShard();
for (auto const& auxiliaryCollection: auxiliaryCollections) {
Expand Down Expand Up @@ -814,17 +820,17 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
"Could not find responsible server for shard " + shardId);
}

// use "server:" instead of "shard:" to send query fragments to
// use "server:" instead of "shard:" to send query fragments to
// the correct servers, even after failover or when a follower drops
// the problem with using the previous shard-based approach was that
// the problem with using the previous shard-based approach was that
// responsibilities for shards may change at runtime.
// however, an AQL query must send all requests for the query to the
// however, an AQL query must send all requests for the query to the
// initially used servers.
// if there is a failover while the query is executing, we must still
// send all following requests to the same servers, and not the newly
// if there is a failover while the query is executing, we must still
// send all following requests to the same servers, and not the newly
// responsible servers.
// otherwise we potentially would try to get data from a query from
// server B while the query was only instanciated on server A.
// otherwise we potentially would try to get data from a query from
// server B while the query was only instantiated on server A.
TRI_ASSERT(!serverList->empty());
auto& leader = (*serverList)[0];
ExecutionBlock* r = new RemoteBlock(engine.get(), remoteNode,
Expand Down Expand Up @@ -1198,8 +1204,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// assign the current node to the current engine
engines[currentEngineId].nodes.emplace_back(en);
}
};
}; // struct CoordinatorInstanciator

/// @brief shutdown, will be called exactly once for the whole query
int ExecutionEngine::shutdown(int errorCode) {
int res = TRI_ERROR_NO_ERROR;
Expand All @@ -1216,7 +1222,7 @@ int ExecutionEngine::shutdown(int errorCode) {
}

res = _root->shutdown(errorCode);

// prevent a duplicate shutdown
_wasShutdown = true;
}
Expand All @@ -1232,7 +1238,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
bool const isCoordinator =
arangodb::ServerState::instance()->isCoordinator(role);
bool const isDBServer = arangodb::ServerState::instance()->isDBServer(role);

TRI_ASSERT(queryRegistry != nullptr);

ExecutionEngine* engine = nullptr;
Expand Down

0 comments on commit 667bc35

Please sign in to comment.