Skip to content

Commit

Permalink
Fix the race condition between readEvictedBlock and mergeUnevictedTup…
Browse files Browse the repository at this point in the history
…les.
  • Loading branch information
linmagit committed Aug 18, 2015
1 parent ac7c2a9 commit 90efa79
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 9 deletions.
2 changes: 1 addition & 1 deletion properties/benchmarks/ycsb.properties
Expand Up @@ -10,4 +10,4 @@ fixed_size = false
num_records = 100000

# Zipfian skew factor for tuple access
skew_factor = 1.25
skew_factor = 0.8
30 changes: 27 additions & 3 deletions src/ee/anticache/AntiCacheEvictionManager.cpp
Expand Up @@ -1169,12 +1169,22 @@ Table* AntiCacheEvictionManager::evictBlockInBatch(PersistentTable *table, Persi

bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t block_id, int32_t tuple_offset) {

bool already_unevicted = table->isAlreadyUnEvicted(block_id);
int already_unevicted = table->isAlreadyUnEvicted(block_id);
if (already_unevicted && table->mergeStrategy()) { // this block has already been read
VOLT_WARN("Block 0x%x has already been read.", block_id);
return true;
}

if (already_unevicted) { // this block has already been read, but it is tuple-merge strategy
table->insertUnevictedBlock(table->getUnevictedBlocks(already_unevicted - 1));
table->insertTupleOffset(tuple_offset);

VOLT_DEBUG("BLOCK %u TUPLE %d - unevicted blocks size is %d",
block_id, tuple_offset, already_unevicted);

return true;
}

/*
* Finds the AntiCacheDB* instance associated with the needed block_id
*/
Expand Down Expand Up @@ -1239,8 +1249,11 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t
table->insertTupleOffset(tuple_offset);


table->insertUnevictedBlockID(std::pair<int32_t,int16_t>(block_id, 0));
table->insertUnevictedBlockID(std::pair<int32_t,int32_t>(block_id, table->unevictedBlocksSize()));

VOLT_DEBUG("BLOCK %u TUPLE %d - unevicted blocks size is %d",
block_id, tuple_offset, static_cast<int>(table->unevictedBlocksSize()));

delete value;
} catch (UnknownBlockAccessException e) {
throw e;
Expand Down Expand Up @@ -1642,11 +1655,22 @@ bool AntiCacheEvictionManager::mergeUnevictedTuples(PersistentTable *table) {



delete [] table->getUnevictedBlocks(i);
if (table->mergeStrategy())
delete [] table->getUnevictedBlocks(i);
//table->clearUnevictedBlocks(i);
}

VOLT_DEBUG("unevicted blockIDs size %d", static_cast<int>(table->getUnevictedBlockIDs().size()));
VOLT_DEBUG("unevicted blocks size %d", static_cast<int>(table->unevictedBlocksSize()));
if (!table->mergeStrategy()) {
map <int32_t, int32_t> unevictedBlockIDs = table->getUnevictedBlockIDs();
for (map <int32_t, int32_t>::iterator itr = unevictedBlockIDs.begin(); itr != unevictedBlockIDs.end();
itr++) {
//printf("bid:%d idx:%d\n", itr->first, itr->second);
delete [] table->getUnevictedBlocks(itr->second - 1);
}
table->clearUnevictedBlockIDs();
}
table->clearUnevictedBlocks();
table->clearMergeTupleOffsets();

Expand Down
13 changes: 10 additions & 3 deletions src/ee/storage/persistenttable.cpp
Expand Up @@ -317,15 +317,18 @@ std::map<int32_t, int32_t> PersistentTable::getUnevictedBlockIDs()
return m_unevictedBlockIDs;
}

bool PersistentTable::isAlreadyUnEvicted(int32_t blockId)
int PersistentTable::isAlreadyUnEvicted(int32_t blockId)
{
return m_unevictedBlockIDs.find(blockId) != m_unevictedBlockIDs.end();
if (m_unevictedBlockIDs.find(blockId) != m_unevictedBlockIDs.end())
return m_unevictedBlockIDs[blockId];
else
return 0;
}

