Multi-partition transactions on the same site work now; the merge had to be done beforehand.

Atreyee committed Apr 22, 2014
1 parent 4fdf0cf commit ba759a5c2beeb45bf8e372d47fe7db82a471be13
@@ -326,19 +326,22 @@ protected void processingCallback(QueueEntry next) {
// HACK HACK HACK HACK HACK HACK
// We need to get a new txnId for ourselves, since the one that we
// were given before is now probably too far in the past
if(next.partition != next.ts.getBasePartition()){
ee.antiCacheMergeBlocks(next.catalog_tbl);
}
Long newTxnId = this.hstore_site.getTransactionInitializer().resetTransactionId(next.ts, next.partition);
LOG.info("restartin on local");
this.hstore_site.transactionInit(next.ts);
}else{
RemoteTransaction ts = (RemoteTransaction) next.ts;
RpcCallback<UnevictDataResponse> callback = ts.getUnevictCallback();
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(this.hstore_site.getSiteId())
.setTransactionId(oldTxnId)
.setPartitionId(next.partition)
.setStatus(Status.OK);
callback.run(builder.build());
// RemoteTransaction ts = (RemoteTransaction) next.ts;
// RpcCallback<UnevictDataResponse> callback = ts.getUnevictCallback();
// UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
// .setSenderSite(this.hstore_site.getSiteId())
// .setTransactionId(oldTxnId)
// .setPartitionId(next.partition)
// .setStatus(Status.OK);
// callback.run(builder.build());
}
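
For readers skimming the diff: the fix this commit describes is to merge the unevicted blocks back into the table before the transaction is restarted, whenever the queue entry's partition differs from the transaction's base partition; the old path that answered the remote callback here is commented out. A minimal compilable sketch of that decision, using hypothetical, simplified stand-ins for the H-Store types (only the antiCacheMergeBlocks call and the partition check mirror the hunk above):

    // Hypothetical stand-ins; only the control flow mirrors the hunk above.
    interface ExecutionEngine {
        void antiCacheMergeBlocks(String tableName); // ee.antiCacheMergeBlocks(catalog_tbl)
    }

    final class QueueEntry {
        final int partition;     // partition the blocks were unevicted on
        final int basePartition; // base partition of the restarting transaction
        final String tableName;
        QueueEntry(int partition, int basePartition, String tableName) {
            this.partition = partition;
            this.basePartition = basePartition;
            this.tableName = tableName;
        }
    }

    final class MergeBeforeRestart {
        private final ExecutionEngine ee;
        MergeBeforeRestart(ExecutionEngine ee) { this.ee = ee; }

        /** Merge first when the unevicted partition is not the base partition,
         *  so the restarted transaction sees the unevicted tuples. */
        void processingCallback(QueueEntry next) {
            if (next.partition != next.basePartition) {
                ee.antiCacheMergeBlocks(next.tableName); // the merge, done beforehand
            }
            // ...then reset the transaction id and re-initialize, as in the hunk above
        }
    }
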
@@ -845,6 +845,14 @@ public void unevictData(RpcController controller,
for(int i = 0; i < request.getTupleOffsetsList().size(); i++) tuple_offsets[i] = request.getTupleOffsets(i);
hstore_site.getAntiCacheManager().queue(ts, partition, catalog_tbl, block_ids, tuple_offsets);
// TRIAL
RpcCallback<UnevictDataResponse> callback = ts.getUnevictCallback();
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(hstore_site.getSiteId())
.setTransactionId(request.getTransactionId())
.setPartitionId(partition)
.setStatus(Status.OK);
done.run(builder.build());
}
} // END CLASS
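
The handler above now acknowledges the unevict request as soon as the block fetch is queued, instead of waiting for the merge to finish. A small self-contained sketch of that queue-then-ack pattern, with a hypothetical Callback interface standing in for protobuf's RpcCallback<UnevictDataResponse> and a plain string for the response message:

    import java.util.ArrayDeque;
    import java.util.Queue;

    final class QueueThenAck {
        // Hypothetical stand-in for com.google.protobuf.RpcCallback<UnevictDataResponse>.
        interface Callback { void run(String response); }

        private final Queue<Runnable> pending = new ArrayDeque<>();

        /** Queue the expensive unevict work, then ack the caller immediately. */
        void unevictData(Runnable fetchBlocks, long txnId, int partition, Callback done) {
            pending.add(fetchBlocks); // queue(ts, partition, catalog_tbl, block_ids, tuple_offsets)
            done.run("OK txn=" + txnId + " partition=" + partition); // done.run(builder.build())
        }
    }
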
@@ -1357,6 +1365,7 @@ public boolean sendUnevictDataMessage(int remote_site_id, LocalTransaction txn,
try {
System.out.println(this.channels[remote_site_id]);
this.channels[remote_site_id].unevictData(new ProtoRpcController(), request, this.unevictCallback);
LOG.info("sent message to remote hstore site");
if (trace.val)
LOG.trace(String.format("Sent %s to %s",
request.getClass().getSimpleName(),
@@ -1917,6 +1917,7 @@ public void transactionStart(LocalTransaction ts) {
}
// We will want to delete this transaction after we reject it if it is a single-partition txn
// Otherwise we will let the normal distributed transaction process clean things up
LOG.info("the reject happened here!!!");
this.transactionReject(ts, status);
if (singlePartitioned) this.queueDeleteTransaction(ts.getTransactionId(), status);
}
@@ -2192,6 +2193,7 @@ public Status transactionRestart(LocalTransaction orig_ts, Status status) {
orig_ts, orig_ts.getRestartCounter());
throw new RuntimeException(msg);
} else {
LOG.info("the reject happened coz of too many restarts");
this.transactionReject(orig_ts, Status.ABORT_REJECT);
return (Status.ABORT_REJECT);
}
@@ -2563,6 +2563,11 @@ private void processWorkFragment(AbstractTransaction ts, WorkFragment fragment,
} finally {
if (error != null) {
// error.printStackTrace();
if(error instanceof EvictedTupleAccessException){
EvictedTupleAccessException ex = (EvictedTupleAccessException) error;
System.out.println("tuple offsets: " + ex.tuple_offsets[0]);
}
LOG.warn(String.format("%s - Unexpected %s on partition %d",
ts, error.getClass().getSimpleName(), this.partitionId),
error); // (debug.val ? error : null));
@@ -564,7 +564,7 @@ public final ClientResponseImpl call(LocalTransaction txnState, Object... paramList) {
try {
// ANTI-CACHE TABLE MERGE
if (hstore_conf.site.anticache_enable && txnState.hasAntiCacheMergeTable()) {
LOG.debug("Merging blocks for anticache table.");
LOG.info("Merging blocks for anticache table.");
if (hstore_conf.site.anticache_profiling) {
this.hstore_site.getAntiCacheManager()
@@ -576,6 +576,7 @@ public final ClientResponseImpl call(LocalTransaction txnState, Object... paramList) {
// have the logic down below for handling various errors from the EE
try {
Table catalog_tbl = txnState.getAntiCacheMergeTable();
System.out.println(catalog_tbl.fullName() + " " + this.partitionId);
this.executor.getExecutionEngine().antiCacheMergeBlocks(catalog_tbl);
} finally {
if (hstore_conf.site.anticache_profiling) {
@@ -245,6 +245,7 @@ else if (m_catalog != null && procName.startsWith("@") == false) {
throw new java.io.InterruptedIOException("Interrupted while waiting for response");
}
if (cb.getResponse().getStatus() != Status.OK) {
LOG.info("the response failed!!!");
throw new ProcCallException(cb.getResponse(), cb.getResponse().getStatusString(), cb.getResponse().getException());
}
// cb.result() throws ProcCallException if procedure failed
@@ -227,8 +227,8 @@ public void unevictData(RpcController controller,
Long oldTxnId = request.getTransactionId();
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(hstore_site.getSiteId())
.setOldTransactionId(oldTxnId)
.setNewTransactionId(oldTxnId+1) // some new id
.setTransactionId(oldTxnId)
.setPartitionId(1) // some id
.setStatus(Status.OK);
done.run(builder.build());
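
This hunk updates a local stub of unevictData to the revised response schema: a single transactionId echoing the request, instead of separate old/new transaction ids, plus a placeholder partition id. A tiny sketch of that echo pattern, with a hypothetical record standing in for the generated UnevictDataResponse message (Java 16+ assumed):

    // Hypothetical stand-in for the generated UnevictDataResponse message.
    record UnevictReply(int senderSite, long transactionId, int partitionId, String status) {}

    final class LocalUnevictStub {
        /** Echo the request's transaction id; the partition id is a placeholder, as above. */
        UnevictReply reply(int siteId, long requestTxnId) {
            return new UnevictReply(siteId, requestTxnId, /* some id */ 1, "OK");
        }
    }
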
@@ -92,7 +92,7 @@ public void testReadRecords() throws Exception {
System.err.println("-------------------------------");
String procName = GetUsers.class.getSimpleName();
long user1 = 9999;
long user1 = 99999;
long user2 = 1;
Object params[] = { user1, user2 };
ClientResponse cresponse = client.callProcedure(procName, params);
@@ -145,9 +145,14 @@ public static Test suite() {
MultiConfigSuiteBuilder builder = new MultiConfigSuiteBuilder(TestUsersSuite.class);
builder.setGlobalConfParameter("site.exec_voltdb_procinfo", true);
builder.setGlobalConfParameter("site.anticache_enable", true);
builder.setGlobalConfParameter("site.anticache_profiling", true);
builder.setGlobalConfParameter("site.anticache_profiling", false);
builder.setGlobalConfParameter("site.anticache_reset", true);
builder.setGlobalConfParameter("site.anticache_check_interval", Integer.MAX_VALUE);
builder.setGlobalConfParameter("site.network_startup_wait", 60000);
builder.setGlobalConfParameter("site.coordinator_sync_time", false);
builder.setGlobalConfParameter("site.status_enable", false);
builder.setGlobalConfParameter("site.txn_partition_id_managers", true);
UsersProjectBuilder project = new UsersProjectBuilder();
project.addAllDefaults();
@@ -158,19 +163,19 @@ public static Test suite() {
/////////////////////////////////////////////////////////////
// CONFIG #1: 1 Local Site/Partition running on JNI backend
/////////////////////////////////////////////////////////////
/* config = new LocalSingleProcessServer(PREFIX+"-1part.jar", 2, BackendTarget.NATIVE_EE_JNI);
config = new LocalSingleProcessServer(PREFIX+"-1part.jar", 2, BackendTarget.NATIVE_EE_JNI);
success = config.compile(project);
assert(success);
builder.addServerConfig(config);
*/
////////////////////////////////////////////////////////////
// CONFIG #2: cluster of 2 nodes running 2 sites each, one replica
////////////////////////////////////////////////////////////
config = new LocalCluster(PREFIX+"-cluster.jar", 2, 1, 0, BackendTarget.NATIVE_EE_JNI);
/* config = new LocalCluster(PREFIX+"-cluster.jar", 2, 1, 0, BackendTarget.NATIVE_EE_JNI);
success = config.compile(project);
assert(success);
builder.addServerConfig(config);
*/
return builder;
}
