Two bug fixes for anticache:
1. Blocks that had already been unevicted could be unevicted again when another table triggered an uneviction of the same block. Now we check whether the evicted block ID is still valid before reading it.
2. The original EvictedAccessException could not be used with the tuple-merge strategy; this is worked around with a hack, but there is still a problem with the stats.
linmagit committed Sep 14, 2015
1 parent 87465c6 commit d87cefe
Showing 11 changed files with 99 additions and 25 deletions.
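
A minimal sketch of fix 1, with assumed names (BlockStore, readEvictedBlockOnce) rather than the actual H-Store classes. It models the contract of the new AntiCacheDB::validateBlock() hook: a block ID is treated as valid only while the block is still present in the backing store, so an uneviction triggered by another table cannot read the same block a second time.

// Hypothetical sketch only; names below are not from the repository.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct BlockStore {
    std::map<uint16_t, std::string> blocks;   // blockId -> serialized payload

    // Mirrors the new validateBlock() contract: valid while still stored.
    bool validateBlock(uint16_t blockId) const {
        return blocks.find(blockId) != blocks.end();
    }
};

bool readEvictedBlockOnce(BlockStore &store, uint16_t blockId) {
    if (!store.validateBlock(blockId)) {
        std::cout << "block " << blockId << " already read from another table" << std::endl;
        return true;   // nothing to do; the block was already unevicted
    }
    std::cout << "reading block " << blockId << std::endl;
    store.blocks.erase(blockId);   // the block is consumed by this uneviction
    return true;
}

int main() {
    BlockStore store;
    store.blocks[42] = "payload";
    readEvictedBlockOnce(store, 42);   // reads and consumes block 42
    readEvictedBlockOnce(store, 42);   // second request is safely skipped
    return 0;
}
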
2 changes: 2 additions & 0 deletions src/ee/anticache/AntiCacheDB.h
@@ -117,6 +117,8 @@ class AntiCacheDB {
*/
virtual AntiCacheBlock* readBlock(uint16_t blockId, bool isMigrate) = 0;

virtual bool validateBlock(uint16_t blockId) = 0;


/**
* Flush the buffered blocks to disk.
48 changes: 38 additions & 10 deletions src/ee/anticache/AntiCacheEvictionManager.cpp
@@ -1175,7 +1175,7 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t

int already_unevicted = table->isAlreadyUnEvicted(block_id);
if (already_unevicted && table->mergeStrategy()) { // this block has already been read
VOLT_WARN("Block 0x%x has already been read.", block_id);
VOLT_WARN("Block %d has already been read.", block_id);
return true;
}

@@ -1189,14 +1189,26 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t

AntiCacheDB* antiCacheDB = m_db_lookup[ACID];

if (!antiCacheDB->validateBlock(_block_id)) {
VOLT_WARN("Block %d has already been read from another table.", block_id);
return true;
}

if (already_unevicted) { // this block has already been read, but it is tuple-merge strategy
/* This is a HACK!! CHANGE LATER!!
for (int k = 0; k < (int)table->unevictedBlocksSize(); ++k) {
if (table->getMergeTupleOffset(k) == tuple_offset) {
return true;
}
}*/

table->insertUnevictedBlock(table->getUnevictedBlocks(already_unevicted - 1));
table->insertTupleOffset(tuple_offset);

antiCacheDB->removeSingleTupleStats(_block_id);

VOLT_DEBUG("BLOCK %u TUPLE %d - unevicted blocks size is %d",
block_id, tuple_offset, already_unevicted);
block_id, tuple_offset, (int)table->unevictedBlocksSize());

return true;
}
@@ -1221,6 +1233,8 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t
//AntiCacheDB* antiCacheDB = table->getAntiCacheDB();

try {
VOLT_DEBUG("BLOCK %u %d - unevicted blocks size is %d - alreadyUevicted %d",
_block_id, block_id, static_cast<int>(table->unevictedBlocksSize()), already_unevicted);
AntiCacheBlock* value = antiCacheDB->readBlock(_block_id, 0);

// allocate the memory for this block
@@ -1250,12 +1264,11 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t
}

table->insertUnevictedBlock(unevicted_tuples);
VOLT_DEBUG("BLOCK %u - unevicted blocks size is %d",
_block_id, static_cast<int>(table->unevictedBlocksSize()));
table->insertTupleOffset(tuple_offset);


table->insertUnevictedBlockID(std::pair<int32_t,int32_t>(block_id, table->unevictedBlocksSize()));
VOLT_DEBUG("after insert: alreadyUnevicted %d - IDs size %ld", table->isAlreadyUnEvicted(block_id), table->getUnevictedBlockIDs().size());

VOLT_DEBUG("BLOCK %u TUPLE %d - unevicted blocks size is %d",
block_id, tuple_offset, static_cast<int>(table->unevictedBlocksSize()));
@@ -1575,23 +1588,25 @@ bool AntiCacheEvictionManager::mergeUnevictedTuples(PersistentTable *table) {
#ifdef VOLT_INFO_ENABLED
VOLT_INFO("Merging %d blocks for table %s.", num_blocks, table->name().c_str());
#endif
VOLT_INFO("Merging %d blocks for table %s.", num_blocks, table->name().c_str());

for (int i = 0; i < num_blocks; i++) {
// XXX: have to put block size, which we don't know, so just put something large, like 10MB
ReferenceSerializeInput in(table->getUnevictedBlocks(i), 10485760);

merge_tuple_offset = table->getMergeTupleOffset(i); // what to do about this?
VOLT_DEBUG("Merge Tuple offset is %d", merge_tuple_offset);

// Read in all the meta-data
int num_tables = in.readInt();
std::vector<std::string> tableNames;
std::vector<int> numTuples;
for(int j = 0; j < num_tables; j++){
tableNames.push_back(in.readTextString());
numTuples.push_back(in.readInt());
VOLT_TRACE("%s", tableNames[j].c_str());
}

merge_tuple_offset = table->getMergeTupleOffset(i); // what to do about this?
//VOLT_INFO("Tuple offset is %d", merge_tuple_offset);

int count = 0;
for (std::vector<std::string>::iterator it = tableNames.begin() ; it != tableNames.end(); ++it){
PersistentTable *tableInBlock = dynamic_cast<PersistentTable*>(m_engine->getTable(*it));
@@ -1686,6 +1701,8 @@ bool AntiCacheEvictionManager::mergeUnevictedTuples(PersistentTable *table) {
}
table->clearUnevictedBlocks();
table->clearMergeTupleOffsets();
VOLT_DEBUG("unevicted blockIDs size %d", static_cast<int>(table->getUnevictedBlockIDs().size()));
VOLT_DEBUG("unevicted blocks size %d", static_cast<int>(table->unevictedBlocksSize()));

//VOLT_ERROR("Active Tuple Count: %d -- %d", (int)active_tuple_count, (int)table->activeTupleCount());
#ifndef ANTICACHE_TIMESTAMPS
@@ -1721,6 +1738,17 @@ void AntiCacheEvictionManager::recordEvictedAccess(catalog::Table* catalogTable,
if (m_blockable_accesses && !(block_id & 0x00080000)) {
m_blockable_accesses = false;
}

/*
if (m_evicted_filter.find(block_id) != m_evicted_filter.end())
if (m_evicted_filter[block_id].find(tuple_id) != m_evicted_filter[block_id].end()) {
VOLT_ERROR("try skipping %d %d", block_id, tuple_id);
return;
}
(m_evicted_filter[block_id]).insert(tuple_id);*/
//VOLT_ERROR("try reading %d %d", block_id, tuple_id);

m_evicted_tables.push_back(catalogTable);
m_evicted_block_ids.push_back(block_id);
m_evicted_offsets.push_back(tuple_id);
@@ -1745,14 +1773,14 @@ void AntiCacheEvictionManager::throwEvictedAccessException() {
// copy the block ids into an array
int num_blocks = 0;
for(vector<int32_t>::iterator itr = m_evicted_block_ids.begin(); itr != m_evicted_block_ids.end(); ++itr) {
VOLT_DEBUG("Marking block %d as being needed for uneviction", *itr);
VOLT_TRACE("Marking block %d as being needed for uneviction", *itr);
block_ids[num_blocks++] = *itr;
}

// copy the tuple offsets into an array
int num_tuples = 0;
for(vector<int32_t>::iterator itr = m_evicted_offsets.begin(); itr != m_evicted_offsets.end(); ++itr) {
VOLT_DEBUG("Marking tuple %d from %s as being needed for uneviction", *itr, m_evicted_tables[num_tuples]->name().c_str());
VOLT_TRACE("Marking tuple %d from %s as being needed for uneviction", *itr, m_evicted_tables[num_tuples]->name().c_str());
tuple_ids[num_tuples++] = *itr;
}

@@ -1761,7 +1789,7 @@

// Do we really want to throw this here?
// FIXME We need to support multiple tables in the exception data
VOLT_INFO("Throwing EvictedTupleAccessException for table %s (%d) "
VOLT_DEBUG("Throwing EvictedTupleAccessException for table %s (%d) "
"[num_blocks=%d / num_tuples=%d]",
catalogTable->name().c_str(), catalogTable->relativeIndex(),
num_blocks, num_tuples);
5 changes: 4 additions & 1 deletion src/ee/anticache/AntiCacheEvictionManager.h
@@ -38,6 +38,7 @@

#include <vector>
#include <map>
#include <pthread.h>

#define MAX_DBS 8

@@ -84,6 +85,7 @@ class AntiCacheEvictionManager {
m_evicted_tables.clear();
m_evicted_block_ids.clear();
m_evicted_offsets.clear();
m_evicted_filter.clear();
m_blockable_accesses = true;
}
inline bool hasEvictedAccesses() const {
@@ -119,6 +121,7 @@ class AntiCacheEvictionManager {
std::vector<catalog::Table*> m_evicted_tables;
std::vector<int32_t> m_evicted_block_ids;
std::vector<int32_t> m_evicted_offsets;
std::map <int32_t, set <int32_t> > m_evicted_filter;
// whether the block to be merged is blockable, that is, all blocks that are needed
// are in blockable tiers
bool m_blockable_accesses;
@@ -132,7 +135,7 @@
// m_numdbs > 1;
bool m_migrate;
//std::map<int16_t, AntiCacheDB*> m_db_lookup_table;

}; // AntiCacheEvictionManager class


4 changes: 4 additions & 0 deletions src/ee/anticache/BerkeleyAntiCacheDB.cpp
@@ -183,6 +183,10 @@ void BerkeleyAntiCacheDB::writeBlock(const std::string tableName,
delete [] databuf_;
}

bool BerkeleyAntiCacheDB::validateBlock(uint16_t blockId) {
return true;
}

AntiCacheBlock* BerkeleyAntiCacheDB::readBlock(uint16_t blockId, bool isMigrate) {
Dbt key;
key.set_data(&blockId);
3 changes: 3 additions & 0 deletions src/ee/anticache/BerkeleyAntiCacheDB.h
@@ -99,6 +99,9 @@ class BerkeleyAntiCacheDB : public AntiCacheDB {
const char* data,
const long size,
const int evictedTupleCount);

bool validateBlock(uint16_t blockID);

private:
DbEnv* m_dbEnv;
Db* m_db;
6 changes: 5 additions & 1 deletion src/ee/anticache/EvictedTupleAccessException.cpp
@@ -53,9 +53,13 @@ EvictedTupleAccessException::EvictedTupleAccessException(int tableId, int numBlo
void EvictedTupleAccessException::p_serialize(ReferenceSerializeOutput *output) {

VOLT_TRACE("In EvictedTupleAccessException p_serialize().");
// This is hack. But the string buffer in Java layer has limit.
//if (m_numBlockIds > 10000)
// m_numBlockIds = 10000;

output->writeInt(m_tableId);
output->writeShort(static_cast<short>(m_numBlockIds)); // # of block ids
output->writeInt(m_numBlockIds); // # of block ids
//output->writeShort(static_cast<short>(m_numBlockIds)); // # of block ids
for (int ii = 0; ii < m_numBlockIds; ii++) {
output->writeInt(m_blockIds[ii]);
}
10 changes: 9 additions & 1 deletion src/ee/anticache/NVMAntiCacheDB.cpp
@@ -216,7 +216,8 @@ void NVMAntiCacheDB::writeBlock(const std::string tableName,
memcpy(block, buffer, bufsize);
delete[] buffer;

VOLT_DEBUG("Writing NVM Block: ID = %u, index = %u, tupleCount = %d, size = %ld", blockId, index, tupleCount, bufsize);
VOLT_DEBUG("Writing NVM Block: ID = %u, index = %u, tupleCount = %d, size = %ld, tableName = %s",
blockId, index, tupleCount, bufsize, tableName.c_str());

tupleInBlock[blockId] = tupleCount;
evictedTupleInBlock[blockId] = evictedTupleCount;
@@ -235,6 +236,13 @@ void NVMAntiCacheDB::writeBlock(const std::string tableName,
pushBlockLRU(blockId);
}

bool NVMAntiCacheDB::validateBlock(uint16_t blockId) {
if (m_blockMap.find(blockId) == m_blockMap.end())
return 0;
else
return 1;
}

AntiCacheBlock* NVMAntiCacheDB::readBlock(uint16_t blockId, bool isMigrate) {

std::map<uint16_t, std::pair<uint16_t, int32_t> >::iterator itr;
2 changes: 2 additions & 0 deletions src/ee/anticache/NVMAntiCacheDB.h
@@ -75,6 +75,8 @@ class NVMAntiCacheDB : public AntiCacheDB {
const long size,
const int evictedTupleCount);

bool validateBlock(uint16_t blockId);

private:
/**
* NVM constants
16 changes: 16 additions & 0 deletions src/ee/execution/VoltDBEngine.cpp
@@ -108,6 +108,8 @@
#include "logging/Logrecord.h"
#include "logging/AriesLogProxy.h"
#include <string>
#include <map>
#include <set>

#define BUFFER_SIZE 1024*1024*300 // 100 MB buffer for reading in log file

@@ -2032,13 +2034,15 @@ void VoltDBEngine::antiCacheAddDB(std::string dbDir, AntiCacheDBType dbType, boo
int VoltDBEngine::antiCacheReadBlocks(int32_t tableId, int numBlocks, int32_t blockIds[], int32_t tupleOffsets[]) {
int retval = ENGINE_ERRORCODE_SUCCESS;


// Grab the PersistentTable referenced by the given tableId
// This is simply the relativeIndex of the table in the catalog
// We can assume that the ordering hasn't changed.
PersistentTable *table = dynamic_cast<PersistentTable*>(this->getTable(tableId));
if (table == NULL) {
throwFatalException("Invalid table id %d", tableId);
}
VOLT_DEBUG("Read from table: %d %s", tableId, (table->name()).c_str());

#ifdef VOLT_INFO_ENABLED
std::ostringstream buffer;
@@ -2048,12 +2052,23 @@ int VoltDBEngine::antiCacheReadBlocks(int32_t tableId, int numBlocks, int32_t bl
}
VOLT_INFO("Preparing to read %d evicted blocks: [%s]", numBlocks, buffer.str().c_str());
#endif
VOLT_DEBUG("Preparing to read %d evicted blocks: [%s]", numBlocks, (table->name()).c_str());

// We can now ask it directly to read in the evicted blocks that they want
bool finalResult = true;
AntiCacheEvictionManager* eviction_manager = m_executorContext->getAntiCacheEvictionManager();
std::map <int32_t, set <int32_t> > filter;
try {
for (int i = 0; i < numBlocks; i++) {
VOLT_TRACE("inengine: %d", i);
if (filter.find(blockIds[i]) != filter.end())
if (filter[blockIds[i]].find(tupleOffsets[i]) != filter[blockIds[i]].end()) {
VOLT_WARN("skipping %d %d", blockIds[i], tupleOffsets[i]);
continue;
}

(filter[blockIds[i]]).insert(tupleOffsets[i]);
VOLT_DEBUG("reading %d %d", blockIds[i], tupleOffsets[i]);
finalResult = eviction_manager->readEvictedBlock(table, blockIds[i], tupleOffsets[i]) && finalResult;
} // FOR

@@ -2069,6 +2084,7 @@ int VoltDBEngine::antiCacheReadBlocks(int32_t tableId, int numBlocks, int32_t bl
retval = ENGINE_ERRORCODE_ERROR;
}

VOLT_DEBUG("Read from finished table: %d %s", tableId, (table->name()).c_str());
return (retval);
}

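The duplicate-request filter added to antiCacheReadBlocks() above can be exercised on its own. The following standalone sketch uses illustrative values (it is not repository code) and shows how a repeated (blockId, tupleOffset) pair arriving from a single evicted-access exception gets skipped:

#include <cstdint>
#include <iostream>
#include <map>
#include <set>

int main() {
    // (blockId, tupleOffset) requests as they might arrive from a single
    // EvictedTupleAccessException; note the repeated pair (7, 100).
    int32_t blockIds[]     = {7, 7, 9, 7};
    int32_t tupleOffsets[] = {100, 100, 3, 250};
    const int numBlocks = 4;

    std::map<int32_t, std::set<int32_t> > filter;
    for (int i = 0; i < numBlocks; i++) {
        if (filter.find(blockIds[i]) != filter.end() &&
            filter[blockIds[i]].find(tupleOffsets[i]) != filter[blockIds[i]].end()) {
            std::cout << "skipping " << blockIds[i] << " " << tupleOffsets[i] << std::endl;
            continue;   // this block/tuple pair was already requested in this batch
        }
        filter[blockIds[i]].insert(tupleOffsets[i]);
        std::cout << "reading " << blockIds[i] << " " << tupleOffsets[i] << std::endl;
    }
    return 0;
}
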
22 changes: 13 additions & 9 deletions src/frontend/edu/brown/hstore/AntiCacheManager.java
@@ -334,6 +334,7 @@ protected void processingCallback(QueueEntry next) {
LOG.debug(String.format("Asking EE to read in evicted blocks from table %s on partition %d: %s",
next.catalog_tbl.getName(), next.partition, Arrays.toString(next.block_ids)));

//LOG.warn(Arrays.toString(next.block_ids) + "\n" + Arrays.toString(next.tuple_offsets));
ee.antiCacheReadBlocks(next.catalog_tbl, next.block_ids, next.tuple_offsets);

if (debug.val)
@@ -411,15 +412,18 @@ public boolean queue(AbstractTransaction txn, int partition, Table catalog_tbl,
txn.getBasePartition(), partition));

// HACK
Set<Integer> allBlockIds = new HashSet<Integer>();
for (int block : block_ids) {
allBlockIds.add(block);
}
block_ids = new int[allBlockIds.size()];
int i = 0;
for (int block : allBlockIds) {
block_ids[i++] = block;
}
if (hstore_conf.site.anticache_block_merge) {
//LOG.warn("here!!");
Set<Integer> allBlockIds = new HashSet<Integer>();
for (int block : block_ids) {
allBlockIds.add(block);
}
block_ids = new int[allBlockIds.size()];
int i = 0;
for (int block : allBlockIds) {
block_ids[i++] = block;
}
}

if (txn instanceof LocalTransaction) {
LocalTransaction ts = (LocalTransaction)txn;
6 changes: 3 additions & 3 deletions EvictedTupleAccessException.java
@@ -30,7 +30,7 @@ public EvictedTupleAccessException(ByteBuffer buffer) {
super(buffer);

this.table_id = buffer.getInt();
final int num_blocks = buffer.getShort();
final int num_blocks = buffer.getInt();
assert(num_blocks > 0) :
"Unexpected non-negative block count '" + num_blocks + "'";
this.block_ids = new int[num_blocks];
@@ -87,7 +87,7 @@ protected int p_getSerializedSize() {
// (4 bytes * # of block_ids)
// (4 bytes * # of tuple offsets)
// 4 bytes for partition id
return (4 + 2 + (4 * this.block_ids.length) + (4 * this.tuple_offsets.length) + 4);
return (4 + 4 + (4 * this.block_ids.length) + (4 * this.tuple_offsets.length) + 4);
}

/**
@@ -97,7 +97,7 @@ protected int p_getSerializedSize() {
@Override
protected void p_serializeToBuffer(ByteBuffer b) throws IOException {
b.putInt(this.table_id);
b.putShort((short)this.block_ids.length);
b.putInt(this.block_ids.length);
for (int i = 0; i < this.block_ids.length; i++) {
b.putInt(this.block_ids[i]);
} // FOR
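
Fix 2 widens the block-count field of the serialized EvictedTupleAccessException from a 2-byte short to a 4-byte int on both the EE and the Java side, so a large number of requested blocks no longer overflows the count. A small sanity check of the size math, mirroring p_getSerializedSize() above (a sketch, not repository code):

#include <cstdint>
#include <iostream>

// Serialized size after this commit: table id + block count (now a 4-byte int,
// previously a 2-byte short) + block ids + tuple offsets + partition id.
int32_t serializedSize(int numBlockIds, int numTupleOffsets) {
    return 4 + 4 + (4 * numBlockIds) + (4 * numTupleOffsets) + 4;
}

int main() {
    std::cout << serializedSize(3, 3) << " bytes" << std::endl;   // prints "36 bytes"
    return 0;
}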
