Skip to content

Commit

Permalink
Small bug fixes and cleanup in R2Store
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed May 21, 2013
1 parent 81f3975 commit fc89e8b
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions contrib/restclient/src/java/voldemort/restclient/R2Store.java
Expand Up @@ -72,6 +72,8 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
private static final String DELETE = "DELETE";
private static final String ETAG = "ETag";
public static final String X_VOLD_VECTOR_CLOCK = "X-VOLD-Vector-Clock";
public static final String CONTENT_TYPE = "Content-Type";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String X_VOLD_REQUEST_TIMEOUT_MS = "X-VOLD-Request-Timeout-ms";
public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
Expand All @@ -85,6 +87,7 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
private HttpClientFactory _clientFactory;
private Client client = null;
private String baseURL;
private ObjectMapper mapper;

public R2Store(String baseURL, String storeName) {
super(storeName);
Expand All @@ -93,6 +96,7 @@ public R2Store(String baseURL, String storeName) {
final TransportClient transportClient = _clientFactory.getClient(new HashMap<String, String>());
client = new TransportClientAdapter(transportClient);
this.baseURL = baseURL;
mapper = new ObjectMapper();
} catch(Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -123,8 +127,8 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(DELETE);
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "0");
rb.setHeader(CONTENT_TYPE, "binary");
rb.setHeader(CONTENT_LENGTH, "0");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = rb.build();
Expand All @@ -140,9 +144,13 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
ve.printStackTrace();
throw ve;
} catch(Exception e) {
e.printStackTrace();
if(!e.getMessage().contains("status=404")) {
logger.error("Specified key to delete does not exist.", e);
return false;
}
}
return false;

return true;
}

@Override
Expand All @@ -157,9 +165,8 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold

// TODO: Form a proper request based on client config
rb.setMethod(GET);
rb.setHeader("Accept", "application/json");
rb.setHeader("Accept", "binary");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");
rb.setHeader(X_VOLD_INCONSISTENCY_RESOLVER, "custom");

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);
Expand All @@ -182,7 +189,7 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
throw ve;
} catch(Exception e) {
if(!e.getMessage().contains("status=404")) {
logger.error("ERROR: " + e);
logger.error("Specified key does not exist." + e);
}
}

Expand All @@ -206,16 +213,22 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
+ "/" + base64Key));

// Serialize the Vector clock
String serializedVC = CoordinatorUtils.getSerializedVectorClock((VectorClock) value.getVersion());
VectorClock vc = (VectorClock) value.getVersion();
String serializedVC = null;
if(!vc.getEntries().isEmpty()) {
serializedVC = CoordinatorUtils.getSerializedVectorClock(vc);
}

// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(POST);
rb.setEntity(outputBytes.toByteArray());
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "" + payload.length);
rb.setHeader(CONTENT_TYPE, "binary");
rb.setHeader(CONTENT_LENGTH, "" + payload.length);
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");
rb.setHeader(X_VOLD_VECTOR_CLOCK, serializedVC);
if(serializedVC != null && serializedVC.length() > 0) {
rb.setHeader(X_VOLD_VECTOR_CLOCK, serializedVC);
}

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);
Expand All @@ -231,14 +244,15 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
ve.printStackTrace();
throw ve;
} catch(Exception e) {
logger.error("ERROR: " + e);
if(!e.getMessage().contains("status=412")) {
logger.error("Specified version of the value is Obsolete.", e);
}
}
}

private List<Versioned<byte[]>> readResults(ByteString entity, String eTag, String lastModified)
throws IOException {

ObjectMapper mapper = new ObjectMapper();
logger.debug("Received etag : " + eTag);
logger.debug("Received last modified date : " + lastModified);
VectorClockWrapper vcWrapper = mapper.readValue(eTag, VectorClockWrapper.class);
Expand Down Expand Up @@ -288,9 +302,7 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,

// Parse the response
final ByteString entity = response.getEntity();
String contentType = response.getHeader("Content-Type");
// String eTag = response.getHeader(ETAG);
// String lastModified = response.getHeader(LAST_MODIFIED);
String contentType = response.getHeader(CONTENT_TYPE);
if(entity != null) {
if(contentType.equalsIgnoreCase(MULTIPART_CONTENT_TYPE)) {
resultMap = readResultsGetAll(entity);
Expand Down Expand Up @@ -318,23 +330,11 @@ private Map<ByteArray, List<Versioned<byte[]>>> readResultsGetAll(ByteString ent
Map<ByteArray, List<Versioned<byte[]>>> results = new HashMap<ByteArray, List<Versioned<byte[]>>>();

try {
ObjectMapper mapper = new ObjectMapper();
// VectorClockWrapper vcWrapper = mapper.readValue(eTag,
// VectorClockWrapper.class);

// Build the multipart object
byte[] bytes = new byte[entity.length()];
entity.copyBytes(bytes, 0);

ByteArrayDataSource ds = new ByteArrayDataSource(bytes, "multipart/mixed");
// logger.info("received data = ");
// BufferedReader in = new BufferedReader(new
// InputStreamReader(ds.getInputStream()));
// String inputLine;
// while((inputLine = in.readLine()) != null)
// System.out.println(inputLine);
// in.close();

MimeMultipart mp = new MimeMultipart(ds);
for(int i = 0; i < mp.getCount(); i++) {
MimeBodyPart part = (MimeBodyPart) mp.getBodyPart(i);
Expand Down

0 comments on commit fc89e8b

Please sign in to comment.