Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/voldemort/voldemort
Browse files Browse the repository at this point in the history
# By Vinoth Chandar (16) and others
# Via Siddharth Singh
* 'master' of https://github.com/voldemort/voldemort: (56 commits)
  Upgrade Google Collections lib to Guava lib
  Dropped META-INF/MANIFEST.MF
  Fixes to KeySampler/KeyVersionFetcher
  Add MANIFEST.MF to git ignore
  Small bug fixes and cleanup in R2Store
  Make sure elapsed time is not negative
  Adding a null check for the versioned value object in convertStringToObject in MetadataStore. This was causing a small problem while restarting the Voldemort server
  Fix unused variable
  cleanup on tests
  more cleanup on the test
  Cleaned up the test
  Made the getserverStateLocked explicit adding the new test case this time around
  Added new end to end test for verifying the atomic update is consistent on bootstrap cleaned up code based off last code review
  atomic update of stores and cluster xml during rebalance
  Allow update metadata to take both stores and cluster xml
  more commits on slop fix
  deprecate send hint serial
  additional slop fix
  additional logging
  added end-to-end test for slops
  ...
  • Loading branch information
ctasada committed May 30, 2013
2 parents 266221c + 797b7c6 commit 97289e6
Show file tree
Hide file tree
Showing 152 changed files with 10,359 additions and 3,131 deletions.
2 changes: 1 addition & 1 deletion .classpath
Expand Up @@ -42,7 +42,6 @@
<classpathentry kind="lib" path="lib/commons-pool-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
<classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
<classpathentry kind="lib" path="lib/je-4.1.17.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
Expand All @@ -63,6 +62,7 @@
<classpathentry kind="lib" path="contrib/restclient/lib/data-1.5.10.jar"/>
<classpathentry kind="lib" path="contrib/restclient/lib/pegasus-common-1.5.10.jar"/>
<classpathentry kind="lib" path="contrib/restclient/lib/r2-1.5.10.jar"/>
<classpathentry kind="lib" path="lib/guava-14.0.1.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="classes"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -14,3 +14,4 @@ server.state
.temp
.idea
data/
META-INF/MANIFEST.MF
8 changes: 0 additions & 8 deletions META-INF/MANIFEST.MF

This file was deleted.

2 changes: 1 addition & 1 deletion build.properties
Expand Up @@ -42,5 +42,5 @@ tomcat.context=/voldemort
javac.version=1.5

## Release
curr.release=1.3.1
curr.release=1.3.3

14 changes: 6 additions & 8 deletions contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java
Expand Up @@ -44,7 +44,7 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.AbstractRebalanceTest;
import voldemort.client.rebalance.AbstractNonZonedRebalanceTest;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.RequestRoutingType;
Expand All @@ -55,7 +55,7 @@

/**
*/
public class Ec2RebalanceTest extends AbstractRebalanceTest {
public class Ec2RebalanceTest extends AbstractNonZonedRebalanceTest {

private static int NUM_KEYS;

Expand All @@ -66,7 +66,10 @@ public class Ec2RebalanceTest extends AbstractRebalanceTest {

private Map<Integer, String> nodeIdsInv = new HashMap<Integer, String>();
private List<String> activeHostNames = new ArrayList<String>();
private boolean useDonorBased = true;

public Ec2RebalanceTest() {
super(true, true);
}

@BeforeClass
public static void ec2Setup() throws Exception {
Expand Down Expand Up @@ -209,11 +212,6 @@ protected void stopServer(List<Integer> nodesToStop) throws Exception {
stopCluster(hostsToStop, ec2RebalanceTestConfig);
}

@Override
protected boolean useDonorBased() {
return this.useDonorBased;
}

private static class Ec2RebalanceTestConfig extends Ec2RemoteTestConfig {

private String configDirName;
Expand Down
Expand Up @@ -411,6 +411,11 @@ public void verifySchema(String url) throws Exception {
+ "\nBut expected: "
+ remoteStoreDef);
}
} else {
throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Have: "
+ newStoreDef
+ "\nBut expected: "
+ remoteStoreDef);
}
}

Expand Down Expand Up @@ -788,6 +793,11 @@ public void verifyAvroSchemaAndVersions(String url, boolean isVersioned) throws
+ "\nBut expected: "
+ remoteStoreDef);
}
} else {
throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Have: "
+ newStoreDef
+ "\nBut expected: "
+ remoteStoreDef);
}
}

