Permalink
Browse files

Fetching tuples from child table while evicting out to an anticache b…

…lock. First cut. wip
  • Loading branch information...
Atreyee committed Apr 17, 2014
1 parent f03faf6 commit 6e5e32024cf3b52bbf3494b514433fd1d5144b79
Showing with 220 additions and 1 deletion.
  1. +219 −1 src/ee/anticache/AntiCacheEvictionManager.cpp
  2. +1 −0 src/ee/anticache/AntiCacheEvictionManager.h
@@ -27,6 +27,7 @@
#include "common/types.h"
#include "common/FatalException.hpp"
#include "common/ValueFactory.hpp"
#include "common/ValuePeeker.hpp"
#include "common/debuglog.h"
#include "storage/table.h"
#include "storage/persistenttable.h"
@@ -642,12 +643,229 @@ bool AntiCacheEvictionManager::evictBlockToDisk(PersistentTable *table, const lo
return true;
}
bool AntiCacheEvictionManager::evictBlockToDiskInBatch(PersistentTable *table, PersistentTable *childTable, const long block_size, int num_blocks) {
voltdb::Table* m_evictedTable = table->getEvictedTable();
int m_tuplesEvicted = table->getTuplesEvicted();
int m_blocksEvicted = table->getBlocksEvicted();
int64_t m_bytesEvicted = table->getBytesEvicted();
int m_tuplesWritten = table->getTuplesWritten();
int m_blocksWritten = table->getBlocksWritten();
int64_t m_bytesWritten = table->getBytesWritten();
if (m_evictedTable == NULL) {
throwFatalException("Trying to evict block from table '%s' before its "\
"EvictedTable has been initialized", table->name().c_str());
}
VOLT_DEBUG("Evicting a block of size %ld bytes from table '%s' with %d tuples",
block_size, table->name().c_str(), (int)table->allocatedTupleCount());
VOLT_DEBUG("%s Table Schema:\n%s",
m_evictedTable->name().c_str(), m_evictedTable->schema()->debug().c_str());
// get the AntiCacheDB instance from the executorContext
AntiCacheDB* antiCacheDB = table->getAntiCacheDB();
int tuple_length = -1;
bool needs_flush = false;
#ifdef VOLT_INFO_ENABLED
int active_tuple_count = (int)table->activeTupleCount();
#endif
TableIndex * pkeyIndex = table->primaryKeyIndex();
int columnIndex = pkeyIndex->getColumnIndices().front();
TableIndex * foreignKeyIndex = childTable->allIndexes().at(1); // Fix get the foreign key index
int foreignKeyIndexColumn = foreignKeyIndex->getColumnIndices().front();
for(int i = 0; i < num_blocks; i++)
{
// get a unique block id from the executorContext
int16_t block_id = antiCacheDB->nextBlockId();
// create a new evicted table tuple based on the schema for the source tuple
TableTuple evicted_tuple = m_evictedTable->tempTuple();
VOLT_DEBUG("Setting %s tuple blockId at offset %d", m_evictedTable->name().c_str(), 0);
evicted_tuple.setNValue(0, ValueFactory::getSmallIntValue(block_id)); // Set the ID for this block
evicted_tuple.setNValue(1, ValueFactory::getIntegerValue(0)); // set the tuple offset of this block
// Iterate through the table and pluck out tuples to put in our block
TableTuple tuple(table->m_schema);
EvictionIterator evict_itr(table);
#ifdef VOLT_INFO_ENABLED
boost::timer timer;
// int64_t origEvictedTableSize = m_evictedTable->activeTupleCount();
#endif
//size_t current_tuple_start_position;
int32_t num_tuples_evicted = 0;
BerkeleyDBBlock block;
block.initialize(block_size, table->name(),
block_id,
num_tuples_evicted);
VOLT_DEBUG("Starting evictable tuple iterator for %s", name().c_str());
while (evict_itr.hasNext() && (block.getSerializedSize() + MAX_EVICTED_TUPLE_SIZE < block_size)) {
if(!evict_itr.next(tuple))
break;
// If this is the first tuple, then we need to allocate all of the memory and
// what not that we're going to need
if (tuple_length == -1) {
tuple_length = tuple.tupleLength();
}
//current_tuple_start_position = out.position();
// remove the tuple from the eviction chain
removeTuple(table, &tuple);
if (tuple.isEvicted())
{
VOLT_INFO("Tuple %d is already evicted. Skipping", table->getTupleID(tuple.address()));
continue;
}
VOLT_DEBUG("Evicting Tuple: %s", tuple.debug(name()).c_str());
tuple.setEvictedTrue();
// Populate the evicted_tuple with the block id and tuple offset
// Make sure this tuple is marked as evicted, so that we know it is an evicted
// tuple as we iterate through the index
evicted_tuple.setNValue(0, ValueFactory::getSmallIntValue(block_id));
evicted_tuple.setNValue(1, ValueFactory::getIntegerValue(num_tuples_evicted));
evicted_tuple.setEvictedTrue();
VOLT_DEBUG("EvictedTuple: %s", evicted_tuple.debug(m_evictedTable->name()).c_str());
// Then add it to this table's EvictedTable
const void* evicted_tuple_address = static_cast<EvictedTable*>(m_evictedTable)->insertEvictedTuple(evicted_tuple);
// Change all of the indexes to point to our new evicted tuple
table->setEntryToNewAddressForAllIndexes(&tuple, evicted_tuple_address);
block.addTuple(tuple);
// value of the foreign key column
int32_t pkeyValue = ValuePeeker::peekAsInteger(tuple.getNValue(columnIndex));
VOLT_INFO("after peek !!!! %d", pkeyValue);
vector<ValueType> keyColumnTypes(1, VALUE_TYPE_BIGINT);
vector<int32_t>
keyColumnLengths(1, NValue::getTupleStorageSize(VALUE_TYPE_BIGINT));
vector<bool> keyColumnAllowNull(1, true);
TupleSchema* keySchema =
TupleSchema::createTupleSchema(keyColumnTypes,
keyColumnLengths,
keyColumnAllowNull,
true);
TableTuple searchkey(keySchema);
VOLT_INFO("after search key init !!!!");
searchkey.move(new char[searchkey.tupleLength()]);
VOLT_INFO("after search key memory init !!!!");
searchkey.
setNValue(0, ValueFactory::getBigIntValue(static_cast<int64_t>(pkeyValue)));
VOLT_INFO("after search key set value !!!! %d",ValuePeeker::peekAsInteger(ValueFactory::getIntegerValue(pkeyValue)));
bool found = foreignKeyIndex->moveToKey(&searchkey);
int count = 0;
VOLT_INFO("found child tuples!!!! %d", found);
TableTuple childTuple(childTable->m_schema);
if(found){
while (!(childTuple = foreignKeyIndex->nextValueAtKey()).isNullTuple())
{
++count;
VOLT_INFO("tuple foreign key id %d", ValuePeeker::peekAsInteger(childTuple.getNValue(foreignKeyIndexColumn)));
cout << "tuple foreign key id" << ValuePeeker::peekAsInteger(childTuple.getNValue(foreignKeyIndexColumn)) << endl;
//write out to block
}
}
// At this point it's safe for us to delete this mofo
tuple.freeObjectColumns(); // will return memory for uninlined strings to the heap
table->deleteTupleStorage(tuple);
num_tuples_evicted++;
VOLT_DEBUG("Added new evicted %s tuple to block #%d [tuplesEvicted=%d]",
name().c_str(), block_id, num_tuples_evicted);
} // WHILE
VOLT_DEBUG("Finished evictable tuple iterator for %s [tuplesEvicted=%d]",
table->name().c_str(), num_tuples_evicted);
block.writeHeader(num_tuples_evicted);
#ifdef VOLT_INFO_ENABLED
VOLT_DEBUG("Evicted %d tuples / %d bytes.", num_tuples_evicted, block.getSerializedSize());
VOLT_DEBUG("Eviction Time: %.2f sec", timer.elapsed());
timer.restart();
#endif
// Only write out a bock if there are tuples in it
if (num_tuples_evicted >= 0) {
// TODO: make this look like
// block.flush();
// antiCacheDB->writeBlock(block);
antiCacheDB->writeBlock(table->name(),
block_id,
num_tuples_evicted,
block.getSerializedData(),
block.getSerializedSize());
needs_flush = true;
// Update Stats
m_tuplesEvicted += num_tuples_evicted;
m_blocksEvicted += 1;
m_bytesEvicted += block.getSerializedSize();
m_tuplesWritten += num_tuples_evicted;
m_blocksWritten += 1;
m_bytesWritten += block.getSerializedSize();
table->setTuplesEvicted(m_tuplesEvicted);
table->setBlocksEvicted(m_blocksEvicted);
table->setBytesEvicted(m_bytesEvicted);
table->setTuplesWritten(m_tuplesWritten);
table->setBlocksWritten(m_blocksWritten);
table->setBytesWritten(m_bytesWritten);
#ifdef VOLT_INFO_ENABLED
VOLT_INFO("AntiCacheDB Time: %.2f sec", timer.elapsed());
VOLT_INFO("Evicted Block #%d for %s [tuples=%d / size=%ld / tupleLen=%d]",
block_id, table->name().c_str(),
num_tuples_evicted, m_bytesEvicted, tuple_length);
// VOLT_INFO("%s EvictedTable [origCount:%ld / newCount:%ld]",
// name().c_str(), (long)origEvictedTableSize, (long)m_evictedTable->activeTupleCount());
#endif
} else {
VOLT_WARN("No tuples were evicted from %s", table->name().c_str());
}
} // FOR
if (needs_flush) {
#ifdef VOLT_INFO_ENABLED
boost::timer timer;
#endif
// Tell the AntiCacheDB to flush our new blocks out to disk
// This will block until the blocks are safely written
antiCacheDB->flushBlocks();
#ifdef VOLT_INFO_ENABLED
VOLT_INFO("Flush Time: %.2f sec", timer.elapsed());
#endif
}
VOLT_INFO("Evicted block to disk...active tuple count difference: %d", (active_tuple_count - (int)table->activeTupleCount()));
return true;
}
Table* AntiCacheEvictionManager::evictBlockInBatch(PersistentTable *table, PersistentTable *childTable, long blockSize, int numBlocks) {
int32_t lastTuplesEvicted = table->getTuplesEvicted();
int32_t lastBlocksEvicted = table->getBlocksEvicted();
int64_t lastBytesEvicted = table->getBytesEvicted();
if (evictBlockToDisk(table, blockSize, numBlocks) == false) {
if (evictBlockToDiskInBatch(table, childTable, blockSize, numBlocks) == false) {
throwFatalException("Failed to evict tuples from table '%s'", table->name().c_str());
}
@@ -49,6 +49,7 @@ class AntiCacheEvictionManager {
Table* evictBlock(PersistentTable *table, long blockSize, int numBlocks);
bool evictBlockToDisk(PersistentTable *table, const long block_size, int num_blocks);
bool evictBlockToDiskInBatch(PersistentTable *table, PersistentTable *childTable, const long block_size, int num_blocks);
Table* evictBlockInBatch(PersistentTable *table, PersistentTable *childTable, long blockSize, int numBlocks);
Table* readBlocks(PersistentTable *table, int numBlocks, int16_t blockIds[], int32_t tuple_offsets[]);
bool mergeUnevictedTuples(PersistentTable *table);

0 comments on commit 6e5e320

Please sign in to comment.