diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index 756a650c5d..1af5334fd9 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -34,6 +34,8 @@ import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; +import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,6 +57,7 @@ import voldemort.serialization.Serializer; import voldemort.serialization.SerializerDefinition; import voldemort.serialization.SerializerFactory; +import voldemort.serialization.StringSerializer; import voldemort.server.rebalance.RebalancerState; import voldemort.store.StoreDefinition; import voldemort.store.compress.CompressionStrategy; @@ -130,6 +133,11 @@ public static void main(String[] args) throws Exception { .describedAs("store-names") .withValuesSeparatedBy(',') .ofType(String.class); + parser.accepts("store", "Store name for querying keys") + .withRequiredArg() + .describedAs("store-name") + .withValuesSeparatedBy(',') + .ofType(String.class); parser.accepts("add-stores", "Add stores in this stores.xml") .withRequiredArg() .describedAs("stores.xml containing just the new stores") @@ -226,6 +234,11 @@ public static void main(String[] args) throws Exception { .withRequiredArg() .describedAs("size-in-mb") .ofType(Long.class); + parser.accepts("query-keys", "Get values of keys on specific nodes") + .withRequiredArg() + .describedAs("query-keys") + .withValuesSeparatedBy(',') + .ofType(String.class); OptionSet options = parser.parse(args); @@ -323,6 +336,9 @@ public static void main(String[] args) throws Exception { } ops += "f"; } + if(options.has("query-keys")) { + ops += "q"; + } if(ops.length() < 1) { Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, " + "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, " @@ -496,6 +512,16 @@ public static void main(String[] args) throws Exception { long reserveMB = (Long) options.valueOf("reserve-memory"); adminClient.reserveMemory(nodeId, storeNames, reserveMB); } + if(ops.contains("q")) { + List keyList = null; + String storeName = (String) options.valueOf("store"); + if(storeName == null) { + throw new VoldemortException("Must specify store name using --store option (NOT --stores)"); + } + if(options.hasArgument("query-keys")) + keyList = (List) options.valuesOf("query-keys"); + executeQueryKeys(nodeId, adminClient, storeName, keyList); + } } catch(Exception e) { e.printStackTrace(); Utils.croak(e.getMessage()); @@ -599,6 +625,8 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-entries --url [url] --node [node-id]"); stream.println("\t9) Update entries for a set of stores using the output from a binary dump fetch entries"); stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]"); + stream.println("\t10) Query a store for a set of keys on a specific node. Notice that the --store option is not prural"); + stream.println("\t\t./bin/voldemort-admin-tool.sh --query-keys [comma-separated list of keys] --url [url] --node [node-id] --store [store name]"); stream.println(); stream.println("READ-ONLY OPERATIONS"); stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores"); @@ -1336,4 +1364,99 @@ private static void executeDeletePartitions(Integer nodeId, adminClient.deletePartitions(nodeId, store, partitionIdList, null); } } + + private static void executeQueryKeys(Integer nodeId, + AdminClient adminClient, + String storeName, + List keys) throws IOException { + Serializer serializer = new StringSerializer(); + List listKeys = new ArrayList(); + for(String key: keys) { + listKeys.add(new ByteArray(serializer.toBytes(key))); + } + final Iterator>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(), + storeName, + listKeys.iterator()); + List storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId) + .getValue(); + StoreDefinition storeDefinition = null; + for(StoreDefinition storeDef: storeDefinitionList) { + if(storeDef.getName().equals(storeName)) + storeDefinition = storeDef; + } + + // k-v serializer + SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer(); + SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer(); + SerializerFactory serializerFactory = new DefaultSerializerFactory(); + @SuppressWarnings("unchecked") + final Serializer keySerializer = (Serializer) serializerFactory.getSerializer(keySerializerDef); + @SuppressWarnings("unchecked") + final Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(valueSerializerDef); + + // compression strategy + final CompressionStrategy keyCompressionStrategy; + final CompressionStrategy valueCompressionStrategy; + if(keySerializerDef != null && keySerializerDef.hasCompression()) { + keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression()); + } else { + keyCompressionStrategy = null; + } + if(valueSerializerDef != null && valueSerializerDef.hasCompression()) { + valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression()); + } else { + valueCompressionStrategy = null; + } + + // write to stdout + writeAscii(null, new Writable() { + + @Override + public void writeTo(BufferedWriter out) throws IOException { + final StringWriter stringWriter = new StringWriter(); + final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter); + + while(iterator.hasNext()) { + Pair>, Exception>> kvPair = iterator.next(); + // unserialize and write key + byte[] keyBytes = kvPair.getFirst().get(); + Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes + : keyCompressionStrategy.inflate(keyBytes)); + generator.writeObject(keyObject); + + // iterate through, unserialize and write values + List> values = kvPair.getSecond().getFirst(); + if(values != null) { + for(Versioned versioned: values) { + VectorClock version = (VectorClock) versioned.getVersion(); + byte[] valueBytes = versioned.getValue(); + Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes + : valueCompressionStrategy.inflate(valueBytes)); + + stringWriter.write(", "); + stringWriter.write(version.toString()); + stringWriter.write('['); + stringWriter.write(new Date(version.getTimestamp()).toString()); + stringWriter.write(']'); + generator.writeObject(valueObject); + + } + } + // write out exception + if(kvPair.getSecond().getSecond() != null) { + stringWriter.write(", "); + stringWriter.write(kvPair.getSecond().getSecond().toString()); + } + + StringBuffer buf = stringWriter.getBuffer(); + if(buf.charAt(0) == ' ') { + buf.setCharAt(0, '\n'); + } + out.write(buf.toString()); + buf.setLength(0); + } + out.write('\n'); + } + }); + } } diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 21c0116904..179b5ee959 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -58,10 +58,12 @@ import voldemort.cluster.Node; import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; +import voldemort.server.RequestRoutingType; import voldemort.server.protocol.admin.AsyncOperationStatus; import voldemort.server.rebalance.RebalancerState; import voldemort.server.rebalance.VoldemortRebalancingException; import voldemort.store.ErrorCodeMapper; +import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.store.metadata.MetadataStore; import voldemort.store.metadata.MetadataStore.VoldemortState; @@ -73,6 +75,7 @@ import voldemort.store.slop.Slop; import voldemort.store.slop.Slop.Operation; import voldemort.store.socket.SocketDestination; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; import voldemort.store.views.ViewStorageConfiguration; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; @@ -510,6 +513,67 @@ public Pair> computeNext() { } + /** + * Fetch key/value tuples belonging to a node with given key values + * + *

