Permalink
Browse files

Moved EvictBlocksToDisk from PersistentTable to AntiCacheManager

  • Loading branch information...
Atreyee committed Apr 8, 2014
1 parent ec94004 commit 23025d2183418ec40f9225d39ae31bc49b767ec0
@@ -33,11 +33,13 @@
#include "storage/temptable.h"
#include "storage/tablefactory.h"
#include "anticache/EvictionIterator.h"
#include "boost/timer.hpp"
#include "anticache/EvictedTable.h"
#include <string>
#include <vector>
#include <time.h>
#include <stdlib.h>
#define MAX_EVICTED_TUPLE_SIZE 2500
namespace voltdb
{
@@ -466,7 +468,7 @@ Table* AntiCacheEvictionManager::evictBlock(PersistentTable *table, long blockSi
int32_t lastBlocksEvicted = table->getBlocksEvicted();
int64_t lastBytesEvicted = table->getBytesEvicted();
if (table->evictBlockToDisk(blockSize, numBlocks) == false) {
if (evictBlockToDisk(table, blockSize, numBlocks) == false) {
throwFatalException("Failed to evict tuples from table '%s'", table->name().c_str());
}
@@ -487,12 +489,192 @@ Table* AntiCacheEvictionManager::evictBlock(PersistentTable *table, long blockSi
return (m_evictResultTable);
}
bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const long block_size, int num_blocks) {
voltdb::Table* m_evictedTable = table->getEvictedTable();
int m_tuplesEvicted = table->getTuplesEvicted();
int m_blocksEvicted = table->getBlocksEvicted();
int m_bytesEvicted = table->getBytesEvicted();
int m_tuplesWritten = table->getTuplesWritten();
int m_blocksWritten = table->getBlocksWritten();
int m_bytesWritten = table->getBytesWritten();
if (m_evictedTable == NULL) {
throwFatalException("Trying to evict block from table '%s' before its "\
"EvictedTable has been initialized", table->name().c_str());
}
VOLT_DEBUG("Evicting a block of size %ld bytes from table '%s' with %d tuples",
block_size, table->name().c_str(), (int)table->allocatedTupleCount());
VOLT_DEBUG("%s Table Schema:\n%s",
m_evictedTable->name().c_str(), m_evictedTable->schema()->debug().c_str());
// get the AntiCacheDB instance from the executorContext
AntiCacheDB* antiCacheDB = table->getAntiCacheDB();
int tuple_length = -1;
bool needs_flush = false;
#ifdef VOLT_INFO_ENABLED
int active_tuple_count = (int)table->activeTupleCount();
#endif
for(int i = 0; i < num_blocks; i++)
{
// get a unique block id from the executorContext
int16_t block_id = antiCacheDB->nextBlockId();
// create a new evicted table tuple based on the schema for the source tuple
TableTuple evicted_tuple = m_evictedTable->tempTuple();
VOLT_DEBUG("Setting %s tuple blockId at offset %d", m_evictedTable->name().c_str(), 0);
evicted_tuple.setNValue(0, ValueFactory::getSmallIntValue(block_id)); // Set the ID for this block
evicted_tuple.setNValue(1, ValueFactory::getIntegerValue(0)); // set the tuple offset of this block
// buffer used for serializing a single tuple
DefaultTupleSerializer serializer;
char* serialized_data = new char[block_size];
ReferenceSerializeOutput out(serialized_data, block_size);
// Iterate through the table and pluck out tuples to put in our block
TableTuple tuple(table->m_schema);
EvictionIterator evict_itr(table);
#ifdef VOLT_INFO_ENABLED
boost::timer timer;
// int64_t origEvictedTableSize = m_evictedTable->activeTupleCount();
#endif
//size_t current_tuple_start_position;
int32_t num_tuples_evicted = 0;
out.writeInt(num_tuples_evicted); // reserve first 4 bytes in buffer for number of tuples in block
VOLT_DEBUG("Starting evictable tuple iterator for %s", name().c_str());
while (evict_itr.hasNext() && (out.size() + MAX_EVICTED_TUPLE_SIZE < block_size)) {
if(!evict_itr.next(tuple))
break;
// If this is the first tuple, then we need to allocate all of the memory and
// what not that we're going to need
if (tuple_length == -1) {
tuple_length = tuple.tupleLength();
}
//current_tuple_start_position = out.position();
// remove the tuple from the eviction chain
removeTuple(table, &tuple);
if (tuple.isEvicted())
{
VOLT_INFO("Tuple %d is already evicted. Skipping", table->getTupleID(tuple.address()));
continue;
}
VOLT_DEBUG("Evicting Tuple: %s", tuple.debug(name()).c_str());
tuple.setEvictedTrue();
// Populate the evicted_tuple with the block id and tuple offset
// Make sure this tuple is marked as evicted, so that we know it is an evicted
// tuple as we iterate through the index
evicted_tuple.setNValue(0, ValueFactory::getSmallIntValue(block_id));
evicted_tuple.setNValue(1, ValueFactory::getIntegerValue(num_tuples_evicted));
evicted_tuple.setEvictedTrue();
VOLT_DEBUG("EvictedTuple: %s", evicted_tuple.debug(m_evictedTable->name()).c_str());
// Then add it to this table's EvictedTable
const void* evicted_tuple_address = static_cast<EvictedTable*>(m_evictedTable)->insertEvictedTuple(evicted_tuple);
// Change all of the indexes to point to our new evicted tuple
table->setEntryToNewAddressForAllIndexes(&tuple, evicted_tuple_address);
// Now copy the raw bytes for this tuple into the serialized buffer
tuple.serializeWithHeaderTo(out);
// At this point it's safe for us to delete this mofo
tuple.freeObjectColumns(); // will return memory for uninlined strings to the heap
table->deleteTupleStorage(tuple);
num_tuples_evicted++;
VOLT_DEBUG("Added new evicted %s tuple to block #%d [tuplesEvicted=%d]",
name().c_str(), block_id, num_tuples_evicted);
} // WHILE
VOLT_DEBUG("Finished evictable tuple iterator for %s [tuplesEvicted=%d]",
table->name().c_str(), num_tuples_evicted);
// write out the block header (i.e. number of tuples in block)
out.writeIntAt(0, num_tuples_evicted);
VOLT_INFO("data before serialization is %x", out.data()[0]);
#ifdef VOLT_INFO_ENABLED
VOLT_DEBUG("Evicted %d tuples / %d bytes.", num_tuples_evicted, (int)out.size());
VOLT_DEBUG("Eviction Time: %.2f sec", timer.elapsed());
timer.restart();
#endif
// Only write out a bock if there are tuples in it
if (num_tuples_evicted >= 0) {
antiCacheDB->writeBlock(table->name(),
block_id,
num_tuples_evicted,
out.data(),
out.size());
needs_flush = true;
// Update Stats
m_tuplesEvicted += num_tuples_evicted;
m_blocksEvicted += 1;
m_bytesEvicted += out.size();
m_tuplesWritten += num_tuples_evicted;
m_blocksWritten += 1;
m_bytesWritten += out.size();
table->setTuplesEvicted(m_tuplesEvicted);
table->setBlocksEvicted(m_blocksEvicted);
table->setBytesEvicted(m_bytesEvicted);
table->setTuplesWritten(m_tuplesWritten);
table->setBlocksWritten(m_blocksWritten);
table->setBytesWritten(m_bytesWritten);
#ifdef VOLT_INFO_ENABLED
VOLT_INFO("AntiCacheDB Time: %.2f sec", timer.elapsed());
VOLT_INFO("Evicted Block #%d for %s [tuples=%d / size=%ld / tupleLen=%d]",
block_id, table->name().c_str(),
num_tuples_evicted, m_bytesEvicted, tuple_length);
// VOLT_INFO("%s EvictedTable [origCount:%ld / newCount:%ld]",
// name().c_str(), (long)origEvictedTableSize, (long)m_evictedTable->activeTupleCount());
#endif
} else {
VOLT_WARN("No tuples were evicted from %s", table->name().c_str());
}
delete [] serialized_data;
} // FOR
if (needs_flush) {
#ifdef VOLT_INFO_ENABLED
boost::timer timer;
#endif
// Tell the AntiCacheDB to flush our new blocks out to disk
// This will block until the blocks are safely written
antiCacheDB->flushBlocks();
#ifdef VOLT_INFO_ENABLED
VOLT_INFO("Flush Time: %.2f sec", timer.elapsed());
#endif
}
VOLT_INFO("Evicted block to disk...active tuple count difference: %d", (active_tuple_count - (int)table->activeTupleCount()));
return true;
}
Table* AntiCacheEvictionManager::evictBlockInBatch(PersistentTable *table, PersistentTable *childTable, long blockSize, int numBlocks) {
int32_t lastTuplesEvicted = table->getTuplesEvicted();
int32_t lastBlocksEvicted = table->getBlocksEvicted();
int64_t lastBytesEvicted = table->getBytesEvicted();
if (table->evictBlockToDisk(blockSize, numBlocks) == false) {
if (evictBlockToDisk(table, blockSize, numBlocks) == false) {
throwFatalException("Failed to evict tuples from table '%s'", table->name().c_str());
}
@@ -48,6 +48,7 @@ class AntiCacheEvictionManager {
bool removeTuple(PersistentTable* table, TableTuple* tuple);
Table* evictBlock(PersistentTable *table, long blockSize, int numBlocks);
bool evictBlockToDisk(PersistentTable *table, const long block_size, int num_blocks);
Table* evictBlockInBatch(PersistentTable *table, PersistentTable *childTable, long blockSize, int numBlocks);
Table* readBlocks(PersistentTable *table, int numBlocks, int16_t blockIds[], int32_t tuple_offsets[]);
View
@@ -376,6 +376,11 @@ class TableTuple {
size_t hashCode(size_t seed) const;
size_t hashCode() const;
inline void setEvictedTrue()
{
// treat the first "value" as a boolean flag
*(reinterpret_cast<char*> (m_data)) |= static_cast<char>(EVICTED_MASK);
}
protected:
inline void setDeletedTrue() {
// treat the first "value" as a boolean flag
@@ -395,12 +400,6 @@ class TableTuple {
*(reinterpret_cast<char*> (m_data)) &= static_cast<char>(~DIRTY_MASK);
}
inline void setEvictedTrue()
{
// treat the first "value" as a boolean flag
*(reinterpret_cast<char*> (m_data)) |= static_cast<char>(EVICTED_MASK);
}
inline void setEvictedFalse()
{
// treat the first "value" as a boolean flag
Oops, something went wrong.

0 comments on commit 23025d2

Please sign in to comment.