Skip to content

Commit

Permalink
Further optimizations and progress on the new dtxn protocol:
Browse files Browse the repository at this point in the history
+ Reduced the amount of data that is allocated per LocalTransaction and AbstractTransaction handle.
+ Implemented the LocalTransactionInitCallback that is used to initialize distributed transactions on remote HStoreSites
+ Moved more things into HStoreObjectPools
+ Removed the intialization latch from LocalTransaction.

I'm going to head home to start laundry and eat dinner. My goal tonight is to get single-partition txns working again...


git-svn-id: https://database.cs.brown.edu/svn/hstore/branches/newdtxn-branch@2379 7a8f3d5b-8c5f-44cb-9125-2414a256df87
  • Loading branch information
apavlo committed Oct 5, 2011
1 parent adb3325 commit c0c1b02
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 262 deletions.
7 changes: 7 additions & 0 deletions src/frontend/edu/mit/hstore/HStoreConf.java
Expand Up @@ -565,6 +565,13 @@ public final class SiteConf extends Conf {
experimental=false
)
public int pool_forwardtxnresponses_idle;

@ConfigProperty(
description="The max number of LocalTransactionInitCallbacks to keep idle in the pool.",
defaultInt=2500,
experimental=false
)
public int pool_localtxninit_idle;
}