+ * Entries are being queried synchronously as the iteration happens + * i.e. the whole result set is not buffered in memory. + * + * @param nodeId Id of the node to fetch from + * @param storeName Name of the store + * @param keys An Iterable of keys + * @return An iterator which allows entries to be streamed as they're being + * iterated over. + */ + public Iterator>, Exception>>> queryKeys(int nodeId, + String storeName, + final Iterator keys) { + + Node node = this.getAdminClientCluster().getNodeById(nodeId); + ClientConfig clientConfig = new ClientConfig(); + final Store store; + final ClientRequestExecutorPool clientPool = new ClientRequestExecutorPool(clientConfig.getSelectors(), + clientConfig.getMaxConnectionsPerNode(), + clientConfig.getConnectionTimeout(TimeUnit.MILLISECONDS), + clientConfig.getSocketTimeout(TimeUnit.MILLISECONDS), + clientConfig.getSocketBufferSize(), + clientConfig.getSocketKeepAlive()); + try { + store = clientPool.create(storeName, + node.getHost(), + node.getSocketPort(), + clientConfig.getRequestFormatType(), + RequestRoutingType.IGNORE_CHECKS); + + } catch(Exception e) { + clientPool.close(); + throw new VoldemortException(e); + } + + return new AbstractIterator>, Exception>>>() { + + @Override + public Pair>, Exception>> computeNext() { + ByteArray key; + Exception exception = null; + List> value = null; + if(!keys.hasNext()) { + clientPool.close(); + return endOfData(); + } else { + key = keys.next(); + } + try { + value = store.get(key, null); + } catch(Exception e) { + exception = e; + } + return Pair.create(key, Pair.create(value, exception)); + } + }; + } + /** * Legacy interface for fetching entries. See * {@link AdminClient#fetchKeys(int, String, HashMap, VoldemortFilter, boolean, Cluster, long)} diff --git a/test/unit/voldemort/client/AdminServiceBasicTest.java b/test/unit/voldemort/client/AdminServiceBasicTest.java index 8343bd6855..3adaf6aac0 100644 --- a/test/unit/voldemort/client/AdminServiceBasicTest.java +++ b/test/unit/voldemort/client/AdminServiceBasicTest.java @@ -21,6 +21,7 @@ import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -54,6 +55,7 @@ import voldemort.routing.RoutingStrategyType; import voldemort.serialization.SerializerDefinition; import voldemort.server.VoldemortServer; +import voldemort.store.InvalidMetadataException; import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.store.StoreDefinitionBuilder; @@ -67,6 +69,7 @@ import voldemort.store.socket.SocketStoreFactory; import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; import voldemort.utils.ByteArray; +import voldemort.utils.ByteUtils; import voldemort.utils.Pair; import voldemort.utils.RebalanceUtils; import voldemort.utils.Utils; @@ -1255,6 +1258,245 @@ public void testFetch() { } + @Test + public void testQuery() { + HashMap belongToAndInsideServer0 = new HashMap(); + HashMap belongToAndInsideServer1 = new HashMap(); + HashMap notBelongServer0ButInsideServer0 = new HashMap(); + HashMap belongToServer0ButOutsideBoth = new HashMap(); + HashMap notBelongToServer0AndOutsideBoth = new HashMap(); + + Store store0 = getStore(0, testStoreName); + Store store1 = getStore(1, testStoreName); + + HashMap entrySet = null; + Iterator keys = null; + RoutingStrategy strategy = servers[0].getMetadataStore().getRoutingStrategy(testStoreName); + while(true) { + ByteArray key; + byte[] value; + if(keys == null || !keys.hasNext()) { + entrySet = ServerTestUtils.createRandomKeyValuePairs(100); + keys = entrySet.keySet().iterator(); + } + key = keys.next(); + value = entrySet.get(key); + List routedNodes = strategy.routeRequest(key.get()); + boolean keyShouldBeInNode0 = false; + boolean keyShouldBeInNode1 = false; + for(Node node: routedNodes) { + keyShouldBeInNode0 = keyShouldBeInNode0 || (node.getId() == 0); + keyShouldBeInNode1 = keyShouldBeInNode1 || (node.getId() == 1); + } + + if(belongToAndInsideServer0.size() < 10) { + if(keyShouldBeInNode0) { + belongToAndInsideServer0.put(key, value); + store0.put(key, new Versioned(value), null); + } + } else if(belongToAndInsideServer1.size() < 10) { + if(keyShouldBeInNode1) { + belongToAndInsideServer1.put(key, value); + store1.put(key, new Versioned(value), null); + } + } else if(notBelongServer0ButInsideServer0.size() < 5) { + if(!keyShouldBeInNode0) { + notBelongServer0ButInsideServer0.put(key, value); + store0.put(key, new Versioned(value), null); + } + } else if(belongToServer0ButOutsideBoth.size() < 5) { + if(keyShouldBeInNode0) { + belongToServer0ButOutsideBoth.put(key, value); + } + } else if(notBelongToServer0AndOutsideBoth.size() < 5) { + if(!keyShouldBeInNode0) { + notBelongToServer0AndOutsideBoth.put(key, value); + } + } else { + break; + } + } + + ArrayList belongToAndInsideServer0Keys = new ArrayList(belongToAndInsideServer0.keySet()); + ArrayList belongToAndInsideServer1Keys = new ArrayList(belongToAndInsideServer1.keySet()); + ArrayList notBelongServer0ButInsideServer0Keys = new ArrayList(notBelongServer0ButInsideServer0.keySet()); + ArrayList belongToServer0ButOutsideBothKeys = new ArrayList(belongToServer0ButOutsideBoth.keySet()); + ArrayList notBelongToServer0AndOutsideBothKeys = new ArrayList(notBelongToServer0AndOutsideBoth.keySet()); + + List queryKeys; + Iterator>, Exception>>> results; + Pair>, Exception>> entry; + // test one key on store 0 + queryKeys = new ArrayList(); + queryKeys.add(belongToAndInsideServer0Keys.get(0)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertEquals(queryKeys.get(0), entry.getFirst()); + assertNull("There should not be exception in response", entry.getSecond().getSecond()); + assertEquals("There should be only 1 value in versioned list", 1, entry.getSecond() + .getFirst() + .size()); + assertEquals("Two byte[] should be equal", + 0, + ByteUtils.compare(belongToAndInsideServer0.get(queryKeys.get(0)), + entry.getSecond().getFirst().get(0).getValue())); + assertFalse("There should be only one result", results.hasNext()); + + // test one key belongs to but not exists in server 0 + queryKeys = new ArrayList(); + queryKeys.add(belongToServer0ButOutsideBothKeys.get(0)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertFalse("There should not be more results", results.hasNext()); + assertEquals("Not the right key", queryKeys.get(0), entry.getFirst()); + assertNotNull("Response should be non-null", entry.getSecond()); + assertEquals("Value should be empty list", 0, entry.getSecond().getFirst().size()); + assertNull("There should not be exception", entry.getSecond().getSecond()); + + // test one key not exist and does not belong to server 0 + queryKeys = new ArrayList(); + queryKeys.add(notBelongToServer0AndOutsideBothKeys.get(0)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertFalse("There should not be more results", results.hasNext()); + assertEquals("Not the right key", queryKeys.get(0), entry.getFirst()); + assertNotNull("Response should be non-null", entry.getSecond()); + assertNull("Value should be null", entry.getSecond().getFirst()); + assertTrue("There should be InvalidMetadataException exception", + entry.getSecond().getSecond() instanceof InvalidMetadataException); + + // test one key that exists on server 0 but does not belong to server 0 + queryKeys = new ArrayList(); + queryKeys.add(notBelongServer0ButInsideServer0Keys.get(0)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertFalse("There should not be more results", results.hasNext()); + assertEquals("Not the right key", queryKeys.get(0), entry.getFirst()); + assertNotNull("Response should be non-null", entry.getSecond()); + assertNull("Value should be null", entry.getSecond().getFirst()); + assertTrue("There should be InvalidMetadataException exception", + entry.getSecond().getSecond() instanceof InvalidMetadataException); + + // test one key deleted + store0.delete(belongToAndInsideServer0Keys.get(4), null); + queryKeys = new ArrayList(); + queryKeys.add(belongToAndInsideServer0Keys.get(4)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertFalse("There should not be more results", results.hasNext()); + assertEquals("Not the right key", queryKeys.get(0), entry.getFirst()); + assertNotNull("Response should be non-null", entry.getSecond()); + assertEquals("Value should be empty list", 0, entry.getSecond().getFirst().size()); + assertNull("There should not be exception", entry.getSecond().getSecond()); + + // test empty request + queryKeys = new ArrayList(); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertFalse("Results should be empty", results.hasNext()); + + // test null key + queryKeys = new ArrayList(); + queryKeys.add(null); + assertEquals(1, queryKeys.size()); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + entry = results.next(); + assertFalse("There should not be more results", results.hasNext()); + assertNotNull("Response should be non-null", entry.getSecond()); + assertNull("Value should be null", entry.getSecond().getFirst()); + assertTrue("There should be IllegalArgumentException exception", + entry.getSecond().getSecond() instanceof IllegalArgumentException); + + // test multiple keys (3) on store 1 + queryKeys = new ArrayList(); + queryKeys.add(belongToAndInsideServer1Keys.get(0)); + queryKeys.add(belongToAndInsideServer1Keys.get(1)); + queryKeys.add(belongToAndInsideServer1Keys.get(2)); + results = getAdminClient().queryKeys(1, testStoreName, queryKeys.iterator()); + assertTrue("Results should not be empty", results.hasNext()); + Map>> entries = new HashMap>>(); + int resultCount = 0; + while(results.hasNext()) { + resultCount++; + entry = results.next(); + assertNull("There should not be exception in response", entry.getSecond().getSecond()); + assertNotNull("Value should not be null for Key: ", entry.getSecond().getFirst()); + entries.put(entry.getFirst(), entry.getSecond().getFirst()); + } + assertEquals("There should 3 and only 3 results", 3, resultCount); + for(ByteArray key: queryKeys) { + // this loop and the count ensure one-to-one mapping + assertNotNull("This key should exist in the results: " + key, entries.get(key)); + assertEquals("Two byte[] should be equal for key: " + key, + 0, + ByteUtils.compare(belongToAndInsideServer1.get(key), + entries.get(key).get(0).getValue())); + } + + // test multiple keys, mixed situation + // key 0: Exists and belongs to + // key 1: Exists but does not belong to + // key 2: Does not exist but belongs to + // key 3: Does not belong and not exist + // key 4: Same situation with key0 + // key 5: Deleted + // key 6: Same situation with key2 + store0.delete(belongToAndInsideServer0Keys.get(5), null); + queryKeys = new ArrayList(); + queryKeys.add(belongToAndInsideServer0Keys.get(2)); + queryKeys.add(notBelongServer0ButInsideServer0Keys.get(1)); + queryKeys.add(belongToServer0ButOutsideBothKeys.get(1)); + queryKeys.add(notBelongToServer0AndOutsideBothKeys.get(1)); + queryKeys.add(belongToAndInsideServer0Keys.get(3)); + queryKeys.add(belongToAndInsideServer0Keys.get(5)); + queryKeys.add(notBelongServer0ButInsideServer0Keys.get(2)); + results = getAdminClient().queryKeys(0, testStoreName, queryKeys.iterator()); + // key 0 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(0).get(), entry.getFirst().get())); + assertEquals(0, ByteUtils.compare(belongToAndInsideServer0.get(queryKeys.get(0)), + entry.getSecond().getFirst().get(0).getValue())); + assertNull(entry.getSecond().getSecond()); + // key 1 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(1).get(), entry.getFirst().get())); + assertTrue("There should be InvalidMetadataException exception", + entry.getSecond().getSecond() instanceof InvalidMetadataException); + // key 2 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(2).get(), entry.getFirst().get())); + assertEquals(0, entry.getSecond().getFirst().size()); + assertNull(entry.getSecond().getSecond()); + // key 3 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(3).get(), entry.getFirst().get())); + assertTrue("There should be InvalidMetadataException exception", + entry.getSecond().getSecond() instanceof InvalidMetadataException); + // key 4 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(4).get(), entry.getFirst().get())); + assertEquals(0, ByteUtils.compare(belongToAndInsideServer0.get(queryKeys.get(4)), + entry.getSecond().getFirst().get(0).getValue())); + assertNull(entry.getSecond().getSecond()); + // key 5 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(5).get(), entry.getFirst().get())); + assertEquals(0, entry.getSecond().getFirst().size()); + assertNull(entry.getSecond().getSecond()); + // key 6 + entry = results.next(); + assertEquals(0, ByteUtils.compare(queryKeys.get(6).get(), entry.getFirst().get())); + assertTrue("There should be InvalidMetadataException exception", + entry.getSecond().getSecond() instanceof InvalidMetadataException); + // no more keys + assertFalse(results.hasNext()); + } + @Test public void testUpdate() { final HashMap entrySet = ServerTestUtils.createRandomKeyValuePairs(TEST_STREAM_KEYS_SIZE);