Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fixed a bug that will make GetAll to go to one more node than preferred
  • Loading branch information
zhongjiewu committed Sep 6, 2012
1 parent 33c809e commit 2b048f3
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 27 deletions.
Expand Up @@ -16,6 +16,7 @@

package voldemort.store.routed.action;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -137,24 +138,26 @@ public void requestComplete(Object result, long requestTime) {
successCount.increment();

List<Versioned<byte[]>> retrieved = values.get(key);
if(retrieved == null) {
retrieved = new ArrayList<Versioned<byte[]>>();
}
/*
* retrieved can be null if there are no values for the key
* provided
*/
if(retrieved != null) {
List<Versioned<byte[]>> existing = pipelineData.getResult().get(key);
List<Versioned<byte[]>> existing = pipelineData.getResult().get(key);

if(existing == null)
pipelineData.getResult().put(key, Lists.newArrayList(retrieved));
else
existing.addAll(retrieved);
}
if(existing == null)
pipelineData.getResult().put(key, Lists.newArrayList(retrieved));
else
existing.addAll(retrieved);

HashSet<Integer> zoneResponses = null;
if(pipelineData.getKeyToZoneResponse().containsKey(key)) {
zoneResponses = pipelineData.getKeyToZoneResponse().get(key);
} else {
zoneResponses = new HashSet<Integer>();
pipelineData.getKeyToZoneResponse().put(key, zoneResponses);
}
zoneResponses.add(response.getNode().getZoneId());
}
Expand Down
Expand Up @@ -79,13 +79,13 @@ public void execute(Pipeline pipeline) {
boolean zoneRequirement = false;
MutableInt successCount = pipelineData.getSuccessCount(key);

if(logger.isDebugEnabled())
logger.debug("GETALL for key " + key + " (keyRef: " + System.identityHashCode(key)
+ ") successes: " + successCount.intValue() + " preferred: " + preferred
+ " required: " + required);
if(logger.isDebugEnabled())
logger.debug("GETALL for key " + key + " (keyRef: " + System.identityHashCode(key)
+ ") successes: " + successCount.intValue() + " preferred: "
+ preferred + " required: " + required);

if(successCount.intValue() >= preferred) {
if(pipelineData.getZonesRequired() != null) {
if(pipelineData.getZonesRequired() != null && pipelineData.getZonesRequired() > 0) {

if(pipelineData.getKeyToZoneResponse().containsKey(key)) {
int zonesSatisfied = pipelineData.getKeyToZoneResponse().get(key).size();
Expand Down Expand Up @@ -149,6 +149,7 @@ public void execute(Pipeline pipeline) {
zoneResponses = pipelineData.getKeyToZoneResponse().get(key);
} else {
zoneResponses = new HashSet<Integer>();
pipelineData.getKeyToZoneResponse().put(key, zoneResponses);
}
zoneResponses.add(response.getNode().getZoneId());

Expand Down
41 changes: 26 additions & 15 deletions test/unit/voldemort/store/AbstractStoreTest.java
Expand Up @@ -125,8 +125,8 @@ public void testNullKeys() throws Exception {
// this is good
}
try {
store.getAll(Collections.<K> singleton(null), Collections.<K, T> singletonMap(null,
null));
store.getAll(Collections.<K> singleton(null),
Collections.<K, T> singletonMap(null, null));
fail("Store should not getAll null keys!");
} catch(IllegalArgumentException e) {
// this is good
Expand All @@ -141,12 +141,13 @@ public void testNullKeys() throws Exception {

@Test
public void testPutNullValue() {
// Store<K,V> store = getStore();
// K key = getKey();
// store.put(key, new Versioned<V>(null));
// List<Versioned<V>> found = store.get(key);
// assertEquals("Wrong number of values.", 1, found.size());
// assertEquals("Returned non-null value.", null, found.get(0).getValue());
// Store<K,V> store = getStore();
// K key = getKey();
// store.put(key, new Versioned<V>(null));
// List<Versioned<V>> found = store.get(key);
// assertEquals("Wrong number of values.", 1, found.size());
// assertEquals("Returned non-null value.", null,
// found.get(0).getValue());
}

@Test
Expand All @@ -155,12 +156,8 @@ public void testGetAndDeleteNonExistentKey() throws Exception {
Store<K, V, T> store = getStore();
List<Versioned<V>> found = store.get(key, null);
assertEquals("Found non-existent key: " + found, 0, found.size());
assertTrue("Delete of non-existent key succeeded.", !store.delete(key, getClock(1,
1,
2,
2,
3,
3)));
assertTrue("Delete of non-existent key succeeded.",
!store.delete(key, getClock(1, 1, 2, 2, 3, 3)));
}

private void testObsoletePutFails(String message,
Expand Down Expand Up @@ -315,7 +312,21 @@ public void testGetAll() throws Exception {
public void testGetAllWithAbsentKeys() throws Exception {
Store<K, V, T> store = getStore();
Map<K, List<Versioned<V>>> result = store.getAll(getKeys(3), null);
assertEquals(0, result.size());
boolean resultZero = (result.size() == 0);
boolean resultEmpty = true;
if(!resultZero) {
if(result.get(result.keySet().toArray()[0]).size() != 0) {
resultEmpty = false;
}
if(result.get(result.keySet().toArray()[1]).size() != 0) {
resultEmpty = false;
}
if(result.get(result.keySet().toArray()[2]).size() != 0) {
resultEmpty = false;
}
}
assertTrue(resultZero || resultEmpty);

}

@Test
Expand Down
91 changes: 91 additions & 0 deletions test/unit/voldemort/store/routed/GetallNodeReachTest.java
@@ -0,0 +1,91 @@
package voldemort.store.routed;

import static org.junit.Assert.assertEquals;
import static voldemort.VoldemortTestConstants.getFourNodeClusterWithZones;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;

import org.junit.Before;
import org.junit.Test;

import voldemort.TestUtils;
import voldemort.client.RoutingTier;
import voldemort.client.TimeoutConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.NoopFailureDetector;
import voldemort.routing.RoutingStrategyType;
import voldemort.serialization.SerializerDefinition;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;

import com.google.common.collect.Maps;

public class GetallNodeReachTest {

private Cluster cluster;

@Before
public void setUp() throws Exception {
cluster = getFourNodeClusterWithZones();
}

@Test
public void testGetallTouchOne() throws Exception {
RoutedStore store = null;
HashMap<Integer, Integer> zoneReplicationFactor = new HashMap<Integer, Integer>();
zoneReplicationFactor.put(0, 2);
zoneReplicationFactor.put(1, 1);
zoneReplicationFactor.put(2, 1);
StoreDefinition storeDef = new StoreDefinitionBuilder().setName("test")
.setType(InMemoryStorageConfiguration.TYPE_NAME)
.setRoutingPolicy(RoutingTier.CLIENT)
.setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
.setReplicationFactor(4)
.setZoneReplicationFactor(zoneReplicationFactor)
.setKeySerializer(new SerializerDefinition("string"))
.setValueSerializer(new SerializerDefinition("string"))
.setPreferredReads(2)
.setRequiredReads(1)
.setPreferredWrites(1)
.setRequiredWrites(1)
.setZoneCountReads(0)
.setZoneCountWrites(0)
.build();
Map<Integer, Store<ByteArray, byte[], byte[]>> subStores = Maps.newHashMap();
for(Node n: cluster.getNodes()) {
Store<ByteArray, byte[], byte[]> subStore = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test");
subStores.put(n.getId(), subStore);

}
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
Executors.newFixedThreadPool(2),
new TimeoutConfig(1000L,
false));

store = routedStoreFactory.create(cluster,
storeDef,
subStores,
true,
new NoopFailureDetector());
Versioned<byte[]> v = Versioned.value("v".getBytes());
subStores.get(0).put(TestUtils.toByteArray("k011"), v, null);
subStores.get(1).put(TestUtils.toByteArray("k011"), v, null);
subStores.get(2).put(TestUtils.toByteArray("k100"), v, null);
List<ByteArray> keys011 = new ArrayList<ByteArray>();
keys011.add(TestUtils.toByteArray("k011"));
List<ByteArray> keys100 = new ArrayList<ByteArray>();
keys100.add(TestUtils.toByteArray("k100"));
assertEquals(store.getAll(keys011, null).get(TestUtils.toByteArray("k011")).size(), 2);
assertEquals(store.getAll(keys100, null).get(TestUtils.toByteArray("k100")).size(), 0);
}
}

0 comments on commit 2b048f3

Please sign in to comment.