Permalink
Browse files

Extracting out the logic for serialization of tuples while evicting d…

…ata. The functionality now resides in a separate class instead of the AntiCacheEvictionManager. First cut
  • Loading branch information...
Atreyee committed Apr 14, 2014
1 parent 725e10b commit f03faf69a0fc09726a1dfb22ef59b73a0e4d1f21
Showing with 61 additions and 19 deletions.
  1. +4 −0 src/ee/anticache/AntiCacheDB.cpp
  2. +39 −0 src/ee/anticache/AntiCacheDB.h
  3. +18 −19 src/ee/anticache/AntiCacheEvictionManager.cpp
@@ -70,6 +70,10 @@ AntiCacheBlock::~AntiCacheBlock() {
delete m_buf;
}
}
BerkeleyDBBlock::~BerkeleyDBBlock() {
delete [] serialized_data;
}
AntiCacheDB::AntiCacheDB(ExecutorContext *ctx, std::string db_dir, long blockSize) :
m_executorContext(ctx),
@@ -28,6 +28,7 @@
#include <db_cxx.h>
#include "common/debuglog.h"
#include "common/DefaultTupleSerializer.h"
#define ANTICACHE_DB_NAME "anticache.db"
@@ -62,6 +63,7 @@ class AntiCacheBlock {
inline char* getData() const {
return m_block;
}
struct payload{
int16_t blockId;
std::string tableName;
@@ -79,6 +81,43 @@ class AntiCacheBlock {
}; // CLASS
// Encapsulates a block that is flushed out to BerkeleyDB
// TODO: merge it with AntiCacheBlock
class BerkeleyDBBlock{
public:
~BerkeleyDBBlock();
inline void initialize(long blockSize, std::string tableName, int16_t blockId, int numTuplesEvicted){
DefaultTupleSerializer serializer;
// buffer used for serializing a single tuple
serialized_data = new char[blockSize];
out.initializeWithPosition(serialized_data, blockSize, 0);
out.writeInt(numTuplesEvicted);// reserve first 4 bytes in buffer for number of tuples in block
}
inline void addTuple(TableTuple tuple){
// Now copy the raw bytes for this tuple into the serialized buffer
tuple.serializeWithHeaderTo(out);
}
inline void writeHeader(int num_tuples_evicted){
// write out the block header (i.e. number of tuples in block)
out.writeIntAt(0, num_tuples_evicted);
}
inline int getSerializedSize(){
return (int)out.size();
}
inline const char* getSerializedData(){
return out.data();
}
private:
ReferenceSerializeOutput out;
char * serialized_data;
};
/**
*
*/
@@ -36,6 +36,7 @@
#include "boost/timer.hpp"
#include "anticache/EvictedTable.h"
#include "anticache/UnknownBlockAccessException.h"
#include "anticache/AntiCacheDB.h"
#include <string>
#include <vector>
#include <time.h>
@@ -504,11 +505,6 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
evicted_tuple.setNValue(0, ValueFactory::getSmallIntValue(block_id)); // Set the ID for this block
evicted_tuple.setNValue(1, ValueFactory::getIntegerValue(0)); // set the tuple offset of this block
// buffer used for serializing a single tuple
DefaultTupleSerializer serializer;
char* serialized_data = new char[block_size];
ReferenceSerializeOutput out(serialized_data, block_size);
// Iterate through the table and pluck out tuples to put in our block
TableTuple tuple(table->m_schema);
EvictionIterator evict_itr(table);
@@ -521,10 +517,13 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
//size_t current_tuple_start_position;
int32_t num_tuples_evicted = 0;
out.writeInt(num_tuples_evicted); // reserve first 4 bytes in buffer for number of tuples in block
BerkeleyDBBlock block;
block.initialize(block_size, table->name(),
block_id,
num_tuples_evicted);
VOLT_DEBUG("Starting evictable tuple iterator for %s", name().c_str());
while (evict_itr.hasNext() && (out.size() + MAX_EVICTED_TUPLE_SIZE < block_size)) {
while (evict_itr.hasNext() && (block.getSerializedSize() + MAX_EVICTED_TUPLE_SIZE < block_size)) {
if(!evict_itr.next(tuple))
break;
@@ -560,8 +559,7 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
// Change all of the indexes to point to our new evicted tuple
table->setEntryToNewAddressForAllIndexes(&tuple, evicted_tuple_address);
// Now copy the raw bytes for this tuple into the serialized buffer
tuple.serializeWithHeaderTo(out);
block.addTuple(tuple);
// At this point it's safe for us to delete this mofo
tuple.freeObjectColumns(); // will return memory for uninlined strings to the heap
@@ -575,34 +573,35 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
VOLT_DEBUG("Finished evictable tuple iterator for %s [tuplesEvicted=%d]",
table->name().c_str(), num_tuples_evicted);
// write out the block header (i.e. number of tuples in block)
out.writeIntAt(0, num_tuples_evicted);
//VOLT_INFO("data before serialization is %x", out.data()[0]);
block.writeHeader(num_tuples_evicted);
#ifdef VOLT_INFO_ENABLED
VOLT_DEBUG("Evicted %d tuples / %d bytes.", num_tuples_evicted, (int)out.size());
VOLT_DEBUG("Evicted %d tuples / %d bytes.", num_tuples_evicted, block.getSerializedSize());
VOLT_DEBUG("Eviction Time: %.2f sec", timer.elapsed());
timer.restart();
#endif
// Only write out a bock if there are tuples in it
if (num_tuples_evicted >= 0) {
// TODO: make this look like
// block.flush();
// antiCacheDB->writeBlock(block);
antiCacheDB->writeBlock(table->name(),
block_id,
num_tuples_evicted,
out.data(),
out.size());
block.getSerializedData(),
block.getSerializedSize());
needs_flush = true;
// Update Stats
m_tuplesEvicted += num_tuples_evicted;
m_blocksEvicted += 1;
m_bytesEvicted += out.size();
m_bytesEvicted += block.getSerializedSize();
m_tuplesWritten += num_tuples_evicted;
m_blocksWritten += 1;
m_bytesWritten += out.size();
m_bytesWritten += block.getSerializedSize();
table->setTuplesEvicted(m_tuplesEvicted);
table->setBlocksEvicted(m_blocksEvicted);
@@ -622,7 +621,7 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
} else {
VOLT_WARN("No tuples were evicted from %s", table->name().c_str());
}
delete [] serialized_data;
} // FOR
if (needs_flush) {

0 comments on commit f03faf6

Please sign in to comment.