diff --git a/contrib/restclient/src/java/voldemort/restclient/R2Store.java b/contrib/restclient/src/java/voldemort/restclient/R2Store.java index bb04c8ea35..15ce9d4d5a 100644 --- a/contrib/restclient/src/java/voldemort/restclient/R2Store.java +++ b/contrib/restclient/src/java/voldemort/restclient/R2Store.java @@ -78,6 +78,7 @@ public class R2Store extends AbstractStore { public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver"; public static final String CUSTOM_RESOLVING_STRATEGY = "custom"; public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp"; + public static final String SCHEMATA_STORE_NAME = "schemata"; private static final String LAST_MODIFIED = "Last-Modified"; private static final String MULTIPART_CONTENT_TYPE = "multipart/binary"; @@ -153,27 +154,27 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException return true; } + private RestResponse fetchGetResponse(RestRequestBuilder requestBuilder) throws Exception { + // TODO: Form a proper request based on client config + requestBuilder.setMethod(GET); + requestBuilder.setHeader("Accept", "binary"); + requestBuilder.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000"); + + RestRequest request = requestBuilder.build(); + Future f = client.restRequest(request); + + // This will block + return f.get(); + } + @Override public List> get(ByteArray key, byte[] transforms) throws VoldemortException { - List> resultList = new ArrayList>(); - + String base64Key = new String(Base64.encodeBase64(key.get())); + RestRequestBuilder rb = null; try { - String base64Key = new String(Base64.encodeBase64(key.get())); - RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName() - + "/" + base64Key)); - - // TODO: Form a proper request based on client config - rb.setMethod(GET); - rb.setHeader("Accept", "binary"); - rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000"); - - RestRequest request = rb.build(); - Future f = client.restRequest(request); - - // This will block - RestResponse response = f.get(); - + rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName() + "/" + base64Key)); + RestResponse response = fetchGetResponse(rb); // Parse the response final ByteString entity = response.getEntity(); String eTag = response.getHeader(ETAG); @@ -183,19 +184,35 @@ public List> get(ByteArray key, byte[] transforms) throws Vold } else { logger.error("Did not get any response!"); } - } catch(VoldemortException ve) { - ve.printStackTrace(); + logger.error("Error performing get", ve); throw ve; } catch(Exception e) { - if(!e.getMessage().contains("status=404")) { - logger.error("Specified key does not exist." + e); + // TODO this needs to be revisited.. Not sure if we rethrow + // exception when its not 404 + String errorMsg = e.getMessage(); + if(errorMsg != null && !errorMsg.contains("status=404")) { + logger.error("Specified key does not exist.", e); } } - return resultList; } + public String getSerializerInfoXml() throws VoldemortException { + RestRequestBuilder rb = null; + try { + String base64Key = new String(Base64.encodeBase64(getName().getBytes("UTF-8"))); + rb = new RestRequestBuilder(new URI(this.baseURL + "/" + SCHEMATA_STORE_NAME + "/" + + base64Key)); + RestResponse response = fetchGetResponse(rb); + // Parse the response + return response.getEntity().asString("UTF-8"); + } catch(Exception e) { + logger.error("Error in get serializer info request", e); + throw new VoldemortException(e); + } + } + @Override public void put(ByteArray key, Versioned value, byte[] transform) throws VoldemortException { diff --git a/contrib/restclient/src/java/voldemort/restclient/RESTClient.java b/contrib/restclient/src/java/voldemort/restclient/RESTClient.java index 531a35178b..99af24b59e 100644 --- a/contrib/restclient/src/java/voldemort/restclient/RESTClient.java +++ b/contrib/restclient/src/java/voldemort/restclient/RESTClient.java @@ -20,18 +20,19 @@ import java.util.Map; import java.util.Map.Entry; -import voldemort.client.RoutingTier; +import org.apache.log4j.Logger; + import voldemort.client.StoreClient; import voldemort.client.UpdateAction; import voldemort.cluster.Node; -import voldemort.routing.RoutingStrategyType; +import voldemort.coordinator.CoordinatorUtils; import voldemort.serialization.DefaultSerializerFactory; import voldemort.serialization.Serializer; import voldemort.serialization.SerializerDefinition; import voldemort.serialization.SerializerFactory; import voldemort.store.Store; -import voldemort.store.StoreDefinition; -import voldemort.store.StoreDefinitionBuilder; +import voldemort.store.compress.CompressingStore; +import voldemort.store.compress.CompressionStrategyFactory; import voldemort.store.serialized.SerializingStore; import voldemort.store.versioned.InconsistencyResolvingStore; import voldemort.utils.ByteArray; @@ -51,11 +52,12 @@ public class RESTClient implements StoreClient { private Store clientStore = null; private SerializerFactory serializerFactory = new DefaultSerializerFactory(); - private StoreDefinition storeDef; private String storeName; + private static Logger logger = Logger.getLogger(RESTClient.class); + /** - * A REST ful equivalent of the DefaultStoreClient. This uses the R2Store to + * A RESTful equivalent of the DefaultStoreClient. This uses the R2Store to * interact with the RESTful Coordinator * * @param bootstrapURL The bootstrap URL of the Voldemort cluster @@ -63,36 +65,36 @@ public class RESTClient implements StoreClient { */ public RESTClient(String bootstrapURL, String storeName) { + this.storeName = storeName; String baseURL = "http://" + bootstrapURL.split(":")[1].substring(2) + ":8080"; // The lowest layer : Transporting request to coordinator - Store store = new R2Store(baseURL, storeName); + R2Store r2store = new R2Store(baseURL, storeName); - // TODO - // Get the store definition so that we can learn the Serialization - // and - // compression properties + // bootstrap from the coordinator and obtain all the serialization + // information. + String serializerInfoXml = r2store.getSerializerInfoXml(); + SerializerDefinition keySerializerDefinition = CoordinatorUtils.parseKeySerializerDefinition(serializerInfoXml); + SerializerDefinition valueSerializerDefinition = CoordinatorUtils.parseValueSerializerDefinition(serializerInfoXml); + + logger.info("Bootstrapping for " + getName() + ": Key serializer " + + keySerializerDefinition); + logger.info("Bootstrapping for " + getName() + ": Value serializer " + + valueSerializerDefinition); + + // Start building the stack.. + // First, the transport layer + Store store = r2store; - // TODO // Add compression layer + if(keySerializerDefinition.hasCompression() || valueSerializerDefinition.hasCompression()) { + store = new CompressingStore(store, + new CompressionStrategyFactory().get(keySerializerDefinition.getCompression()), + new CompressionStrategyFactory().get(valueSerializerDefinition.getCompression())); + } // Add Serialization layer - - // Set the following values although we don't need them - // TODO: Fix this, so that we only need to set the needed parameters - storeDef = new StoreDefinitionBuilder().setName(storeName) - .setType("bdb") - .setKeySerializer(new SerializerDefinition("string")) - .setValueSerializer(new SerializerDefinition("string")) - .setRoutingPolicy(RoutingTier.CLIENT) - .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) - .setReplicationFactor(1) - .setPreferredReads(1) - .setRequiredReads(1) - .setPreferredWrites(1) - .setRequiredWrites(1) - .build(); - Serializer keySerializer = (Serializer) serializerFactory.getSerializer(storeDef.getKeySerializer()); - Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(storeDef.getValueSerializer()); + Serializer keySerializer = (Serializer) serializerFactory.getSerializer(keySerializerDefinition); + Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(valueSerializerDefinition); clientStore = SerializingStore.wrap(store, keySerializer, valueSerializer, null); // Add inconsistency Resolving layer @@ -100,8 +102,6 @@ public RESTClient(String bootstrapURL, String storeName) { clientStore = new InconsistencyResolvingStore(clientStore, new ChainedResolver>(new VectorClockInconsistencyResolver(), secondaryResolver)); - - this.storeName = storeName; } @Override @@ -236,6 +236,11 @@ public List getResponsibleNodes(K key) { } public void close() { + // TODO understand why the client hangs around even after close() this.clientStore.close(); } + + public String getName() { + return this.storeName; + } } diff --git a/src/java/voldemort/coordinator/CoordinatorUtils.java b/src/java/voldemort/coordinator/CoordinatorUtils.java index d8003ca655..6f566d15f1 100644 --- a/src/java/voldemort/coordinator/CoordinatorUtils.java +++ b/src/java/voldemort/coordinator/CoordinatorUtils.java @@ -1,8 +1,21 @@ package voldemort.coordinator; +import java.io.IOException; +import java.io.StringReader; + import org.codehaus.jackson.map.ObjectMapper; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.jdom.output.Format; +import org.jdom.output.XMLOutputter; +import voldemort.serialization.SerializerDefinition; +import voldemort.store.StoreDefinition; import voldemort.versioning.VectorClock; +import voldemort.xml.MappingException; +import voldemort.xml.StoreDefinitionsMapper; public class CoordinatorUtils { @@ -43,4 +56,65 @@ public static VectorClock deserializeVectorClock(String serializedVC) { return vc; } + + /** + * Given a storedefinition, constructs the xml string to be sent out in + * response to a "schemata" fetch request + * + * @param storeDefinition + * @return + */ + public static String constructSerializerInfoXml(StoreDefinition storeDefinition) { + Element store = new Element(StoreDefinitionsMapper.STORE_ELMT); + store.addContent(new Element(StoreDefinitionsMapper.STORE_NAME_ELMT).setText(storeDefinition.getName())); + Element keySerializer = new Element(StoreDefinitionsMapper.STORE_KEY_SERIALIZER_ELMT); + StoreDefinitionsMapper.addSerializer(keySerializer, storeDefinition.getKeySerializer()); + store.addContent(keySerializer); + + Element valueSerializer = new Element(StoreDefinitionsMapper.STORE_VALUE_SERIALIZER_ELMT); + StoreDefinitionsMapper.addSerializer(valueSerializer, storeDefinition.getValueSerializer()); + store.addContent(valueSerializer); + + XMLOutputter serializer = new XMLOutputter(Format.getPrettyFormat()); + return serializer.outputString(store); + } + + /** + * Given an xml string containing the store's serialization information, + * obtains the key serializer definition + * + * @param serializerInfoXml + * @return + */ + public static SerializerDefinition parseKeySerializerDefinition(String serializerInfoXml) { + return parseSerializerDefinition(serializerInfoXml, + StoreDefinitionsMapper.STORE_KEY_SERIALIZER_ELMT); + } + + /** + * Given an xml string containing the store's serialization information, + * obtains the value serializer definition + * + * @param serializerInfoXml + * @return + */ + public static SerializerDefinition parseValueSerializerDefinition(String serializerInfoXml) { + return parseSerializerDefinition(serializerInfoXml, + StoreDefinitionsMapper.STORE_VALUE_SERIALIZER_ELMT); + } + + private static SerializerDefinition parseSerializerDefinition(String serializerInfoXml, + String elementName) { + SAXBuilder builder = new SAXBuilder(); + try { + Document doc = builder.build(new StringReader(serializerInfoXml)); + Element root = doc.getRootElement(); + Element serializerElement = root.getChild(elementName); + return StoreDefinitionsMapper.readSerializer(serializerElement); + } catch(JDOMException e) { + throw new MappingException(e); + } catch(IOException e) { + throw new MappingException(e); + } + } } diff --git a/src/java/voldemort/coordinator/GetSchemataRequestExecutor.java b/src/java/voldemort/coordinator/GetSchemataRequestExecutor.java index e8e435944f..ccc170701b 100644 --- a/src/java/voldemort/coordinator/GetSchemataRequestExecutor.java +++ b/src/java/voldemort/coordinator/GetSchemataRequestExecutor.java @@ -8,8 +8,6 @@ import java.io.UnsupportedEncodingException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -46,15 +44,9 @@ public void run() { StoreDefinition storeDef = StoreDefinitionUtils.getStoreDefinitionWithName(storeClientFactory.getStoreDefs(), storeName); - - String keySerializer = storeDef.getKeySerializer().toString(); - String valueSerializer = storeDef.getValueSerializer().toString(); - GenericData.Record record = new GenericData.Record(Schema.parse(SCHEMATAJSON)); - - record.put("key-serializer", keySerializer); - record.put("value-serializer", valueSerializer); + String serializerInfoXml = CoordinatorUtils.constructSerializerInfoXml(storeDef); try { - writeResponse(record.toString().getBytes("UTF-8")); + writeResponse(serializerInfoXml.getBytes("UTF-8")); } catch(UnsupportedEncodingException e) { logger.error(e); } diff --git a/src/java/voldemort/xml/StoreDefinitionsMapper.java b/src/java/voldemort/xml/StoreDefinitionsMapper.java index 6f958fd3a3..0094551d29 100644 --- a/src/java/voldemort/xml/StoreDefinitionsMapper.java +++ b/src/java/voldemort/xml/StoreDefinitionsMapper.java @@ -367,7 +367,7 @@ private StoreDefinition readView(Element store, List stores) { .build(); } - private SerializerDefinition readSerializer(Element elmt) { + public static SerializerDefinition readSerializer(Element elmt) { String name = elmt.getChild(STORE_SERIALIZATION_TYPE_ELMT).getText(); boolean hasVersion = true; Map schemaInfosByVersion = new HashMap(); @@ -528,7 +528,7 @@ private Element viewToElement(StoreDefinition storeDefinition) { return store; } - private void addSerializer(Element parent, SerializerDefinition def) { + public static void addSerializer(Element parent, SerializerDefinition def) { parent.addContent(new Element(STORE_SERIALIZATION_TYPE_ELMT).setText(def.getName())); if(def.hasSchemaInfo()) { for(Map.Entry entry: def.getAllSchemaInfoVersions().entrySet()) {