Skip to content

Commit

Permalink
refactor: using abstract Batch class to refactor all batch api (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer committed Sep 16, 2020
1 parent 1b602ba commit 4cf307c
Show file tree
Hide file tree
Showing 22 changed files with 1,114 additions and 12 deletions.
2 changes: 2 additions & 0 deletions dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
<Package name="com.xiaomi.infra.pegasus.client"/>
<Package name="com.xiaomi.infra.pegasus.metrics"/>
<Package name="com.xiaomi.infra.pegasus.tools"/>
<Package name="com.xiaomi.infra.pegasus.client.request"/>
<Package name="com.xiaomi.infra.pegasus.example"/>
</Or>
</Match>

Expand Down
1 change: 1 addition & 0 deletions scripts/format-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/test/java/com/xiaomi/infra/pegasus/client/*.java
Expand Down
69 changes: 58 additions & 11 deletions src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

public class FutureGroup<Result> {
private List<Future<Result>> asyncTasks;

public FutureGroup(int initialCapacity) {
asyncTasks = new ArrayList<>(initialCapacity);
Expand All @@ -28,7 +30,7 @@ public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException {
*/
public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) throws PException {
int timeLimit = timeoutMillis;
long duration = 0;
long duration;
for (int i = 0; i < asyncTasks.size(); i++) {
Future<Result> fu = asyncTasks.get(i);
try {
Expand All @@ -40,21 +42,66 @@ public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) th
throw new PException("async task #[" + i + "] await failed: " + e.toString());
}

if (fu.isSuccess() && timeLimit >= 0) {
if (timeLimit < 0) {
throw new PException(
String.format("async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis));
}

if (fu.isSuccess()) {
if (results != null) {
results.set(i, fu.getNow());
results.add(fu.getNow());
}
} else {
Throwable cause = fu.cause();
if (cause == null) {
throw new PException(
String.format(
"async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis));
}
throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause);
throw new PException("async task #[" + i + "] failed: ", fu.cause());
}
}
}

private List<Future<Result>> asyncTasks;
/**
* wait for all requests done even if some error occurs
*
* @param results if one request success, it should be pair(null, result), otherwise,
* pair(PException, null)
* @param timeoutMillis timeout
*/
public void waitAllComplete(List<Pair<PException, Result>> results, int timeoutMillis) {
assert results != null : "result != null";
int timeLimit = timeoutMillis;
long duration;

for (int i = 0; i < asyncTasks.size(); i++) {
Future<Result> fu = asyncTasks.get(i);
long startTs = System.currentTimeMillis();
try {
fu.await(timeLimit);
} catch (Exception e) {
results.add(
Pair.of(new PException("async task #[" + i + "] await failed: " + e.toString()), null));
} finally {
duration = System.currentTimeMillis() - startTs;
timeLimit -= duration;
}

if (timeLimit < 0) {
for (int j = i; j < asyncTasks.size(); j++) {
results.add(
Pair.of(
new PException(
String.format(
"async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis)),
null));
}
break;
}

if (fu.isSuccess()) {
results.add(Pair.of(null, fu.getNow()));
} else {
results.add(
Pair.of(
new PException("async task #[" + i + "] await failed: " + fu.cause().getMessage()),
null));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* * administrator
* administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
Expand Down Expand Up @@ -110,6 +110,9 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio
/**
* Batch get values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param values output values; should be created by caller; if succeed, the size of values will
Expand All @@ -119,12 +122,17 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchGet(String tableName, List<Pair<byte[], byte[]>> keys, List<byte[]> values)
throws PException;

/**
* Batch get values of different keys. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand All @@ -136,6 +144,7 @@ public void batchGet(String tableName, List<Pair<byte[], byte[]>> keys, List<byt
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchGet2(
String tableName, List<Pair<byte[], byte[]>> keys, List<Pair<PException, byte[]>> results)
throws PException;
Expand Down Expand Up @@ -210,6 +219,9 @@ public boolean multiGet(
* Batch get multiple values under the same hash key. Will terminate immediately if any error
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand All @@ -219,6 +231,7 @@ public boolean multiGet(
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchMultiGet(
String tableName, List<Pair<byte[], List<byte[]>>> keys, List<HashKeyData> values)
throws PException;
Expand All @@ -227,6 +240,10 @@ public void batchMultiGet(
* Batch get multiple values under the same hash key. Will wait for all requests done even if some
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
* List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
Expand All @@ -239,6 +256,7 @@ public void batchMultiGet(
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchMultiGet2(
String tableName,
List<Pair<byte[], List<byte[]>>> keys,
Expand Down Expand Up @@ -285,17 +303,24 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
/**
* Batch set lots of values. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @throws PException throws exception if any error occurs.
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchSet(String tableName, List<SetItem> items) throws PException;

/**
* Batch set lots of values. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param results output results; should be created by caller; after call done, the size of
Expand All @@ -307,6 +332,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchSet2(String tableName, List<SetItem> items, List<PException> results)
throws PException;

Expand All @@ -330,22 +356,30 @@ public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>
* Batch set multiple value under the same hash key. Will terminate immediately if any error
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
* @throws PException throws exception if any error occurs.
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchMultiSet(String tableName, List<HashKeyData> items, int ttlSeconds)
throws PException;

@Deprecated
public void batchMultiSet(String tableName, List<HashKeyData> items) throws PException;

/**
* Batch set multiple value under the same hash key. Will wait for all requests done even if some
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
Expand All @@ -358,10 +392,12 @@ public void batchMultiSet(String tableName, List<HashKeyData> items, int ttlSeco
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchMultiSet2(
String tableName, List<HashKeyData> items, int ttlSeconds, List<PException> results)
throws PException;

@Deprecated
public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PException> results)
throws PException;

Expand All @@ -380,18 +416,25 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
/**
* Batch delete values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @throws PException throws exception if any error occurs.
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchDel(String tableName, List<Pair<byte[], byte[]>> keys) throws PException;

/**
* Batch delete values of different keys. Will wait for all requests done even if some error
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. TThe latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
Expand All @@ -403,6 +446,7 @@ public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PExcep
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchDel2(String tableName, List<Pair<byte[], byte[]>> keys, List<PException> results)
throws PException;

Expand Down Expand Up @@ -439,19 +483,26 @@ public void delRange(
* Batch delete specified sort keys under the same hash key. Will terminate immediately if any
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @throws PException throws exception if any error occurs.
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> keys)
throws PException;

/**
* Batch delete specified sort keys under the same hash key. Will wait for all requests done even
* if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
* com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @param results output results; should be created by caller; after call done, the size of
Expand All @@ -463,6 +514,7 @@ public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> key
* <p>Notice: the method is not atomic, that means, maybe some keys succeed but some keys
* failed.
*/
@Deprecated
public int batchMultiDel2(
String tableName, List<Pair<byte[], List<byte[]>>> keys, List<PException> results)
throws PException;
Expand Down
Loading

0 comments on commit 4cf307c

Please sign in to comment.