Permalink
Browse files

Add throttling to anti-cache retrieval for unblocking clients.

  • Loading branch information...
1 parent f98327b commit 063e0245a167761bee83757a561e41d394990b96 @malin1993ml malin1993ml committed Jan 31, 2016
@@ -90,6 +90,10 @@ AntiCacheEvictionManager::AntiCacheEvictionManager(const VoltDBEngine *engine) {
m_migrate = false;
+ if (pthread_mutex_init(&lock, NULL) != 0) {
+ VOLT_ERROR("Mutex init failed!");
+ }
+
if (pthread_mutex_init(&(prio_lock.cv_mutex), NULL) != 0) {
VOLT_ERROR("Mutex init failed!");
}
@@ -112,6 +116,8 @@ AntiCacheEvictionManager::~AntiCacheEvictionManager() {
delete m_evicted_tuple;
TupleSchema::freeTupleSchema(m_evicted_schema);
+ pthread_mutex_destroy(&lock);
+
pthread_mutex_destroy(&prio_lock.cv_mutex);
pthread_mutex_destroy(&prio_lock.cs_mutex);
pthread_cond_destroy(&prio_lock.cond);
@@ -1364,6 +1370,7 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t
VOLT_DEBUG("num tuples is %d", tuples);
}
+ //pthread_mutex_lock(&lock);
table->insertUnevictedBlock(unevicted_tuples, table->m_read_pivot);
table->insertTupleOffset(tuple_offset, table->m_read_pivot);
table->insertBlockID(block_id, table->m_read_pivot);
@@ -1373,6 +1380,7 @@ bool AntiCacheEvictionManager::readEvictedBlock(PersistentTable *table, int32_t
}
else
table->m_read_pivot++;
+ //pthread_mutex_unlock(&lock);
//if (table->m_read_pivot % 10000 == 0)
@@ -198,6 +198,8 @@ class AntiCacheEvictionManager {
std::vector<int32_t> m_evicted_block_ids_sync;
std::vector<int32_t> m_evicted_offsets_sync;
+ pthread_mutex_t lock;
+
std::map <int32_t, set <int32_t> > m_evicted_filter;
// whether the block to be merged is blockable, that is, all blocks that are needed
// are in blockable tiers
@@ -72,7 +72,7 @@ void BerkeleyAntiCacheDB::initializeDB() {
//DB_AUTO_COMMIT | // Immediately commit every operation
DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache)
// DB_TXN_NOSYNC | // Don't flush to disk every time, we will do that explicitly
- DB_INIT_LOCK | // concurrent data store
+ //DB_INIT_LOCK | // concurrent data store
//DB_PRIVATE |
DB_THREAD | // allow multiple threads
// DB_INIT_TXN |
@@ -133,6 +133,8 @@ public String toString() {
private final double UNEVICTION_RATIO_CLUSTER_THRESHOLD = .1;
private final double ACCESS_RATE_CLUSTER_THRESHOLD = .1;
+ private final long throttleThreshold;
+
/**
*
*/
@@ -286,6 +288,8 @@ public void update(EventObservable<VoltTable> o, VoltTable vt) {
numDBs = levels.length;
}
+
+ this.throttleThreshold = hstore_conf.client.blocking_concurrent * hstore_site.getLocalPartitionIds().size() / 8 * 3;
}
public Collection<Table> getEvictableTables() {
@@ -296,6 +300,10 @@ public Runnable getMemoryMonitorThread() {
return this.memoryMonitor;
}
+ public boolean checkQueueBound() {
+ return this.queue.size() < this.throttleThreshold;
+ }
+
// ----------------------------------------------------------------------------
// TRANSACTION PROCESSING
@@ -2769,7 +2769,7 @@ public void responseSend(LocalTransaction ts, ClientResponseImpl cresponse) {
"The client handle for " + ts + " was not set properly";
assert(status != Status.ABORT_MISPREDICT && status != Status.ABORT_EVICTEDACCESS) :
"Trying to send back a client response for " + ts + " but the status is " + status;
-
+
if (hstore_conf.site.txn_profiling && ts.profiler != null) ts.profiler.startPostClient();
boolean sendResponse = true;
@@ -4587,9 +4587,16 @@ protected void processClientResponse(LocalTransaction ts, ClientResponseImpl cre
if (ts.isPredictSinglePartition()) {
if (ts.isMarkedFinished(this.partitionId) == false)
this.finishTransaction(ts, status);
- //this.hstore_site.responseSend(ts, cresponse);
- //this.hstore_site.transactionRequeue(ts, status);
- this.hstore_site.transactionRestart(ts, status);
+ if (this.hstore_site.getAntiCacheManager().checkQueueBound()) {
+ this.hstore_site.transactionRequeue(ts, status);
+ // this.hstore_site.transactionRestart(ts, status);
+ }
+ else {
+ if (hstore_conf.site.exec_profiling) this.profiler.network_time.start();
+ this.hstore_site.responseSend(ts, cresponse);
+ if (hstore_conf.site.exec_profiling) this.profiler.network_time.stopIfStarted();
+ this.hstore_site.queueDeleteTransaction(ts.getTransactionId(), status);
+ }
}
// Send a message all the partitions involved that the party is over
// and that they need to abort the transaction. We don't actually care when we get the
@@ -634,9 +634,9 @@
description="Reserved eviction time for anti-caching after warmup of the benchmark. This requires that the system "+
"is compiled with ${site.anticache_warmup_eviction_enable} "+
"set to true.",
- //defaultInt = 450000,
+ defaultInt = 480000,
//defaultInt = 0000,
- defaultInt = 10000,
+ //defaultInt = 30000,
experimental=true
)
public int anticache_warmup_eviction_time;

0 comments on commit 063e024

Please sign in to comment.