Skip to content

Commit

Permalink
ENG-12945: AdHocLarge system procedure (for internal use) (#4889)
Browse files Browse the repository at this point in the history
* Create new sysproc @AdHocLarge that sets the large bit in JSON plan (planner changes only)
* EE support for creating large temp tables when this bit is set
* Assorted tests
  • Loading branch information
Christopher M. Wolff committed Oct 5, 2017
1 parent 871e5f4 commit 4ea9b74
Show file tree
Hide file tree
Showing 111 changed files with 1,683 additions and 523 deletions.
1 change: 1 addition & 0 deletions build.py
Expand Up @@ -497,6 +497,7 @@
CTX.TESTS['execution'] = """
add_drop_table
engine_test
ExecutorVectorTest
FragmentManagerTest
"""
if whichtests in ("${eetestsuite}", "executors"):
Expand Down
90 changes: 71 additions & 19 deletions src/ee/common/LargeTempTableBlockCache.cpp
Expand Up @@ -15,6 +15,8 @@
* along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
*/

#include <sstream>

#include "LargeTempTableBlockCache.h"

#include "common/Topend.h"
Expand All @@ -24,14 +26,19 @@

namespace voltdb {

/**
 * Construct a cache that holds no large temp table blocks.
 *
 * @param maxCacheSizeInBytes  upper bound for the bytes tracked in
 *        m_totalAllocatedBytes; increaseAllocatedMemory() stores
 *        blocks while the running total is above this value.
 */
LargeTempTableBlockCache::LargeTempTableBlockCache(int64_t maxCacheSizeInBytes)
    : m_maxCacheSizeInBytes(maxCacheSizeInBytes)
    , m_blockList()
    , m_idToBlockMap()
    , m_nextId(0)
    , m_totalAllocatedBytes(0)
{
}

// The block list must already be empty when the cache is destroyed;
// the assert below checks exactly that.
LargeTempTableBlockCache::~LargeTempTableBlockCache() {
    assert(m_blockList.empty());
}

std::pair<int64_t, LargeTempTableBlock*> LargeTempTableBlockCache::getEmptyBlock(LargeTempTable* ltt) {
int64_t id = getNextId();

Expand Down Expand Up @@ -80,6 +87,15 @@ void LargeTempTableBlockCache::unpinBlock(int64_t blockId) {
(*(mapIt->second))->unpin();
}

/**
 * Report whether the block with the given id is currently pinned.
 * Raises a dynamic SQL exception when the id is not known to the cache.
 */
bool LargeTempTableBlockCache::blockIsPinned(int64_t blockId) const {
    const auto entry = m_idToBlockMap.find(blockId);
    if (m_idToBlockMap.end() == entry) {
        throwDynamicSQLException("Request for unknown block ID in LargeTempTableBlockCache");
    }

    // The map value is a position in the block list; dereferencing it
    // yields the owning pointer for the block.
    const auto& listPosition = entry->second;
    return (*listPosition)->isPinned();
}

void LargeTempTableBlockCache::releaseBlock(int64_t blockId) {
auto mapIt = m_idToBlockMap.find(blockId);
if (mapIt == m_idToBlockMap.end()) {
Expand All @@ -95,48 +111,84 @@ void LargeTempTableBlockCache::releaseBlock(int64_t blockId) {
}

m_idToBlockMap.erase(blockId);
// Block list contains unique_ptrs so erasing will invoke
// destructors and free resources.
m_blockList.erase(it);
}

bool LargeTempTableBlockCache::storeABlock() {
/**
 * Release every block held by this cache, both resident blocks and
 * blocks that are no longer resident.
 *
 * Pinned blocks are unpinned first.  For each non-resident block the
 * topend is asked to release its copy.  Finally the block list is
 * cleared, which destroys the blocks themselves.
 *
 * Does nothing (and does not touch the executor context) when the
 * cache holds no blocks.
 */
void LargeTempTableBlockCache::releaseAllBlocks() {
    if (m_blockList.empty()) {
        return;
    }

    m_idToBlockMap.clear();

    Topend* topend = ExecutorContext::getExecutorContext()->getTopend();
    for (auto& block : m_blockList) {
        if (block->isPinned()) {
            block->unpin();
        }

        if (! block->isResident()) {
            bool rc = topend->releaseLargeTempTableBlock(block->id());
            assert(rc);
            // rc is only read by the assert; keep release (NDEBUG)
            // builds free of an unused-variable warning.
            (void)rc;
        }
    }
    m_blockList.clear();
}

/**
 * Make room in the cache by handing one block to the topend for storage.
 *
 * Walks the block list from its tail toward its head and stores the
 * first block that is both unpinned and resident.
 *
 * Raises a dynamic SQL exception if the topend reports that the store
 * failed, or if no unpinned, resident block exists (which includes the
 * case of an empty block list when asserts are compiled out).
 */
void LargeTempTableBlockCache::storeABlock() {
    assert(! m_blockList.empty());

    // Reverse iteration visits the same blocks in the same order as
    // stepping backwards from end(), and is also well defined for an
    // empty list.
    for (auto it = m_blockList.rbegin(); it != m_blockList.rend(); ++it) {
        LargeTempTableBlock *block = it->get();
        if (!block->isPinned() && block->isResident()) {
            Topend* topend = ExecutorContext::getExecutorContext()->getTopend();
            bool success = topend->storeLargeTempTableBlock(block->id(), block);
            if (! success) {
                throwDynamicSQLException("Topend failed to store LTT block");
            }
            return;
        }
    }

    throwDynamicSQLException("Failed to find unpinned LTT block to make space");
}

/**
 * Add numBytes to the running total of allocated bytes and, while the
 * total is above maxCacheSizeInBytes(), store blocks to bring it back
 * down.
 *
 * storeABlock() raises an exception when it cannot store a block, so
 * this loop either makes progress or propagates that exception.
 */
void LargeTempTableBlockCache::increaseAllocatedMemory(int64_t numBytes) {
    m_totalAllocatedBytes += numBytes;

    // If we've increased the memory footprint over the size of the
    // cache, clear out some space.
    while (m_totalAllocatedBytes > maxCacheSizeInBytes()) {
        int64_t bytesBefore = m_totalAllocatedBytes;
        storeABlock();
        // Each successful store is expected to shrink the total
        // (presumably via decreaseAllocatedMemory() -- the call path
        // is not visible here).
        assert(bytesBefore > m_totalAllocatedBytes);
        // bytesBefore is only read by the assert; avoid an
        // unused-variable warning when NDEBUG is defined.
        (void)bytesBefore;
    }
}

/**
 * Subtract numBytes from the running total of allocated bytes.
 * The amount must not exceed the current total (checked by assert).
 */
void LargeTempTableBlockCache::decreaseAllocatedMemory(int64_t numBytes) {
    assert(m_totalAllocatedBytes >= numBytes);
    m_totalAllocatedBytes = m_totalAllocatedBytes - numBytes;
}

/**
 * Build a human-readable report of the cache: one entry per block
 * (id, pinned/resident state, tuple count and memory figures) followed
 * by the total number of bytes in use.
 */
std::string LargeTempTableBlockCache::debug() const {
    std::ostringstream report;
    report << "LargeTempTableBlockCache:\n";

    for (const auto& block : m_blockList) {
        const bool resident = block->isResident();
        const char* pinPrefix = block->isPinned() ? "" : "un";
        const char* residentPrefix = resident ? "" : "not ";

        report << " Block id " << block->id() << ": "
               << pinPrefix << "pinned, "
               << residentPrefix << "resident\n";
        report << " Tuple count: " << block->activeTupleCount() << "\n";
        report << " Using " << block->getAllocatedMemory() << " bytes \n";
        report << " " << block->getAllocatedTupleMemory() << " bytes for tuple storage\n";
        report << " " << block->getAllocatedPoolMemory() << " bytes for pool storage\n";
    }

    report << "Total bytes used: " << allocatedMemory() << "\n";

    return report.str();
}

} // end namespace voltdb
43 changes: 35 additions & 8 deletions src/ee/common/LargeTempTableBlockCache.h
Expand Up @@ -54,7 +54,12 @@ class LargeTempTableBlockCache {
* Construct an instance of a cache containing zero large temp
* table blocks.
*/
LargeTempTableBlockCache();
LargeTempTableBlockCache(int64_t maxCacheSizeInBytes);

/**
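* Destructor. Frees nothing itself, but asserts that the cache no
* longer holds any blocks (they must have been released beforehand).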
*/
~LargeTempTableBlockCache();

/** Get a new empty block for the supplied table. Returns the id
of the new block and the new block. */
Expand All @@ -64,6 +69,9 @@ class LargeTempTableBlockCache {
store to disk when the cache becomes full. */
void unpinBlock(int64_t blockId);

/** Returns true if the block is pinned. */
bool blockIsPinned(int64_t blockId) const;

/** Fetch (and pin) the specified block, loading it from disk if
necessary. */
LargeTempTableBlock* fetchBlock(int64_t blockId);
Expand All @@ -81,6 +89,8 @@ class LargeTempTableBlockCache {
/** Called from LargeTempTableBlock destructor. */
void decreaseAllocatedMemory(int64_t numBytes);

/** The number of pinned (blocks currently being inserted into or
scanned) entries in the cache */
size_t numPinnedEntries() const {
size_t cnt = 0;
BOOST_FOREACH(auto &block, m_blockList) {
Expand All @@ -92,6 +102,8 @@ class LargeTempTableBlockCache {
return cnt;
}

/** The number of blocks that are cached in memory (as opposed to
stored on disk) */
size_t residentBlockCount() const {
size_t count = 0;
BOOST_FOREACH(auto &block, m_blockList) {
Expand All @@ -103,22 +115,35 @@ class LargeTempTableBlockCache {
return count;
}

/** The total number of large temp table blocks tracked by this
cache (the size of the block list), counting both blocks cached
in memory and blocks stored on disk. */
size_t totalBlockCount() const {
return m_blockList.size();
}

/** The number of bytes (in large temp table tuple blocks and
associated string pool data) in blocks that are cached in
memory. This is the running total that
increaseAllocatedMemory() adds to and
decreaseAllocatedMemory() subtracts from. */
int64_t allocatedMemory() const {
return m_totalAllocatedBytes;
}

private:

// Set to be modifiable here for testing purposes
static int64_t& CACHE_SIZE_IN_BYTES() {
static int64_t cacheSizeInBytes = 50 * 1024 * 1024; // 50 MB
return cacheSizeInBytes;
/** The max size, in bytes, that the cache can grow to. The value
is fixed when the cache is constructed. If we insert a tuple
or allocate a new block and exceed this amount, we need to
store an unpinned block to disk. */
int64_t maxCacheSizeInBytes() const {
return m_maxCacheSizeInBytes;
}

/** Release all large temp table blocks (both resident and stored
on disk) */
void releaseAllBlocks();

/** Return a string containing useful debug information */
std::string debug() const;

private:

// This at some point may need to be unique across the entire process
int64_t getNextId() {
int64_t nextId = m_nextId;
Expand All @@ -127,7 +152,9 @@ class LargeTempTableBlockCache {
}

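// Stores the least recently used block that is both unpinned and
// resident to disk. Throws if no such block exists or if the
// topend fails to store it.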
bool storeABlock();
void storeABlock();

const int64_t m_maxCacheSizeInBytes;

typedef std::list<std::unique_ptr<LargeTempTableBlock>> BlockList;

Expand Down
7 changes: 4 additions & 3 deletions src/ee/common/executorcontext.cpp
Expand Up @@ -99,7 +99,7 @@ ExecutorContext::ExecutorContext(int64_t siteId,
m_uniqueId(0),
m_currentTxnTimestamp(0),
m_currentDRTimestamp(0),
m_lttBlockCache(),
m_lttBlockCache(engine ? engine->tempTableMemoryLimit() : 50*1024*1024), // engine may be null in unit tests
m_traceOn(false),
m_lastCommittedSpHandle(0),
m_siteId(siteId),
Expand All @@ -114,9 +114,10 @@ ExecutorContext::ExecutorContext(int64_t siteId,
}

ExecutorContext::~ExecutorContext() {
m_lttBlockCache.releaseAllBlocks();

// currently does not own any of its pointers

// ... or none, now that the one is going away.
VOLT_DEBUG("De-installing EC(%ld)", (long)this);

pthread_setspecific(static_key, NULL);
Expand Down Expand Up @@ -214,7 +215,7 @@ UniqueTempTableResult ExecutorContext::executeExecutors(const std::vector<Abstra
executorList[i]->cleanupTempOutputTable();
}

TempTable *result = executorList[ttl-1]->getPlanNode()->getTempOutputTable();
AbstractTempTable *result = executorList[ttl-1]->getPlanNode()->getTempOutputTable();
return UniqueTempTableResult(result);
}

Expand Down
2 changes: 1 addition & 1 deletion src/ee/execution/ExecutorVector.cpp
Expand Up @@ -169,7 +169,7 @@ void ExecutorVector::initPlanNode(VoltDBEngine* engine, AbstractPlanNode* node)
}

// Now use the plannode to initialize the executor for execution later on
if (executor->init(engine, &m_limits)) {
if (executor->init(engine, *this)) {
return;
}

Expand Down
14 changes: 10 additions & 4 deletions src/ee/execution/ExecutorVector.h
Expand Up @@ -79,13 +79,16 @@ class ExecutorVector {
static boost::shared_ptr<ExecutorVector> fromCatalogStatement(VoltDBEngine* engine,
catalog::Statement *stmt);

/** Build the list of executors from its plan node fragment */
void init(VoltDBEngine* engine);

/** Accessor function to satisfy boost::multi_index::const_mem_fun template. */
int64_t getFragId() const { return m_fragId; }

const TempTableLimits& limits() const { return m_limits; }
/** Returns a mutable pointer to this executor vector's temp table limits.
 *  NOTE(review): the const qualifier of this accessor is cast away, so
 *  callers can modify m_limits through a const ExecutorVector -- confirm
 *  that no ExecutorVector is ever created as a truly const object. */
TempTableLimits* limits() const {
    const TempTableLimits* limitsView = &m_limits;
    return const_cast<TempTableLimits*>(limitsView);
}

/** Returns whatever the underlying plan fragment reports for isLargeQuery(). */
bool isLargeQuery() const {
    const bool fragmentIsLarge = m_fragment->isLargeQuery();
    return fragmentIsLarge;
}

/** Return a std::string with helpful info about this object. */
std::string debug() const;
Expand Down Expand Up @@ -127,6 +130,9 @@ class ExecutorVector {
, m_fragment(fragment)
{ }

/** Build the list of executors from its plan node fragment */
void init(VoltDBEngine* engine);

void initPlanNode(VoltDBEngine* engine, AbstractPlanNode* node);

const int64_t m_fragId;
Expand Down
2 changes: 0 additions & 2 deletions src/ee/execution/JNITopend.h
Expand Up @@ -67,7 +67,6 @@ class JNITopend : public Topend {
std::string decodeBase64AndDecompress(const std::string& buffer);

// Storing large temp table blocks is not implemented for this topend.
// As written, the call reports a fatal "unimplemented method" error
// that names this function via __FUNCTION__.
// NOTE(review): the trailing "return false" presumably only satisfies
// the bool return type -- confirm that throwFatalException never returns.
bool storeLargeTempTableBlock(int64_t blockId, LargeTempTableBlock* block) {
throwFatalException("unimplemented method \"%s\" called!", __FUNCTION__);
return false;
}

Expand All @@ -77,7 +76,6 @@ class JNITopend : public Topend {
}

// Releasing large temp table blocks is not implemented for this topend.
// As written, the call reports a fatal "unimplemented method" error
// that names this function via __FUNCTION__.
// NOTE(review): the trailing "return false" presumably only satisfies
// the bool return type -- confirm that throwFatalException never returns.
bool releaseLargeTempTableBlock(int64_t blockId) {
throwFatalException("unimplemented method \"%s\" called!", __FUNCTION__);
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ee/execution/ProgressMonitorProxy.h
Expand Up @@ -31,7 +31,7 @@ class ProgressMonitorProxy {
executorContext->pullTuplesRemainingUntilProgressReport(exec->getPlanNode()->getPlanNodeType()))
, m_countDown(m_tuplesRemainingUntilReport)
{
const TempTable *tt = exec->getTempOutputTable();
const AbstractTempTable *tt = exec->getTempOutputTable();
if (tt != NULL) {
m_limits = tt->getTempTableLimits();
}
Expand Down
3 changes: 2 additions & 1 deletion src/ee/execution/VoltDBEngine.cpp
Expand Up @@ -92,6 +92,7 @@
#include "storage/streamedtable.h"
#include "storage/TableCatalogDelegate.hpp"
#include "storage/tablefactory.h"
#include "storage/temptable.h"

#include "org_voltdb_jni_ExecutionEngine.h" // to use static values

Expand Down Expand Up @@ -2245,7 +2246,7 @@ void VoltDBEngine::addToTuplesModified(int64_t amount) {
m_executorContext->addToTuplesModified(amount);
}

void TempTableTupleDeleter::operator()(TempTable* tbl) const {
// Deleter hook: empties the given temp table of its temp tuples.
// The table object itself is not deleted.  A null pointer is ignored.
void TempTableTupleDeleter::operator()(AbstractTempTable* tbl) const {
    if (tbl == NULL) {
        return;
    }

    tbl->deleteAllTempTuples();
}
5 changes: 3 additions & 2 deletions src/ee/execution/VoltDBEngine.h
Expand Up @@ -89,6 +89,7 @@ namespace voltdb {
class AbstractDRTupleStream;
class AbstractExecutor;
class AbstractPlanNode;
class AbstractTempTable;
class EnginePlanSet; // Locally defined in VoltDBEngine.cpp
class ExecutorContext;
class ExecutorVector;
Expand All @@ -103,7 +104,7 @@ class TheHashinator;

// Deleter functor for smart pointers that wrap a temp table.
// It is invoked with the wrapped table pointer when the smart pointer
// goes out of scope.  The AbstractTempTable overload (defined in
// VoltDBEngine.cpp) deletes the table's temp tuples and leaves the
// table object itself alive.
class TempTableTupleDeleter {
public:
void operator()(TempTable* tbl) const;
void operator()(AbstractTempTable* tbl) const;
};

struct UserDefinedFunctionInfo {
Expand All @@ -114,7 +115,7 @@ struct UserDefinedFunctionInfo {
// UniqueTempTableResult is a smart pointer wrapper around a temp
// table. It doesn't delete the temp table, but it will delete the
// contents of the table when it goes out of scope.
typedef std::unique_ptr<TempTable, TempTableTupleDeleter> UniqueTempTableResult;
typedef std::unique_ptr<AbstractTempTable, TempTableTupleDeleter> UniqueTempTableResult;

const int64_t DEFAULT_TEMP_TABLE_MEMORY = 1024 * 1024 * 100;

Expand Down

0 comments on commit 4ea9b74

Please sign in to comment.