Expand Down
Expand Up @@ -231,6 +231,29 @@ else if(occurred == Occurred.AFTER)
}
}

@Override
public List<Versioned<byte[]>> multiVersionPut(ByteArray key,
final List<Versioned<byte[]>> values)
throws VoldemortException {
StoreUtils.assertValidKey(key);
List<Versioned<byte[]>> valuesInStorage = null;
List<Versioned<byte[]>> obsoleteVals = null;

synchronized(this.locks.lockFor(key.get())) {
valuesInStorage = this.get(key, null);
obsoleteVals = resolveAndConstructVersionsToPersist(valuesInStorage, values);

try {
datastore.put(key.get(), assembleValues(valuesInStorage));
} catch(Exception e) {
String message = "Failed to put key " + key;
logger.error(message, e);
throw new VoldemortException(message, e);
}
}
return obsoleteVals;
}

/**
* Store the versioned values
*
Expand Down
61 changes: 34 additions & 27 deletions contrib/restclient/src/java/voldemort/restclient/R2Store.java
Expand Up @@ -41,6 +41,7 @@
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.VoldemortException;
import voldemort.coordinator.CoordinatorUtils;
import voldemort.coordinator.VectorClockWrapper;
import voldemort.store.AbstractStore;
import voldemort.utils.ByteArray;
Expand Down Expand Up @@ -70,10 +71,14 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
private static final String POST = "POST";
private static final String DELETE = "DELETE";
private static final String ETAG = "ETag";
public static final String X_VOLD_VECTOR_CLOCK = "X-VOLD-Vector-Clock";
public static final String CONTENT_TYPE = "Content-Type";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String X_VOLD_REQUEST_TIMEOUT_MS = "X-VOLD-Request-Timeout-ms";
public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp";

private static final String LAST_MODIFIED = "Last-Modified";
private static final String MULTIPART_CONTENT_TYPE = "multipart/binary";
private final Logger logger = Logger.getLogger(R2Store.class);
Expand All @@ -82,6 +87,7 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
private HttpClientFactory _clientFactory;
private Client client = null;
private String baseURL;
private ObjectMapper mapper;

public R2Store(String baseURL, String storeName) {
super(storeName);
Expand All @@ -90,6 +96,7 @@ public R2Store(String baseURL, String storeName) {
final TransportClient transportClient = _clientFactory.getClient(new HashMap<String, String>());
client = new TransportClientAdapter(transportClient);
this.baseURL = baseURL;
mapper = new ObjectMapper();
} catch(Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -120,8 +127,8 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(DELETE);
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "0");
rb.setHeader(CONTENT_TYPE, "binary");
rb.setHeader(CONTENT_LENGTH, "0");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = rb.build();
Expand All @@ -137,9 +144,13 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
ve.printStackTrace();
throw ve;
} catch(Exception e) {
e.printStackTrace();
if(!e.getMessage().contains("status=404")) {
logger.error("Specified key to delete does not exist.", e);
return false;
}
}
return false;

return true;
}

@Override
Expand All @@ -154,9 +165,8 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold

// TODO: Form a proper request based on client config
rb.setMethod(GET);
rb.setHeader("Accept", "application/json");
rb.setHeader("Accept", "binary");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");
rb.setHeader(X_VOLD_INCONSISTENCY_RESOLVER, "custom");

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);
Expand All @@ -179,7 +189,7 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
throw ve;
} catch(Exception e) {
if(!e.getMessage().contains("status=404")) {
logger.error("ERROR: " + e);
logger.error("Specified key does not exist." + e);
}
}

Expand All @@ -202,20 +212,30 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// Serialize the Vector clock
VectorClock vc = (VectorClock) value.getVersion();
String serializedVC = null;
if(!vc.getEntries().isEmpty()) {
serializedVC = CoordinatorUtils.getSerializedVectorClock(vc);
}

// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(POST);
rb.setEntity(outputBytes.toByteArray());
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "" + payload.length);
rb.setHeader(CONTENT_TYPE, "binary");
rb.setHeader(CONTENT_LENGTH, "" + payload.length);
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");
rb.setHeader(X_VOLD_INCONSISTENCY_RESOLVER, "custom");
if(serializedVC != null && serializedVC.length() > 0) {
rb.setHeader(X_VOLD_VECTOR_CLOCK, serializedVC);
}

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
RestResponse response = f.get();
String eTag = response.getHeader(ETAG);
final ByteString entity = response.getEntity();
if(entity == null) {
logger.error("Empty response !");
Expand All @@ -224,14 +244,15 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
ve.printStackTrace();
throw ve;
} catch(Exception e) {
logger.error("ERROR: " + e);
if(!e.getMessage().contains("status=412")) {
logger.error("Specified version of the value is Obsolete.", e);
}
}
}

private List<Versioned<byte[]>> readResults(ByteString entity, String eTag, String lastModified)
throws IOException {

ObjectMapper mapper = new ObjectMapper();
logger.debug("Received etag : " + eTag);
logger.debug("Received last modified date : " + lastModified);
VectorClockWrapper vcWrapper = mapper.readValue(eTag, VectorClockWrapper.class);
Expand Down Expand Up @@ -281,9 +302,7 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,

// Parse the response
final ByteString entity = response.getEntity();
String contentType = response.getHeader("Content-Type");
// String eTag = response.getHeader(ETAG);
// String lastModified = response.getHeader(LAST_MODIFIED);
String contentType = response.getHeader(CONTENT_TYPE);
if(entity != null) {
if(contentType.equalsIgnoreCase(MULTIPART_CONTENT_TYPE)) {
resultMap = readResultsGetAll(entity);
Expand Down Expand Up @@ -311,23 +330,11 @@ private Map<ByteArray, List<Versioned<byte[]>>> readResultsGetAll(ByteString ent
Map<ByteArray, List<Versioned<byte[]>>> results = new HashMap<ByteArray, List<Versioned<byte[]>>>();

try {
ObjectMapper mapper = new ObjectMapper();
// VectorClockWrapper vcWrapper = mapper.readValue(eTag,
// VectorClockWrapper.class);

// Build the multipart object
byte[] bytes = new byte[entity.length()];
entity.copyBytes(bytes, 0);

ByteArrayDataSource ds = new ByteArrayDataSource(bytes, "multipart/mixed");
// logger.info("received data = ");
// BufferedReader in = new BufferedReader(new
// InputStreamReader(ds.getInputStream()));
// String inputLine;
// while((inputLine = in.readLine()) != null)
// System.out.println(inputLine);
// in.close();

MimeMultipart mp = new MimeMultipart(ds);
for(int i = 0; i < mp.getCount(); i++) {
MimeBodyPart part = (MimeBodyPart) mp.getBodyPart(i);
Expand Down
Expand Up @@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;

import voldemort.versioning.Versioned;

public class SampleRESTClient {

public static void main(String[] args) {
Expand All @@ -31,8 +33,18 @@ public static void main(String[] args) {
clientStore.put("a", "Howdy!!!!");
clientStore.put("b", "Partner!!!!");

// Do a sample operation:
System.out.println("Received response : " + clientStore.get("a"));
// Do a sample get operation:
Versioned<String> versionedValue = clientStore.get("a");
System.out.println("Received response : " + versionedValue);

// Do a versioned put operation:
versionedValue.setObject("New Value !!!");
clientStore.put("a", versionedValue);

// Do a get again on the last versioned put operation:
versionedValue = clientStore.get("a");
System.out.println("Received response on the versioned put: " + versionedValue);

List<String> keyList = new ArrayList<String>();
keyList.add("a");
keyList.add("b");
Expand Down
Binary file removed lib/google-collect-1.0.jar
Binary file not shown.
Binary file added lib/guava-14.0.1.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions release_notes.txt
@@ -1,3 +1,8 @@
Release 1.3.3 on 04/24/2013
* VoldemortBuildandPush
- Fixed bug with schema check
* Streaming Client
- Fixed issue with redundant callback invocation
Release 1.3.1 on 03/25/2013
* HDFSFetcher
- Fixed the bug in calculating checksums when we entere a retry loop
Expand Down

0 comments on commit 97289e6

Please sign in to comment.