void PersistentTable::insertUnevictedBlockID(std::pair<int32_t,int32_t> pair)
{
VOLT_INFO("Unevicted pair is %d", pair.first);
m_unevictedBlockIDs.insert(pair);
m_unevictedBlockIDs[pair.first] = pair.second;
}

bool PersistentTable::removeUnevictedBlockID(int32_t blockId) {
Expand Down Expand Up @@ -388,6 +391,10 @@ void PersistentTable::clearUnevictedBlocks()
{
m_unevictedBlocks.clear();
}
void PersistentTable::clearUnevictedBlockIDs()
{
m_unevictedBlockIDs.clear();
}
void PersistentTable::clearUnevictedBlocks(int i)
{
m_unevictedBlocks.erase(m_unevictedBlocks.begin()+i);
Expand Down
5 changes: 3 additions & 2 deletions src/ee/storage/persistenttable.h
Expand Up @@ -300,15 +300,16 @@ class PersistentTable : public Table {
bool removeUnevictedBlockID(int32_t blockId);
void insertUnevictedBlock(char* unevicted_tuples);
void insertTupleOffset(int32_t tuple_offset);
bool isAlreadyUnEvicted(int32_t blockId);
int isAlreadyUnEvicted(int32_t blockId);
int32_t getTuplesRead();
void setTuplesRead(int32_t tuplesRead);
void setBatchEvicted(bool batchEvicted);
bool isBatchEvicted();
void clearUnevictedBlocks();
void clearMergeTupleOffsets();
int64_t unevictTuple(ReferenceSerializeInput * in, int j, int merge_tuple_offset, bool blockMerge);
void clearUnevictedBlocks(int i);
void clearUnevictedBlocks(int i);
void clearUnevictedBlockIDs();
char* getUnevictedBlocks(int i);
int unevictedBlocksSize();

Expand Down
5 changes: 5 additions & 0 deletions src/frontend/edu/brown/hstore/AntiCacheManager.java
Expand Up @@ -14,6 +14,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;
import org.voltdb.CatalogContext;
Expand Down Expand Up @@ -63,6 +64,8 @@ public class AntiCacheManager extends AbstractProcessingRunnable<AntiCacheManage
private static final Logger LOG = Logger.getLogger(AntiCacheManager.class);
private static final LoggerBoolean debug = new LoggerBoolean();
private static final LoggerBoolean trace = new LoggerBoolean();
public static final ReentrantLock lock = new ReentrantLock();

static {
LoggerUtil.attachObserver(LOG, debug, trace);
}
Expand Down Expand Up @@ -305,6 +308,7 @@ protected void processingCallback(QueueEntry next) {
// block the AntiCacheManager until each of the requests are finished
if (hstore_conf.site.anticache_profiling)
this.profilers[next.partition].retrieval_time.start();
lock.lock();
try {
if (debug.val)
LOG.debug(String.format("Asking EE to read in evicted blocks from table %s on partition %d: %s",
Expand All @@ -320,6 +324,7 @@ protected void processingCallback(QueueEntry next) {

// merge_needed = false;
} finally {
lock.unlock();
if (hstore_conf.site.anticache_profiling)
this.profilers[next.partition].retrieval_time.stopIfStarted();
}
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/org/voltdb/VoltProcedure.java
Expand Up @@ -60,6 +60,7 @@
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.hstore.util.ParameterSetArrayCache;
import edu.brown.hstore.AntiCacheManager;
import edu.brown.interfaces.DebugContext;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
Expand Down Expand Up @@ -585,10 +586,12 @@ public final ClientResponseImpl call(LocalTransaction txnState, Object... paramL

// Note that I decided to put this in here because we already
// have the logic down below for handling various errors from the EE
AntiCacheManager.lock.lock();
try {
Table catalog_tbl = txnState.getAntiCacheMergeTable();
this.executor.getExecutionEngine().antiCacheMergeBlocks(catalog_tbl);
} finally {
AntiCacheManager.lock.unlock();
if (hstore_conf.site.anticache_profiling) {
this.hstore_site.getAntiCacheManager()
.getDebugContext()
Expand Down

0 comments on commit 90efa79

Please sign in to comment.