Skip to content

Commit

Permalink
HBASE-16854 Refactor the org.apache.hadoop.hbase.client.Action (ChiaP…
Browse files Browse the repository at this point in the history
…ing Tsai)
  • Loading branch information
tedyu committed Oct 18, 2016
1 parent 317136e commit bb6cc4d
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 125 deletions.
Expand Up @@ -22,21 +22,18 @@
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;


/** /**
* A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by * A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by
* {@link Table#batch} to associate the action with it's region and maintain * {@link Table#batch} to associate the action with it's region and maintain
* the index from the original request. * the index from the original request.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
//TODO: R is never used public class Action implements Comparable<Action> {
public class Action<R> implements Comparable<R> { private final Row action;
// TODO: This class should not be visible outside of the client package. private final int originalIndex;
private Row action;
private int originalIndex;
private long nonce = HConstants.NO_NONCE; private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;


public Action(Row action, int originalIndex) { public Action(Row action, int originalIndex) {
super();
this.action = action; this.action = action;
this.originalIndex = originalIndex; this.originalIndex = originalIndex;
} }
Expand All @@ -46,15 +43,13 @@ public Action(Row action, int originalIndex) {
* @param action Original action. * @param action Original action.
* @param replicaId Replica id for the new action. * @param replicaId Replica id for the new action.
*/ */
public Action(Action<R> action, int replicaId) { public Action(Action action, int replicaId) {
super();
this.action = action.action; this.action = action.action;
this.nonce = action.nonce; this.nonce = action.nonce;
this.originalIndex = action.originalIndex; this.originalIndex = action.originalIndex;
this.replicaId = replicaId; this.replicaId = replicaId;
} }



public void setNonce(long nonce) { public void setNonce(long nonce) {
this.nonce = nonce; this.nonce = nonce;
} }
Expand All @@ -75,10 +70,9 @@ public int getReplicaId() {
return replicaId; return replicaId;
} }


@SuppressWarnings("rawtypes")
@Override @Override
public int compareTo(Object o) { public int compareTo(Action other) {
return action.compareTo(((Action) o).getAction()); return action.compareTo(other.getAction());
} }


@Override @Override
Expand All @@ -89,9 +83,10 @@ public int hashCode() {
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) return true; if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false; if (obj instanceof Action) {
Action<?> other = (Action<?>) obj; return compareTo((Action) obj) == 0;
return compareTo(other) == 0; }
return false;
} }


public long getNonce() { public long getNonce() {
Expand Down
Expand Up @@ -347,9 +347,9 @@ public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName table
return NO_REQS_RESULT; return NO_REQS_RESULT;
} }


Map<ServerName, MultiAction<Row>> actionsByServer = Map<ServerName, MultiAction> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>(); new HashMap<ServerName, MultiAction>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); List<Action> retainedActions = new ArrayList<Action>(rows.size());


NonceGenerator ng = this.connection.getNonceGenerator(); NonceGenerator ng = this.connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
Expand Down Expand Up @@ -388,7 +388,7 @@ public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName table
LOG.error("Failed to get region location ", ex); LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list. // This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state. // We will then add it to ars in an already-failed state.
retainedActions.add(new Action<Row>(r, ++posInList)); retainedActions.add(new Action(r, ++posInList));
locationErrors.add(ex); locationErrors.add(ex);
locationErrorRows.add(posInList); locationErrorRows.add(posInList);
it.remove(); it.remove();
Expand All @@ -400,7 +400,7 @@ public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName table
break; break;
} }
if (code == ReturnCode.INCLUDE) { if (code == ReturnCode.INCLUDE) {
Action<Row> action = new Action<Row>(r, ++posInList); Action action = new Action(r, ++posInList);
setNonce(ng, r, action); setNonce(ng, r, action);
retainedActions.add(action); retainedActions.add(action);
// TODO: replica-get is not supported on this path // TODO: replica-get is not supported on this path
Expand Down Expand Up @@ -431,9 +431,9 @@ private RowCheckerHost createRowCheckerHost() {
)); ));
} }
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName, <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors, Object[] results, boolean needResults, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer,
ExecutorService pool) { ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
Expand Down Expand Up @@ -467,11 +467,11 @@ public void setOperationTimeout(int operationTimeout) {
* @param actionsByServer the multiaction per server * @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group. * @param nonceGroup Nonce group.
*/ */
static void addAction(ServerName server, byte[] regionName, Action<Row> action, static void addAction(ServerName server, byte[] regionName, Action action,
Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) { Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
MultiAction<Row> multiAction = actionsByServer.get(server); MultiAction multiAction = actionsByServer.get(server);
if (multiAction == null) { if (multiAction == null) {
multiAction = new MultiAction<Row>(); multiAction = new MultiAction();
actionsByServer.put(server, multiAction); actionsByServer.put(server, multiAction);
} }
if (action.hasNonce() && !multiAction.hasNonceGroup()) { if (action.hasNonce() && !multiAction.hasNonceGroup()) {
Expand Down Expand Up @@ -499,7 +499,7 @@ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName ta
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
CancellableRegionServerCallable callable, int rpcTimeout) { CancellableRegionServerCallable callable, int rpcTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); List<Action> actions = new ArrayList<Action>(rows.size());


// The position will be used by the processBatch to match the object array returned. // The position will be used by the processBatch to match the object array returned.
int posInList = -1; int posInList = -1;
Expand All @@ -512,7 +512,7 @@ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName ta
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
} }
} }
Action<Row> action = new Action<Row>(r, posInList); Action action = new Action(r, posInList);
setNonce(ng, r, action); setNonce(ng, r, action);
actions.add(action); actions.add(action);
} }
Expand All @@ -523,13 +523,13 @@ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName ta
return ars; return ars;
} }


private void setNonce(NonceGenerator ng, Row r, Action<Row> action) { private void setNonce(NonceGenerator ng, Row r, Action action) {
if (!(r instanceof Append) && !(r instanceof Increment)) return; if (!(r instanceof Append) && !(r instanceof Increment)) return;
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
} }


protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults, Batch.Callback<CResult> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int rpcTimeout) { CancellableRegionServerCallable callable, int rpcTimeout) {
return new AsyncRequestFutureImpl<CResult>( return new AsyncRequestFutureImpl<CResult>(
Expand Down

0 comments on commit bb6cc4d

Please sign in to comment.