Skip to content

Commit

Permalink
Added functionality to get store schema
Browse files Browse the repository at this point in the history
Example:
==========

 curl http://localhost:8080/schemata/dGVzdA==
{"key-serializer": "SerializerDefinition(name = string, schema-info =
{}, compression = null)", "value-serializer": "SerializerDefinition(name
= string, schema-info = {}, compression = null)"}
  • Loading branch information
abh1nay committed Jun 6, 2013
1 parent 96ff6d9 commit b8af33d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 4 deletions.
12 changes: 10 additions & 2 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory {
private final HashSet<SchedulerService> clientAsyncServiceRepo;

private Cluster cluster;
private List<StoreDefinition> storeDefs;

public AbstractStoreClientFactory(ClientConfig config) {
this.config = config;
Expand Down Expand Up @@ -221,8 +222,7 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
logger.debug("Obtained stores metadata xml" + storesXml);
}

List<StoreDefinition> storeDefs = storeMapper.readStoreList(new StringReader(storesXml),
false);
storeDefs = storeMapper.readStoreList(new StringReader(storesXml), false);
StoreDefinition storeDef = null;
for(StoreDefinition d: storeDefs)
if(d.getName().equals(storeName))
Expand Down Expand Up @@ -536,4 +536,12 @@ private void stopClientAsyncSchedulers() {
protected String getClientContext() {
return clientContextName;
}

public Cluster getCluster() {
return cluster;
}

public List<StoreDefinition> getStoreDefs() {
return storeDefs;
}
}
19 changes: 19 additions & 0 deletions src/java/voldemort/coordinator/FatClientWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,25 @@ public void submitDeleteRequest(CompositeVoldemortRequest<ByteArray, byte[]> del

}

/**
* Perform a get schemata operation without going over the wire
*
*/
void submitGetSchemataRequest(final MessageEvent getRequestMessageEvent) {
try {

this.fatClientExecutor.submit(new GetSchemataRequestExecutor(getRequestMessageEvent,
storeName,
storeClientFactory));
if(logger.isDebugEnabled()) {
logger.debug("Submitted a get schemata request");
}

} catch(RejectedExecutionException rej) {
handleRejectedException(rej, getRequestMessageEvent);
}
}

// TODO: Add a custom HTTP Error status 429: Too many requests
private void handleRejectedException(RejectedExecutionException rej, MessageEvent getRequest) {
this.errorStats.reportException(rej);
Expand Down
81 changes: 81 additions & 0 deletions src/java/voldemort/coordinator/GetSchemataRequestExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package voldemort.coordinator;

import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TRANSFER_ENCODING;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;

import voldemort.client.SocketStoreClientFactory;
import voldemort.store.StoreDefinition;
import voldemort.utils.StoreDefinitionUtils;

public class GetSchemataRequestExecutor implements Runnable {

private SocketStoreClientFactory storeClientFactory;
private String storeName;
private final Logger logger = Logger.getLogger(GetSchemataRequestExecutor.class);
private ChannelBuffer responseContent;
private MessageEvent getRequestMessageEvent;

String SCHEMATAJSON = "{\"type\": \"record\", \"name\": \"storeserializer\",\"fields\": ["
+ "{ \"name\": \"key-serializer\", \"type\": \"string\" } ,"
+ "{ \"name\": \"value-serializer\", \"type\": \"string\"}]}";

public GetSchemataRequestExecutor(MessageEvent requestEvent,
String storeName,
SocketStoreClientFactory storeClientFactory) {
this.storeClientFactory = storeClientFactory;
this.storeName = storeName;
this.getRequestMessageEvent = requestEvent;
}

@Override
public void run() {

StoreDefinition storeDef = StoreDefinitionUtils.getStoreDefinitionWithName(storeClientFactory.getStoreDefs(),
storeName);

String keySerializer = storeDef.getKeySerializer().toString();
String valueSerializer = storeDef.getValueSerializer().toString();
GenericData.Record record = new GenericData.Record(Schema.parse(SCHEMATAJSON));

record.put("key-serializer", keySerializer);
record.put("value-serializer", valueSerializer);
writeResponse(record.toString().getBytes());
}

public void writeResponse(byte[] responseValue) {

this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length);
this.responseContent.writeBytes(responseValue);

// 1. Create the Response object
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);

// 2. Set the right headers
response.setHeader(CONTENT_TYPE, "binary");
response.setHeader(CONTENT_TRANSFER_ENCODING, "binary");

// 3. Copy the data into the payload
response.setContent(responseContent);
response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes());

if(logger.isDebugEnabled()) {
logger.debug("Response = " + response);
}

// Write the response to the Netty Channel
this.getRequestMessageEvent.getChannel().write(response);
}

}
25 changes: 23 additions & 2 deletions src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import voldemort.store.CompositeVersionedPutVoldemortRequest;
import voldemort.store.CompositeVoldemortRequest;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

Expand All @@ -66,6 +67,7 @@ public class VoldemortHttpRequestHandler extends SimpleChannelUpstreamHandler {
private static final String X_VOLD_VECTOR_CLOCK = "X-VOLD-Vector-Clock";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp";
public static final String SCHEMATA = "schemata";

private CoordinatorErrorStats errorStats = null;

Expand Down Expand Up @@ -209,7 +211,25 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
String storeName = getStoreName(requestURI);
FatClientWrapper fatClientWrapper = null;
if(storeName != null) {
fatClientWrapper = this.fatClientMap.get(storeName);
// /schemata/<store_name>
if(storeName.equalsIgnoreCase(SCHEMATA)) {

String queryStore = ByteUtils.getString(requestObject.getKey().get(),
"UTF-8");
fatClientWrapper = this.fatClientMap.get(queryStore);

if(queryStore == null || fatClientWrapper == null) {
this.errorStats.reportException(new IllegalArgumentException());
handleBadRequest(e, "Invalid store name. Critical error.");
return;
}

fatClientWrapper.submitGetSchemataRequest(e);
return;

} else {
fatClientWrapper = this.fatClientMap.get(storeName);
}
}

if(storeName == null || fatClientWrapper == null) {
Expand Down Expand Up @@ -386,7 +406,8 @@ private List<ByteArray> readKey(String requestURI) {
private String getStoreName(String requestURI) {
String storeName = null;
String[] parts = requestURI.split("/");
if(parts.length > 1 && this.fatClientMap.containsKey(parts[1])) {
if(parts.length > 1
&& (this.fatClientMap.containsKey(parts[1]) || parts[1].equalsIgnoreCase(SCHEMATA))) {
storeName = parts[1];
}

Expand Down

0 comments on commit b8af33d

Please sign in to comment.