// ============================================================================
Expand Down
30 changes: 17 additions & 13 deletions src/frontend/edu/mit/hstore/HStoreMessenger.java
Expand Up @@ -452,7 +452,6 @@ protected ProtoRpcController getProtoRpcController(LocalTransaction ts, int site
private final MessageRouter<TransactionFinishRequest, TransactionFinishResponse> router_transactionFinish = new MessageRouter<TransactionFinishRequest, TransactionFinishResponse>() {
protected void sendLocal(long txn_id, TransactionFinishRequest msg, Collection<Integer> partitions) {
hstore_site.transactionFinish(txn_id, msg.getStatus(), partitions);
hstore_site.completeTransaction(txn_id, msg.getStatus());
}
protected void sendRemote(HStoreService channel, ProtoRpcController controller, TransactionFinishRequest msg, RpcCallback<TransactionFinishResponse> callback) {
channel.transactionFinish(controller, msg, callback);
Expand Down Expand Up @@ -620,10 +619,18 @@ public void shutdown(RpcController controller, ShutdownRequest request,
// ----------------------------------------------------------------------------
// TRANSACTION METHODS
// ----------------------------------------------------------------------------

public void transactionInit(Hstore.TransactionInitRequest request, RpcCallback<Hstore.TransactionInitRequest> callback) {

// TODO

/**
*
* @param ts
* @param callback
*/
public void transactionInit(LocalTransaction ts, RpcCallback<Hstore.TransactionInitResponse> callback) {
Hstore.TransactionInitRequest request = Hstore.TransactionInitRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.addAllPartitions(ts.getPredictTouchedPartitions())
.build();
this.router_transactionInit.sendMessages(ts, request, callback, request.getPartitionsList());

}

Expand All @@ -650,15 +657,12 @@ public void transactionWork(LocalTransaction ts, Map<Integer, Hstore.Transaction
* @param callback
* @param partitions
*/
public void transactionPrepare(LocalTransaction ts, Collection<Integer> partitions) {
public void transactionPrepare(LocalTransaction ts, RpcCallback<Hstore.TransactionPrepareResponse> callback, Collection<Integer> partitions) {
Hstore.TransactionPrepareRequest request = Hstore.TransactionPrepareRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.addAllPartitions(ts.getDonePartitions())
.build();
this.router_transactionPrepare.sendMessages(ts,
request,
ts.getPrepareCallback(),
partitions);
this.router_transactionPrepare.sendMessages(ts, request, callback, partitions);
}

/**
Expand All @@ -683,18 +687,18 @@ public void transactionFinish(LocalTransaction ts, Hstore.Status status, RpcCall
/**
* Forward a StoredProcedureInvocation request to a remote site for execution
* @param serializedRequest
* @param done
* @param callback
* @param partition
*/
public void transactionRedirect(byte[] serializedRequest, RpcCallback<Hstore.TransactionRedirectResponse> done, int partition) {
public void transactionRedirect(byte[] serializedRequest, RpcCallback<Hstore.TransactionRedirectResponse> callback, int partition) {
int dest_site_id = hstore_site.getSiteIdForPartitionId(partition);
if (debug.get()) LOG.debug("Redirecting transaction request to partition #" + partition + " on " + HStoreSite.formatSiteName(dest_site_id));
ByteString bs = ByteString.copyFrom(serializedRequest);
Hstore.TransactionRedirectRequest mr = Hstore.TransactionRedirectRequest.newBuilder()
.setSenderId(this.local_site_id)
.setWork(bs)
.build();
this.channels.get(dest_site_id).transactionRedirect(new ProtoRpcController(), mr, done);
this.channels.get(dest_site_id).transactionRedirect(new ProtoRpcController(), mr, callback);
}

// ----------------------------------------------------------------------------
Expand Down
33 changes: 21 additions & 12 deletions src/frontend/edu/mit/hstore/HStoreObjectPools.java
@@ -1,6 +1,7 @@
package edu.mit.hstore;

import edu.brown.utils.TypedStackObjectPool;
import edu.mit.hstore.callbacks.LocalTransactionInitCallback;
import edu.mit.hstore.callbacks.TransactionPrepareCallback;
import edu.mit.hstore.callbacks.TransactionRedirectCallback;
import edu.mit.hstore.callbacks.TransactionRedirectResponseCallback;
Expand Down Expand Up @@ -30,6 +31,11 @@ public abstract class HStoreObjectPools {
*/
public static TypedStackObjectPool<TransactionPrepareCallback> POOL_TXNPREPARE;

/**
*
*/
public static TypedStackObjectPool<LocalTransactionInitCallback> POOL_TXN_LOCALINIT;

/**
* RemoteTransaction Object Pool
*/
Expand All @@ -52,23 +58,26 @@ public synchronized static void initialize(HStoreSite hstore_site) {
if (POOL_TXNREDIRECT_REQUEST == null) {
HStoreConf hstore_conf = hstore_site.getHStoreConf();
POOL_TXNREDIRECT_REQUEST = TypedStackObjectPool.factory(TransactionRedirectCallback.class,
hstore_conf.site.pool_forwardtxnrequests_idle,
hstore_conf.site.pool_profiling);
hstore_conf.site.pool_forwardtxnrequests_idle,
hstore_conf.site.pool_profiling);
POOL_FORWARDTXN_RESPONSE = TypedStackObjectPool.factory(TransactionRedirectResponseCallback.class,
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling);
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling);
POOL_TXNWORK = TypedStackObjectPool.factory(TransactionWorkCallback.class,
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling);
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling);
POOL_TXNPREPARE = TypedStackObjectPool.factory(TransactionPrepareCallback.class,
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling, hstore_site);
hstore_conf.site.pool_forwardtxnresponses_idle,
hstore_conf.site.pool_profiling, hstore_site);
POOL_TXN_LOCALINIT = TypedStackObjectPool.factory(LocalTransactionInitCallback.class,
hstore_conf.site.pool_localtxninit_idle,
hstore_conf.site.pool_profiling, hstore_site);
remoteTxnPool = TypedStackObjectPool.factory(RemoteTransaction.class,
hstore_conf.site.pool_remotetxnstate_idle,
hstore_conf.site.pool_profiling);
hstore_conf.site.pool_remotetxnstate_idle,
hstore_conf.site.pool_profiling);
localTxnPool = TypedStackObjectPool.factory(LocalTransaction.class,
hstore_conf.site.pool_localtxnstate_idle,
hstore_conf.site.pool_profiling);
hstore_conf.site.pool_localtxnstate_idle,
hstore_conf.site.pool_profiling);
}
}
}

0 comments on commit c0c1b02

Please sign in to comment.