Skip to content

Commit

Permalink
Return future from reduce agent, add testcases
Browse files Browse the repository at this point in the history
  • Loading branch information
fraenkel committed Aug 16, 2011
1 parent d728d0d commit 3a894fa
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -95,14 +99,15 @@ static public <A extends ReduceGridAgent, K extends Serializable, X> X callReduc
return null;
}

static public <A extends ReduceGridAgent, K extends Serializable, V> void callReduceAgentAll(WXSUtils utils, ReduceAgentFactory<A> factory,
Map<K, V> batch, BackingMap bmap) {
static public <A extends ReduceGridAgent, K extends Serializable, V, X> List<Future<X>> callReduceAgentAll(WXSUtils utils,
ReduceAgentFactory<A> factory, Map<K, V> batch, BackingMap bmap) {
int sz = batch.size();
A a;
switch (sz) {
case 0:
return;
case 1: // optimized version, generates a little less garbage.

if (sz == 0) {
return Collections.emptyList();
} else if (sz == 1) {
// optimized version, generates a little less garbage.
// invoke the agent to add the batch of records to the grid
a = factory.newAgent(batch);
// Insert all keys for one partition using the first key as a routing key
Expand All @@ -112,16 +117,13 @@ static public <A extends ReduceGridAgent, K extends Serializable, V> void callRe
} catch (UndefinedMapException e) {
throw new ObjectGridRuntimeException(e);
}
Object rc = am.callReduceAgent(a, Collections.singletonList(factory.getKey(a)));
boolean result = checkReturnValue(Boolean.TRUE, rc);
if (!result) {
logger.log(Level.SEVERE, "putAll failed because of a server side exception");
throw new ObjectGridRuntimeException("putAll failed");
}
break;
default:
X rc = (X) am.callReduceAgent(a, Collections.singletonList(factory.getKey(a)));
Future<X> future = new DoneFuture<X>(rc);
return Arrays.asList(future);
} else {

Map<Integer, SortedMap<K, V>> pmap = convertToPartitionEntryMap(bmap, batch);
ArrayList<Future<Boolean>> results = new ArrayList<Future<Boolean>>(pmap.size());
ArrayList<Future<X>> results = new ArrayList<Future<X>>(pmap.size());
for (Map.Entry<Integer, SortedMap<K, V>> e : pmap.entrySet()) {
// we need one key for partition routing
// so get the first one
Expand All @@ -131,13 +133,23 @@ static public <A extends ReduceGridAgent, K extends Serializable, V> void callRe
// invoke the agent to add the batch of records to the grid
a = factory.newAgent(perPartitionEntries);
// Insert all keys for one partition using the first key as a routing key
Future<Boolean> fv = utils.getExecutorService().submit(new CallReduceAgentThread<Boolean>(utils, bmap.getName(), key, a));
Future<X> fv = utils.getExecutorService().submit(new CallReduceAgentThread<X>(utils, bmap.getName(), key, a));
results.add(fv);

}
return results;
}

}

static public <A extends ReduceGridAgent, K extends Serializable, V, X> void callReduceAgentAll(WXSUtils utils, ReduceAgentFactory<A> factory,
Map<K, V> batch, BackingMap bmap, X result) {
List<Future<X>> r = callReduceAgentAll(utils, factory, batch, bmap);

if (!areAllFutures(Boolean.TRUE, results, ConfigProperties.getAgentTimeout(utils.getConfigProperties()))) {
logger.log(Level.SEVERE, "putAll failed because of a server side exception");
throw new ObjectGridRuntimeException("putAll failed");
if (!r.isEmpty()) {
if (!areAllFutures(result, r, ConfigProperties.getAgentTimeout(utils.getConfigProperties()))) {
logger.log(Level.SEVERE, "Agent failed because of a server side exception");
throw new ObjectGridRuntimeException("Agent failed");
}
}
}
Expand All @@ -159,4 +171,33 @@ static public <A extends ReduceGridAgent, K extends Serializable, X extends Seri

return collectResultsAsMap(results, ConfigProperties.getAgentTimeout(utils.getConfigProperties()));
}

static class DoneFuture<X> implements Future<X> {
X result;

DoneFuture(X r) {
result = r;
}

public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

public boolean isCancelled() {
return false;
}

public boolean isDone() {
return true;
}

public X get() throws InterruptedException, ExecutionException {
return result;
}

public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return result;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.junit.BeforeClass;
import org.junit.Test;

import com.devwebsphere.wxsutils.wxsmap.BigListPushAgent;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testCond_PutAll() {

// try with maps different size, orig = 10, new = 11
try {
Map<String, Boolean> rc = utils.cond_putAll(original, newValues, bmFarMap3);
utils.cond_putAll(original, newValues, bmFarMap3);
Assert.fail("Should have thrown exception");
} catch (ObjectGridRuntimeException e) {
// this is expected
Expand Down Expand Up @@ -764,43 +764,6 @@ public void testPingAllPartitions() {
Assert.assertEquals(ogclient.getMap("Set").getPartitionManager().getNumOfPartitions(), partitionCount);
}

/**
* This does a simple stress test against the grid.
*/
// @Test
public void testPutRate() {
clearMap();
int maxTests = 50;
// run more than one time to allow JIT to settle
// for unit test once is enough
for (int loop = 0; loop < 1; ++loop) {
for (int batchSize = 1000; batchSize <= 32000; batchSize *= 2) {
Map<String, String> batch = new HashMap<String, String>();
for (int i = 0; i < batchSize; ++i)
batch.put(Integer.toString(i), "V" + i);

long start = System.nanoTime();
for (int test = 0; test < maxTests; ++test) {
utils.putAll(batch, ogclient.getMap("FarMap3"));
}
if (false) {
ArrayList<String> keys = new ArrayList<String>();
for (int i = 0; i < batchSize; ++i) {
keys.add(Integer.toString(i));
}
Map<String, String> rc = utils.getAll(keys, ogclient.getMap("FarMap3"));

for (Map.Entry<String, String> e : rc.entrySet()) {
Assert.assertEquals("V" + e.getKey(), e.getValue());
}
}
double duration = (System.nanoTime() - start) / 1000000000.0;
double rate = (double) batchSize * (double) maxTests / duration;
System.out.println("Batch of " + batchSize + " rate is " + rate + " <" + (batch.size() * maxTests) + ":" + duration + ">");
}
}
}

@Test
public void testListEviction() {
WXSMapOfLists<String, String> list = utils.getMapOfLists("EvictionList");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,6 @@ public void testQuery()
}
block = q.getNextResult();
}
Assert.assertEquals(true, personSet.size() == 0);
Assert.assertEquals(0, personSet.size());
}
}

0 comments on commit 3a894fa

Please sign in to comment.