Skip to content

Commit

Permalink
query result cache: recreate AqlItemBlocks from cache before sending
Browse files Browse the repository at this point in the history
  • Loading branch information
ObiWahn committed Sep 8, 2017
1 parent 8299618 commit c9d9c0e
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 106 deletions.
9 changes: 4 additions & 5 deletions arangod/Aql/AqlItemBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,10 @@ AqlItemBlock::AqlItemBlock(ResourceMonitor* resourceMonitor, VPackSlice const sl
dataIterator.next();
VPackSlice highBound = dataIterator.value();
dataIterator.next();

int64_t low =
VelocyPackHelper::getNumericValue<int64_t>(lowBound, 0);
int64_t high =
VelocyPackHelper::getNumericValue<int64_t>(highBound, 0);

int64_t low = VelocyPackHelper::getNumericValue<int64_t>(lowBound, 0);
int64_t high = VelocyPackHelper::getNumericValue<int64_t>(highBound, 0);

AqlValue a(low, high);
try {
setValue(i, column, a);
Expand Down
102 changes: 100 additions & 2 deletions arangod/Aql/AqlQueryResultCache.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

#include "Aql/AqlQueryResultCache.h"
#include "Aql/ExecutionPlan.h"
#include "Aql/QueryCache.h"
#include "Aql/AqlItemBlock.h"

#include "Basics/Result.h"
#include "Cluster/ClusterInfo.h"
Expand Down Expand Up @@ -30,9 +30,107 @@ std::string fakeQueryString(ExecutionPlan const* subPlan){
return result;
}

Result properties(VPackBuilder& result) {
auto vPackToBlock(VPackArrayIterator& iter
,std::unique_ptr<ResourceMonitor>& monitor
,std::size_t atLeast, std::size_t atMost
)
-> std::tuple<Result, std::unique_ptr<AqlItemBlock>, std::size_t>
{
//tries to read `items` items from `iter` into a default inititalized AqlItemBlock `block`
std::tuple<Result,std::unique_ptr<AqlItemBlock>, std::size_t> result{TRI_ERROR_NO_ERROR,nullptr,0};

Result& rv = std::get<0>(result);
std::unique_ptr<AqlItemBlock>& block = std::get<1>(result);
std::size_t& items = std::get<2>(result);

if(atLeast && !iter.valid()){
rv.reset(TRI_ERROR_INTERNAL,"try to get items from invalid iterator");
}

bool skip = (monitor == nullptr);

TRI_ASSERT(iter.value().isArray());
auto max = std::min(atMost,iter.size()-iter.index());

if(!skip){
block.reset(new AqlItemBlock(monitor.get()
,max /*nrItems*/
,iter.value().length() /*nrRegs*/
));
}

std::size_t items_received = 0;
try {
while(items_received < max && iter.valid()) {
if(!skip){
std::size_t reg=0;
for(auto const& value : VPackArrayIterator(iter.value())){
if(value != VPackSlice::nullSlice()){
AqlValue aql(value);
block->setValue(items_received, reg, aql);
}
++reg;
}
}
++items_received;
iter.next();
}
} CATCH_TO_RESULT(rv)
items=items_received;
return result;
}

Result blockToVPack(AqlItemBlock const& block, VPackBuilder& builder, std::size_t regs /*0 if unknown*/){
// this functions writes rows (results) of an AqlItemBlock as Arrays to an
// already open VPackBuilder.
//
// the following translation is applied:
// empty -> nullSlice
// range -> "peter"
// slice (untranslated)


// due to lack of discussion a pragmatic implementation will be used
// this will be slow because the data in aql blocks is stored per column
// and we need to access rows. This is a hard requirement because we need
// to append data. While the itemblock has chosen the columnise layout
// to store the data more effective. This is expected to be slow. Especially
// if we need to create later the "raw" VelocyPack that is used in the remote
// (coordinator) to create a new itemblock.
Result rv;
{
std::size_t n = block.getNrRegs();
if(regs){
if (regs != n ){
TRI_ASSERT(false);
return rv.reset(TRI_ERROR_INTERNAL, "number of registers in AqlItemBlock does not match expected value");
}
} else {
regs = n;
}
}
LOG_DEVEL << "adding AqlItemBlock with regs: " << regs << "and items: " << block.size();
try {
for(std::size_t i = 0; i < block.size() /*number of items in block*/ ; i++){
builder.openArray();
for(std::size_t r=0 ; r < regs /*number of vars in block*/; r++){
AqlValue const& val = block.getValueReference(i,r);
if(val.isRange()){
builder.add(VPackValuePair("peter",5));
} else if(val.isEmpty()) {
builder.add(VPackSlice::nullSlice());
} else {
builder.add(val.slice());
}
}
builder.close();
}
} CATCH_TO_RESULT(rv)
return rv;
}

Result properties(VPackBuilder& result) {
Result rv;
if(ServerState::instance()->isCoordinator()) {
ClusterInfo* ci = ClusterInfo::instance();
if (!ci){
Expand Down
12 changes: 11 additions & 1 deletion arangod/Aql/AqlQueryResultCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

#include <velocypack/Slice.h>
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

namespace arangodb {
namespace aql {

class AqlItemBlock;
class ExecutionPlan;
class ResourceMonitor;

namespace cache {

Expand All @@ -30,9 +33,16 @@ Result clear();
// the empty string signals that the query is not cacheable
std::string fakeQueryString(ExecutionPlan const*);

// convert AqlItemBlock to and VPackSlice contained in passed builder that contains an array arrays of values
Result blockToVPack(AqlItemBlock const& block, VPackBuilder& builder, std::size_t regs = 0);

auto vPackToBlock(VPackArrayIterator&
,std::unique_ptr<ResourceMonitor>&
,std::size_t atLeast, std::size_t atMost)
-> std::tuple<Result,std::unique_ptr<AqlItemBlock>, std::size_t>;

}
}
}

#endif

2 changes: 1 addition & 1 deletion arangod/Aql/AqlValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ struct AqlValue final {
// note: this is the default constructor and should be as cheap as possible
AqlValue() noexcept {
// construct a slice of type None
_data.internal[0] = '\x00';
_data.internal[0] = '\x00'; //none
setType(AqlValueType::VPACK_INLINE);
}

Expand Down
1 change: 0 additions & 1 deletion arangod/Aql/ExecutionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,6 @@ bool CalculationNode::fakeQueryStringThisNode(std::string& outString) const {
outString.append(_outVariable->name);
arangodb::basics::StringBuffer buff{TRI_UNKNOWN_MEM_ZONE};
_expression->stringify(&buff,true /*quote strings*/);
LOG_DEVEL << "############CN EXPRESSION################ " << std::string(buff.c_str(), buff.length());
outString.append(buff.c_str(),buff.length());
return true;
}
Expand Down
102 changes: 38 additions & 64 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "Aql/AqlItemBlock.h"
#include "Aql/AqlTransaction.h"
#include "Aql/AqlQueryResultCache.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionPlan.h"
Expand Down Expand Up @@ -227,7 +228,6 @@ Query::~Query() {
}

Result Query::resultStart(){
//LOG_DEVEL << "startResult";
Result rv;
if(! _resultBuilder){
VPackOptions options = VPackOptions::Defaults;
Expand All @@ -242,8 +242,8 @@ Result Query::resultStart(){
return rv;
}

Result Query::resultAdd(AqlItemBlock const& value){
LOG_DEVEL << "resultAdd";
Result Query::resultAddComplete(AqlItemBlock const& value){
LOG_DEVEL_IF(_queryCacheId) << _queryCacheId << " query - resultAdd";
Result rv;
auto const resultRegister = _engine->resultRegister();
size_t const n = value.size();
Expand All @@ -258,7 +258,11 @@ Result Query::resultAdd(AqlItemBlock const& value){
return rv;
}

Result Query::resultAdd(AqlItemBlock const& value
Result Query::resultAddPart(AqlItemBlock const& value, std::size_t regs){
return arangodb::aql::cache::blockToVPack(value, *_resultBuilder, regs);
}

Result Query::resultAddComplete(AqlItemBlock const& value
,v8::Isolate* isolate
,QueryResultV8& result
,uint32_t& j //position
Expand Down Expand Up @@ -289,10 +293,8 @@ Result Query::cacheStore(uint64_t queryHash){
TRI_ASSERT(_resultBuilder->isOpenArray());
_resultBuilder->close();
if (_warnings.empty()) {
LOG_DEVEL_IF(_queryCacheId && !_queryString.empty()) << _queryString;
// finally store the generated result in the query cache
LOG_DEVEL << "store query: " << queryHash << " " << _queryString;
LOG_DEVEL << "store query: " << _resultBuilder->slice().toJson();
LOG_DEVEL << queryHash << " query - store: '" << _queryString; // << "' contents: @@@" << _resultBuilder->slice().toJson() << "@@@";
auto result = QueryCache::instance()->store(
_vocbase, queryHash, _queryString,
_resultBuilder, _trx->state()->collectionNames());
Expand All @@ -306,12 +308,13 @@ Result Query::cacheStore(uint64_t queryHash){

Result Query::cacheUse(uint64_t queryHash){
Result rv;
LOG_DEVEL_IF(_queryCacheId) << "try to use query: " << _queryCacheId << _queryString;
LOG_DEVEL_IF(_queryCacheId) << queryHash << " query - cacheUse - try to find query: " << _queryString;
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup( _vocbase, queryHash, _queryString);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);

if (cacheEntry != nullptr) {
LOG_DEVEL_IF(_queryCacheId) << "found query: " << _queryCacheId << _queryString;
LOG_DEVEL_IF(_queryCacheId) << queryHash << " found query - string: " << _queryString;
TRI_ASSERT(queryHash == _queryCacheId || ! _queryCacheId);
// got a result from the query cache
if(ExecContext::CURRENT != nullptr) {
AuthInfo* info = AuthenticationFeature::INSTANCE->authInfo();
Expand All @@ -326,17 +329,19 @@ Result Query::cacheUse(uint64_t queryHash){
_cachedResultBuilder = cacheEntry->_queryResult;
_cachedResultIterator = std::unique_ptr<VPackArrayIterator>( new VPackArrayIterator(_cachedResultBuilder->slice()));
} else {
LOG_DEVEL_IF(_queryCacheId) << "no query found in cache: " << _queryCacheId << _queryString;
LOG_DEVEL_IF(_queryCacheId) << _queryCacheId << " query - useCache - no query found in cache: " << _queryString;
}
return rv;
}

Result Query::cacheGetSome(std::size_t atLeast, std::size_t atMost, VPackBuilder& builder, std::size_t& count){
Result Query::cacheGetOrSkipSomePart(VPackBuilder& builder, bool skip, std::size_t atLeast, std::size_t atMost ){
Result rv;
TRI_ASSERT(builder.isOpenObject());

if(! _cachedResultBuilder || ! _cachedResultIterator){
return rv.reset(TRI_ERROR_INTERNAL, "cached result not available");
}

bool exhausted = cacheExhausted();

if (exhausted) {
Expand All @@ -345,64 +350,27 @@ Result Query::cacheGetSome(std::size_t atLeast, std::size_t atMost, VPackBuilder
return rv;
}

count = 0;

// raw and data
// no ranges expected for data

LOG_DEVEL << "cached Result: " << _cachedResultBuilder->slice().toJson();
{
VPackBuilder dataBuilder;
dataBuilder.openArray();
builder.add("raw", VPackValue(VPackValueType::Array));
builder.add(VPackSlice::nullSlice());
builder.add(VPackSlice::nullSlice());
while(_cachedResultIterator->valid() && count < atMost){
if(_cachedResultIterator->value().isNull()){
dataBuilder.add(VPackValue(0));
} else {
builder.add(_cachedResultIterator->value());
dataBuilder.add(VPackValue(1));
}
count++;
exhausted = false;
_cachedResultIterator->next();
}
builder.close(); // raw arrar
dataBuilder.close(); // data array
builder.add("data", dataBuilder.slice());
std::unique_ptr<ResourceMonitor> monitor(nullptr);
if(!skip){
monitor.reset(new ResourceMonitor{});
}
//LOG_DEVEL_IF(_queryCacheId) << _queryCacheId << " query - cached Result: " << _cachedResultBuilder->slice().toJson();
auto result = arangodb::aql::cache::vPackToBlock(*_cachedResultIterator, monitor, atLeast, atMost);
THROW_ARANGO_EXCEPTION_IF_FAIL(std::get<0>(result));
auto& block = std::get<1>(result);

builder.add("nrItems",VPackValue(count));
builder.add("nrRegs",VPackValue(1)); //always one register
builder.add("error", VPackValue(false));
builder.add("exhausted", VPackValue(exhausted));
builder.add("cached", VPackValue(true));

//if (count < atLeast) {
// LOG_DEVEL << "count is too low " << count;
// rv.reset(TRI_ERROR_BAD_PARAMETER, "you tried to get more items than there are in the iterator");
//}

return rv;
}

Result Query::cacheSkipSome(std::size_t atLeast, std::size_t atMost, std::size_t& count, bool& exhausted){
Result rv;
exhausted = false;
if(! _cachedResultBuilder || ! _cachedResultIterator){
return rv.reset(TRI_ERROR_INTERNAL, "cached result not available");
}

count = 0;
while(_cachedResultIterator->valid() && count <= atMost){
_cachedResultIterator->next();
count++;
if(block){
block->toVelocyPack(nullptr /*trx*/, builder);
} else {
builder.add("error", VPackValue(false));
builder.add("exhausted", VPackValue(exhausted));
}

if (count < atLeast) {
exhausted = true;
}
builder.add("cached", VPackValue(true));

return rv;
}
Expand Down Expand Up @@ -608,6 +576,12 @@ void Query::prepare(QueryRegistry* registry, uint64_t queryHash) {
engine.release();
}

// try {
// auto fake = arangodb::aql::cache::fakeQueryString(plan.get());
// LOG_DEVEL_IF(!_queryString.empty() && !fake.empty()) << "AQL PREPARE: " << _queryString << " " << fake;
// } catch (std::exception const& e) {
// LOG_DEVEL << e.what();
// } catch (...) {}
_plan = std::move(plan);
}

Expand Down Expand Up @@ -751,7 +725,7 @@ QueryResult Query::execute(QueryRegistry* registry) {
res.context = std::make_shared<transaction::StandaloneContext>(_vocbase);

res.warnings = warningsToVelocyPack();
res.result = std::move(_cachedResultBuilder);
res.result = _cachedResultBuilder;
res.cached = true;
return res;
}
Expand Down Expand Up @@ -783,7 +757,7 @@ QueryResult Query::execute(QueryRegistry* registry) {
// iterate over result, return it and store it in query cache
while (nullptr != (value = _engine->getSome(
1, ExecutionBlock::DefaultBatchSize()))) {
rv = resultAdd(*value);
rv = resultAddComplete(*value);
delete value;
value = nullptr;
THROW_ARANGO_EXCEPTION_IF_FAIL(rv);
Expand Down Expand Up @@ -921,7 +895,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
uint32_t j = 0;
while (nullptr != (value = _engine->getSome(
1, ExecutionBlock::DefaultBatchSize()))) {
rv = resultAdd(*value,isolate, result, j, canCache);
rv = resultAddComplete(*value,isolate, result, j, canCache);
delete value;
value = nullptr;
THROW_ARANGO_EXCEPTION_IF_FAIL(rv);
Expand Down
Loading

0 comments on commit c9d9c0e

Please sign in to comment.