Permalink
Browse files

BOOKKEEPER-204: Provide a MetaStore interface, and a mock implementat…

…ion. (Jiannan Wang via ivank)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1407520 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent c32437a commit 9b7b78890c2f0631a2b62bbbaa8de35ede102c0e @ivankelly ivankelly committed Nov 9, 2012
Showing with 2,330 additions and 0 deletions.
  1. +2 −0 CHANGES.txt
  2. +214 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MSException.java
  3. +79 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetaStore.java
  4. +25 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreCallback.java
  5. +84 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreCursor.java
  6. +34 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreException.java
  7. +33 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreFactory.java
  8. +104 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreScannableTable.java
  9. +172 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreTable.java
  10. +70 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreTableItem.java
  11. +154 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/Value.java
  12. +81 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetaStore.java
  13. +98 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetastoreCursor.java
  14. +341 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetastoreTable.java
  15. +59 −0 ...er/src/test/java/org/apache/bookkeeper/metastore/MetastoreScannableTableAsyncToSyncConverter.java
  16. +131 −0 ...eper-server/src/test/java/org/apache/bookkeeper/metastore/MetastoreTableAsyncToSyncConverter.java
  17. +649 −0 bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
View
2 CHANGES.txt
@@ -180,6 +180,8 @@ Trunk (unreleased changes)
BOOKKEEPER-444: Refactor pending read op to make speculative reads possible (ivank)
+ BOOKKEEPER-204: Provide a MetaStore interface, and a mock implementation. (Jiannan Wang via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
View
214 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MSException.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+@SuppressWarnings("serial")
+public abstract class MSException extends Exception {
+
+ /**
+ * return codes
+ */
+ public static enum Code {
+ OK (0, "OK"),
+ BadVersion (-1, "Version conflict"),
+ NoKey (-2, "Key does not exist"),
+ KeyExists (-3, "Key exists"),
+ NoEntries (-4, "No entries found"),
+
+ InterruptedException (-100, "Operation interrupted"),
+ IllegalOp (-101, "Illegal operation"),
+ ServiceDown (-102, "Metadata service is down"),
+ OperationFailure(-103, "Operaion failed on metadata storage server side");
+
+ private static final Map<Integer, Code> codes
+ = new HashMap<Integer, Code>();
+
+ static {
+ for (Code c : EnumSet.allOf(Code.class)) {
+ codes.put(c.code, c);
+ }
+ }
+
+ private final int code;
+ private final String description;
+
+ private Code(int code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ /**
+ * Get the int value for a particular Code.
+ *
+ * @return error code as integer
+ */
+ public int getCode() {
+ return code;
+ }
+
+ /**
+ * Get the description for a particular Code.
+ *
+ * @return error description
+ */
+ public String getDescription() {
+ return description;
+ }
+
+ /**
+ * Get the Code value for a particular integer error code.
+ *
+ * @param code int error code
+ * @return Code value corresponding to specified int code, or null.
+ */
+ public static Code get(int code) {
+ return codes.get(code);
+ }
+ }
+
+ private final Code code;
+
+ MSException(Code code, String errMsg) {
+ super(code.getDescription() + " : " + errMsg);
+ this.code = code;
+ }
+
+ MSException(Code code, String errMsg, Throwable cause) {
+ super(code.getDescription() + " : " + errMsg, cause);
+ this.code = code;
+ }
+
+ public Code getCode() {
+ return this.code;
+ }
+
+ public static MSException create(Code code) {
+ return create(code, "", null);
+ }
+
+ public static MSException create(Code code, String errMsg) {
+ return create(code, errMsg, null);
+ }
+
+ public static MSException create(Code code, String errMsg, Throwable cause) {
+ switch (code) {
+ case BadVersion:
+ return new BadVersionException(errMsg, cause);
+ case NoKey:
+ return new NoKeyException(errMsg, cause);
+ case KeyExists:
+ return new KeyExistsException(errMsg, cause);
+ case InterruptedException:
+ return new MSInterruptedException(errMsg, cause);
+ case IllegalOp:
+ return new IllegalOpException(errMsg, cause);
+ case ServiceDown:
+ return new ServiceDownException(errMsg, cause);
+ case OperationFailure:
+ return new OperationFailureException(errMsg, cause);
+ case OK:
+ default:
+ throw new IllegalArgumentException("Invalid exception code");
+ }
+ }
+
+ public static class BadVersionException extends MSException {
+ public BadVersionException(String errMsg) {
+ super(Code.BadVersion, errMsg);
+ }
+
+ public BadVersionException(String errMsg, Throwable cause) {
+ super(Code.BadVersion, errMsg, cause);
+ }
+ }
+
+ public static class NoKeyException extends MSException {
+ public NoKeyException(String errMsg) {
+ super(Code.NoKey, errMsg);
+ }
+
+ public NoKeyException(String errMsg, Throwable cause) {
+ super(Code.NoKey, errMsg, cause);
+ }
+ }
+
+ // Exception would be thrown in a cursor if no entries found
+ public static class NoEntriesException extends MSException {
+ public NoEntriesException(String errMsg) {
+ super(Code.NoEntries, errMsg);
+ }
+
+ public NoEntriesException(String errMsg, Throwable cause) {
+ super(Code.NoEntries, errMsg, cause);
+ }
+ }
+
+ public static class KeyExistsException extends MSException {
+ public KeyExistsException(String errMsg) {
+ super(Code.KeyExists, errMsg);
+ }
+
+ public KeyExistsException(String errMsg, Throwable cause) {
+ super(Code.KeyExists, errMsg, cause);
+ }
+ }
+
+ public static class MSInterruptedException extends MSException {
+ public MSInterruptedException(String errMsg) {
+ super(Code.InterruptedException, errMsg);
+ }
+
+ public MSInterruptedException(String errMsg, Throwable cause) {
+ super(Code.InterruptedException, errMsg, cause);
+ }
+ }
+
+ public static class IllegalOpException extends MSException {
+ public IllegalOpException(String errMsg) {
+ super(Code.IllegalOp, errMsg);
+ }
+
+ public IllegalOpException(String errMsg, Throwable cause) {
+ super(Code.IllegalOp, errMsg, cause);
+ }
+ }
+
+ public static class ServiceDownException extends MSException {
+ public ServiceDownException(String errMsg) {
+ super(Code.ServiceDown, errMsg);
+ }
+
+ public ServiceDownException(String errMsg, Throwable cause) {
+ super(Code.ServiceDown, errMsg, cause);
+ }
+ }
+
+ public static class OperationFailureException extends MSException {
+ public OperationFailureException(String errMsg) {
+ super(Code.OperationFailure, errMsg);
+ }
+
+ public OperationFailureException(String errMsg, Throwable cause) {
+ super(Code.OperationFailure, errMsg, cause);
+ }
+ }
+}
View
79 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetaStore.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Metadata Store Interface.
+ */
+public interface MetaStore {
+ /**
+ * Return the name of the plugin.
+ *
+ * @return the plugin name.
+ */
+ public String getName();
+
+ /**
+ * Get the plugin verison.
+ *
+ * @return the plugin version.
+ */
+ public int getVersion();
+
+ /**
+ * Initialize the meta store.
+ *
+ * @param config
+ * Configuration object passed to metastore
+ * @param msVersion
+ * Version to initialize the metastore
+ * @throws MetastoreException when failed to initialize
+ */
+ public void init(Configuration config, int msVersion)
+ throws MetastoreException;
+
+ /**
+ * Close the meta store.
+ */
+ public void close();
+
+ /**
+ * Create a metastore table.
+ *
+ * @param name
+ * Table name.
+ * @return a metastore table
+ * @throws MetastoreException when failed to create the metastore table.
+ */
+ public MetastoreTable createTable(String name)
+ throws MetastoreException;
+
+ /**
+ * Create a scannable metastore table.
+ *
+ * @param name
+ * Table name.
+ * @return a metastore scannable table
+ * @throws MetastoreException when failed to create the metastore table.
+ */
+ public MetastoreScannableTable createScannableTable(String name)
+ throws MetastoreException;
+
+}
View
25 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreCallback.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+public interface MetastoreCallback<T> {
+ /**
+ * @see MSException.Code
+ */
+ public void complete(int rc, T value, Object ctx);
+}
View
84 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreCursor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface MetastoreCursor extends Closeable {
+
+ public static MetastoreCursor EMPTY_CURSOR = new MetastoreCursor() {
+ @Override
+ public boolean hasMoreEntries() {
+ return false;
+ }
+
+ @Override
+ public Iterator<MetastoreTableItem> readEntries(int numEntries)
+ throws MSException {
+ throw new MSException.NoEntriesException("No entries left in the cursor.");
+ }
+
+ @Override
+ public void asyncReadEntries(int numEntries, ReadEntriesCallback callback, Object ctx) {
+ callback.complete(MSException.Code.NoEntries.getCode(), null, ctx);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ };
+
+ public static interface ReadEntriesCallback extends
+ MetastoreCallback<Iterator<MetastoreTableItem>> {
+ }
+
+ /**
+ * Is there any entries left in the cursor to read.
+ *
+ * @return true if there is entries left, false otherwise.
+ */
+ public boolean hasMoreEntries();
+
+ /**
+ * Read entries from the cursor, up to the specified <code>numEntries</code>.
+ * The returned list can be smaller.
+ *
+ * @param numEntries
+ * maximum number of entries to read
+ * @return the iterator of returned entries.
+ * @throws MSException when failed to read entries from the cursor.
+ */
+ public Iterator<MetastoreTableItem> readEntries(int numEntries)
+ throws MSException;
+
+ /**
+ * Asynchronously read entries from the cursor, up to the specified <code>numEntries</code>.
+ *
+ * @see #readEntries(int)
+ * @param numEntries
+ * maximum number of entries to read
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ public void asyncReadEntries(int numEntries, ReadEntriesCallback callback, Object ctx);
+}
View
34 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+@SuppressWarnings("serial")
+public class MetastoreException extends Exception {
+
+ public MetastoreException(String message) {
+ super(message);
+ }
+
+ public MetastoreException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public MetastoreException(Throwable t) {
+ super(t);
+ }
+}
View
33 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import org.apache.bookkeeper.util.ReflectionUtils;
+
+public class MetastoreFactory {
+
+ public static MetaStore createMetaStore(String name)
+ throws MetastoreException {
+ try {
+ return ReflectionUtils.newInstance(name, MetaStore.class);
+ } catch (Throwable t) {
+ throw new MetastoreException("Failed to instantiate metastore : " + name);
+ }
+ }
+
+}
View
104 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreScannableTable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.Set;
+
+public interface MetastoreScannableTable extends MetastoreTable {
+
+ // Used by cursor, etc when they want to start at the beginning of a table
+ public static final String EMPTY_START_KEY = null;
+ // Last row in a table.
+ public static final String EMPTY_END_KEY = null;
+ // the order to loop over a table
+ public static enum Order {
+ ASC,
+ DESC
+ }
+
+ /**
+ * Open a cursor to loop over the entries belonging to a key range,
+ * which returns all fields for each entry.
+ *
+ * <p>
+ * Return Code:<br/>
+ * {@link MSException.Code.OK}: an opened cursor<br/>
+ * {@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}:
+ * other issues
+ * </p>
+ *
+ * @param firstKey
+ * Key to start scanning. If it is {@link EMPTY_START_KEY}, it starts
+ * from first key (inclusive).
+ * @param firstInclusive
+ * true if firstKey is to be included in the returned view.
+ * @param lastKey
+ * Key to stop scanning. If it is {@link EMPTY_END_KEY}, scan ends at
+ * the lastKey of the table (inclusive).
+ * @param lastInclusive
+ * true if lastKey is to be included in the returned view.
+ * @param order
+ * the order to loop over the entries
+ * @param cb
+ * Callback to return an opened cursor.
+ * @param ctx
+ * Callback context
+ */
+ public void openCursor(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order,
+ MetastoreCallback<MetastoreCursor> cb,
+ Object ctx);
+
+ /**
+ * Open a cursor to loop over the entries belonging to a key range,
+ * which returns the specified <code>fields</code> for each entry.
+ *
+ * <p>
+ * Return Code:<br/>
+ * {@link MSException.Code.OK}: an opened cursor<br/>
+ * {@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}:
+ * other issues
+ * </p>
+ *
+ * @param firstKey
+ * Key to start scanning. If it is {@link EMPTY_START_KEY}, it starts
+ * from first key (inclusive).
+ * @param firstInclusive
+ * true if firstKey is to be included in the returned view.
+ * @param lastKey
+ * Key to stop scanning. If it is {@link EMPTY_END_KEY}, scan ends at
+ * the lastKey of the table (inclusive).
+ * @param lastInclusive
+ * true if lastKey is to be included in the returned view.
+ * @param order
+ * the order to loop over the entries
+ * @param fields
+ * Fields to select
+ * @param cb
+ * Callback to return an opened cursor.
+ * @param ctx
+ * Callback context
+ */
+ public void openCursor(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order, Set<String> fields,
+ MetastoreCallback<MetastoreCursor> cb,
+ Object ctx);
+
+}
View
172 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreTable.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+public interface MetastoreTable {
+
+ // select all fields when reading or scanning entries
+ public static final Set<String> ALL_FIELDS = null;
+ // select non fields to return when reading/scanning entries
+ public static final Set<String> NON_FIELDS = new HashSet<String>();
+
+ /**
+ * Get table name.
+ *
+ * @return table name
+ */
+ public String getName();
+
+ /**
+ * Get all fields of a key.
+ *
+ * <p>
+ * Return Code:<ul>
+ * <li>{@link MSException.Code.OK}: success returning the key</li>
+ * <li>{@link MSException.Code.NoKey}: no key found</li>
+ * <li>{@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}: other issues</li>
+ * </ul></p>
+ *
+ * @param key
+ * Key Name
+ * @param cb
+ * Callback to return all fields of the key
+ * @param ctx
+ * Callback context
+ */
+ public void get(String key, MetastoreCallback<Versioned<Value>> cb, Object ctx);
+
+ /**
+ * Get specified fields of a key.
+ *
+ * <p>
+ * Return Code:<ul>
+ * <li>{@link MSException.Code.OK}: success returning the key</li>
+ * <li>{@link MSException.Code.NoKey}: no key found</li>
+ * <li>{@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}: other issues</li>
+ * </ul></p>
+ *
+ * @param key
+ * Key Name
+ * @param fields
+ * Fields to return
+ * @param cb
+ * Callback to return specified fields of the key
+ * @param ctx
+ * Callback context
+ */
+ public void get(String key, Set<String> fields,
+ MetastoreCallback<Versioned<Value>> cb, Object ctx);
+
+ /**
+ * Update a key according to its version.
+ *
+ * <p>
+ * Return Code:<ul>
+ * <li>{@link MSException.Code.OK}: success updating the key</li>
+ * <li>{@link MSException.Code.BadVersion}: failed to update the key due to bad version</li>
+ * <li>{@link MSException.Code.NoKey}: no key found to update data, if not provided {@link Version.NEW}</li>
+ * <li>{@link MSException.Code.KeyExists}: entry exists providing {@link Version.NEW}</li>
+ * <li>{@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}: other issues</li>
+ * </ul></p>
+ *
+ * The key is updated only when the version matches its current version.
+ * In particular, if the provided version is:<ul>
+ * <li>{@link Version.ANY}: update the data without comparing its version.
+ * <b>Note this usage is not encouraged since it may mess up data consistency.</b></li>
+ * <li>{@link Version.NEW}: create the entry if it doesn't exist before;
+ * Otherwise return {@link MSException.Code.KeyExists}.</li>
+ * </ul>
+ *
+ * @param key
+ * Key Name
+ * @param value
+ * Value to update.
+ * @param version
+ * Version specified to update.
+ * @param cb
+ * Callback to return new version after updated.
+ * @param ctx
+ * Callback context
+ */
+ public void put(String key, Value value, Version version, MetastoreCallback<Version> cb, Object ctx);
+
+ /**
+ * Remove a key by its version.
+ *
+ * The key is removed only when the version matches its current version.
+ * If <code>version</code> is {@link Version.ANY}, the key would be removed directly.
+ *
+ * <p>
+ * Return Code:<ul>
+ * <li>{@link MSException.Code.OK}: success updating the key</li>
+ * <li>{@link MSException.Code.NoKey}: if the key doesn't exist.</li>
+ * <li>{@link MSException.Code.BadVersion}: failed to delete the key due to bad version</li>
+ * <li>{@link MSException.Code.IllegalOp}/{@link MSException.Code.ServiceDown}: other issues</li>
+ * </ul></p>
+ *
+ * @param key
+ * Key Name.
+ * @param version
+ * Version specified to remove.
+ * @param cb
+ * Callback to return all fields of the key
+ * @param ctx
+ * Callback context
+ */
+ public void remove(String key, Version version,
+ MetastoreCallback<Void> cb, Object ctx);
+
+ /**
+ * Open a cursor to loop over all the entries of the table,
+ * which returns all fields for each entry.
+ * The returned cursor doesn't need to guarantee any order,
+ * since the underlying might be a hash table or an order table.
+ *
+ * @param cb
+ * Callback to return an opened cursor
+ * @param ctx
+ * Callback context
+ */
+ public void openCursor(MetastoreCallback<MetastoreCursor> cb, Object ctx);
+
+ /**
+ * Open a cursor to loop over all the entries of the table,
+ * which returns the specified <code>fields</code> for each entry.
+ * The returned cursor doesn't need to guarantee any order,
+ * since the underlying might be a hash table or an order table.
+ *
+ * @param fields
+ * Fields to select
+ * @param cb
+ * Callback to return an opened cursor
+ * @param ctx
+ * Callback context
+ */
+ public void openCursor(Set<String> fields,
+ MetastoreCallback<MetastoreCursor> cb, Object ctx);
+
+ /**
+ * Close the table.
+ */
+ public void close();
+}
View
70 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreTableItem.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Identify an item in a metastore table.
+ */
+public class MetastoreTableItem {
+
+ private String key;
+ private Versioned<Value> value;
+
+ public MetastoreTableItem(String key, Versioned<Value> value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Get the key of the table item.
+ *
+ * @return key of table item.
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Set the key of the item.
+ *
+ * @param key Key
+ */
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ /**
+ * Get the value of the item.
+ *
+ * @return value of the item.
+ */
+ public Versioned<Value> getValue() {
+ return value;
+ }
+
+ /**
+ * Set the value of the item.
+ *
+ * @return value of the item.
+ */
+ public void setValue(Versioned<Value> value) {
+ this.value = value;
+ }
+}
View
154 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/Value.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
+
+import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
+
+public class Value {
+ static final Comparator<byte[]> comparator =
+ UnsignedBytes.lexicographicalComparator();
+
+ protected Map<String, byte[]> fields;
+
+ public Value() {
+ fields = new HashMap<String, byte[]>();
+ }
+
+ public Value(Value v) {
+ fields = new HashMap<String, byte[]>(v.fields);
+ }
+
+ public byte[] getField(String field) {
+ return fields.get(field);
+ }
+
+ public Value setField(String field, byte[] data) {
+ fields.put(field, data);
+ return this;
+ }
+
+ public Value clearFields() {
+ fields.clear();
+ return this;
+ }
+
+ public Set<String> getFields() {
+ return fields.keySet();
+ }
+
+ public Map<String, byte[]> getFieldsMap() {
+ return Collections.unmodifiableMap(fields);
+ }
+
+ /**
+ * Select parts of fields.
+ *
+ * @param fields
+ * Parts of fields
+ * @return new value with specified fields
+ */
+ public Value project(Set<String> fields) {
+ if (ALL_FIELDS == fields) {
+ return new Value(this);
+ }
+ Value v = new Value();
+ for (String f : fields) {
+ byte[] data = this.fields.get(f);
+ v.setField(f, data);
+ }
+ return v;
+ }
+
+ @Override
+ public int hashCode() {
+ HashFunction hf = Hashing.murmur3_32();
+ Hasher hc = hf.newHasher();
+ for (String key : fields.keySet()) {
+ hc.putString(key);
+ }
+ return hc.hash().asInt();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Value)) {
+ return false;
+ }
+ Value other = (Value) o;
+ if (fields.size() != other.fields.size()) {
+ return false;
+ }
+ for (String f : fields.keySet()) {
+ byte[] v1 = fields.get(f);
+ byte[] v2 = other.fields.get(f);
+ if (0 != comparator.compare(v1, v2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Merge other value.
+ *
+ * @param other
+ * Other Value
+ */
+ public Value merge(Value other) {
+ for (Map.Entry<String, byte[]> entry : other.fields.entrySet()) {
+ if (null == entry.getValue()) {
+ fields.remove(entry.getKey());
+ } else {
+ fields.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (Map.Entry<String, byte[]> entry : fields.entrySet()) {
+ String f = entry.getKey();
+ if (null == f) {
+ f = "NULL";
+ }
+ String value;
+ if (null == entry.getValue()) {
+ value = "NONE";
+ } else {
+ value = new String(entry.getValue());
+ }
+ sb.append("('").append(f).append("'=").append(value).append(")");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+}
View
81 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetaStore.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.bookkeeper.metastore.MetaStore;
+import org.apache.bookkeeper.metastore.MetastoreException;
+import org.apache.bookkeeper.metastore.MetastoreTable;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.commons.configuration.Configuration;
+
+public class MockMetaStore implements MetaStore {
+
+ static final int CUR_VERSION = 1;
+
+ static Map<String, MockMetastoreTable> tables =
+ new HashMap<String, MockMetastoreTable>();
+
+ // for test
+ public static void reset() {
+ tables.clear();
+ }
+
+ @Override
+ public String getName() {
+ return getClass().getName();
+ }
+
+ @Override
+ public int getVersion() {
+ return CUR_VERSION;
+ }
+
+ @Override
+ public void init(Configuration conf, int msVersion)
+ throws MetastoreException {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public MetastoreTable createTable(String name) {
+ return createMockTable(name);
+ }
+
+ @Override
+ public MetastoreScannableTable createScannableTable(String name) {
+ return createMockTable(name);
+ }
+
+ private MockMetastoreTable createMockTable(String name) {
+ MockMetastoreTable t = tables.get(name);
+ if (t == null) {
+ t = new MockMetastoreTable(this, name);
+ tables.put(name, t);
+ }
+ return t;
+ }
+
+}
View
98 ...keeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetastoreCursor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore.mock;
+
+import static org.apache.bookkeeper.metastore.mock.MockMetastoreTable.cloneValue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.metastore.MSException;
+import org.apache.bookkeeper.metastore.MSException.Code;
+import org.apache.bookkeeper.metastore.MetastoreCursor;
+import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.Value;
+import org.apache.bookkeeper.versioning.Versioned;
+
+class MockMetastoreCursor implements MetastoreCursor {
+
+ private final ScheduledExecutorService scheduler;
+ private final Iterator<Map.Entry<String, Versioned<Value>>> iter;
+ private final Set<String> fields;
+
+ public MockMetastoreCursor(NavigableMap<String, Versioned<Value>> map, Set<String> fields,
+ ScheduledExecutorService scheduler) {
+ this.iter = map.entrySet().iterator();
+ this.fields = fields;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public boolean hasMoreEntries() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Iterator<MetastoreTableItem> readEntries(int numEntries)
+ throws MSException {
+ if (numEntries < 0) {
+ throw MSException.create(Code.IllegalOp);
+ }
+ return unsafeReadEntries(numEntries);
+ }
+
+ @Override
+ public void asyncReadEntries(final int numEntries, final ReadEntriesCallback cb, final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ if (numEntries < 0) {
+ cb.complete(Code.IllegalOp.getCode(), null, ctx);
+ return;
+ }
+ Iterator<MetastoreTableItem> result = unsafeReadEntries(numEntries);
+ cb.complete(Code.OK.getCode(), result, ctx);
+ }
+ });
+ }
+
+ private Iterator<MetastoreTableItem> unsafeReadEntries(int numEntries) {
+ List<MetastoreTableItem> entries = new ArrayList<MetastoreTableItem>();
+ int nCount = 0;
+ while (iter.hasNext() && nCount < numEntries) {
+ Map.Entry<String, Versioned<Value>> entry = iter.next();
+ Versioned<Value> value = entry.getValue();
+ Versioned<Value> vv = cloneValue(value.getValue(), value.getVersion(), fields);
+ String key = entry.getKey();
+ entries.add(new MetastoreTableItem(key, vv));
+ ++nCount;
+ }
+ return entries.iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
View
341 bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/mock/MockMetastoreTable.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore.mock;
+
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.metastore.MSException.Code;
+import org.apache.bookkeeper.metastore.MetastoreCallback;
+import org.apache.bookkeeper.metastore.MetastoreCursor;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.Value;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+public class MockMetastoreTable implements MetastoreScannableTable {
+
+ public static class MockVersion implements Version {
+ int version;
+
+ public MockVersion(int v) {
+ this.version = v;
+ }
+
+ public MockVersion(MockVersion v) {
+ this.version = v.version;
+ }
+
+ public synchronized MockVersion incrementVersion() {
+ ++version;
+ return this;
+ }
+
+ @Override
+ public Occurred compare(Version v) {
+ if (null == v) {
+ throw new NullPointerException("Version is not allowed to be null.");
+ }
+ if (v == Version.NEW) {
+ return Occurred.AFTER;
+ } else if (v == Version.ANY) {
+ return Occurred.CONCURRENTLY;
+ } else if (!(v instanceof MockVersion)) {
+ throw new IllegalArgumentException("Invalid version type");
+ }
+ MockVersion mv = (MockVersion)v;
+ int res = version - mv.version;
+ if (res == 0) {
+ return Occurred.CONCURRENTLY;
+ } else if (res < 0) {
+ return Occurred.BEFORE;
+ } else {
+ return Occurred.AFTER;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (null == obj ||
+ !(obj instanceof MockVersion)) {
+ return false;
+ }
+ MockVersion v = (MockVersion)obj;
+ return 0 == (version - v.version);
+ }
+
+ @Override
+ public String toString() {
+ return "version=" + version;
+ }
+
+ @Override
+ public int hashCode() {
+ return version;
+ }
+ }
+
+ private String name;
+ private TreeMap<String, Versioned<Value>> map = null;
+ private ScheduledExecutorService scheduler;
+
+ public MockMetastoreTable(MockMetaStore metastore, String name) {
+ this.map = new TreeMap<String, Versioned<Value>>();
+ this.name = name;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public String getName () {
+ return this.name;
+ }
+
+ static Versioned<Value> cloneValue(Value value, Version version, Set<String> fields) {
+ if (null != value) {
+ Value newValue = new Value();
+ if (ALL_FIELDS == fields) {
+ fields = value.getFields();
+ }
+ for (String f : fields) {
+ newValue.setField(f, value.getField(f));
+ }
+ value = newValue;
+ }
+
+ if (null == version) {
+ throw new NullPointerException("Version isn't allowed to be null.");
+ }
+ if (Version.ANY != version && Version.NEW != version) {
+ if (version instanceof MockVersion) {
+ version = new MockVersion(((MockVersion)version).version);
+ } else {
+ throw new IllegalStateException("Wrong version type.");
+ }
+ }
+ return new Versioned<Value>(value, version);
+ }
+
+ @Override
+ public void get(final String key, final MetastoreCallback<Versioned<Value>> cb, final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ scheduleGet(key, ALL_FIELDS, cb, ctx);
+ }
+ });
+ }
+
+ @Override
+ public void get(final String key, final Set<String> fields, final MetastoreCallback<Versioned<Value>> cb,
+ final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ scheduleGet(key, fields, cb, ctx);
+ }
+ });
+ }
+
+ public synchronized void scheduleGet(String key, Set<String> fields, MetastoreCallback<Versioned<Value>> cb,
+ Object ctx) {
+ if (null == key) {
+ cb.complete(Code.IllegalOp.getCode(), null, ctx);
+ return;
+ }
+ Versioned<Value> vv = mockGet(key);
+ int rc = null == vv ? Code.NoKey.getCode() : Code.OK.getCode();
+ if (vv != null) {
+ vv = cloneValue(vv.getValue(), vv.getVersion(), fields);
+ }
+ cb.complete(rc, vv, ctx);
+ }
+
+ @Override
+ public void put(final String key, final Value value, final Version version, final MetastoreCallback<Version> cb,
+ final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ if (null == key || null == value || null == version) {
+ cb.complete(Code.IllegalOp.getCode(), null, ctx);
+ return;
+ }
+ Result<Version> result = mockPut(key, value, version);
+ cb.complete(result.code.getCode(), result.value, ctx);
+ }
+ });
+ }
+
+ @Override
+ public void remove(final String key, final Version version, final MetastoreCallback<Void> cb, final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ if (null == key || null == version) {
+ cb.complete(Code.IllegalOp.getCode(), null, ctx);
+ return;
+ }
+ Code code = mockRemove(key, version);
+ cb.complete(code.getCode(), null, ctx);
+ }
+ });
+ }
+
+ @Override
+ public void openCursor(MetastoreCallback<MetastoreCursor> cb, Object ctx) {
+ openCursor(EMPTY_START_KEY, true, EMPTY_END_KEY, true, Order.ASC,
+ ALL_FIELDS, cb, ctx);
+ }
+
+ @Override
+ public void openCursor(Set<String> fields,
+ MetastoreCallback<MetastoreCursor> cb, Object ctx) {
+ openCursor(EMPTY_START_KEY, true, EMPTY_END_KEY, true, Order.ASC,
+ fields, cb, ctx);
+ }
+
+ @Override
+ public void openCursor(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order, MetastoreCallback<MetastoreCursor> cb,
+ Object ctx) {
+ openCursor(firstKey, firstInclusive, lastKey, lastInclusive,
+ order, ALL_FIELDS, cb, ctx);
+ }
+
+ @Override
+ public void openCursor(final String firstKey, final boolean firstInclusive,
+ final String lastKey, final boolean lastInclusive,
+ final Order order, final Set<String> fields,
+ final MetastoreCallback<MetastoreCursor> cb, final Object ctx) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ Result<MetastoreCursor> result = mockOpenCursor(firstKey, firstInclusive, lastKey, lastInclusive,
+ order, fields);
+ cb.complete(result.code.getCode(), result.value, ctx);
+ }
+ });
+ }
+
+ private synchronized Versioned<Value> mockGet(String key) {
+ return map.get(key);
+ }
+
+ private synchronized Code mockRemove(String key, Version version) {
+ Versioned<Value> vv = map.get(key);
+ if (null == vv) {
+ return Code.NoKey;
+ }
+ if (Version.Occurred.CONCURRENTLY != vv.getVersion().compare(version)) {
+ return Code.BadVersion;
+ }
+ map.remove(key);
+ return Code.OK;
+ }
+
+ static class Result<T> {
+ Code code;
+ T value;
+
+ public Result(Code code, T value) {
+ this.code = code;
+ this.value = value;
+ }
+ }
+
+ private synchronized Result<Version> mockPut(String key, Value value, Version version) {
+ Versioned<Value> vv = map.get(key);
+ if (vv == null) {
+ if (Version.NEW != version) {
+ return new Result<Version>(Code.NoKey, null);
+ }
+ vv = cloneValue(value, version, ALL_FIELDS);
+ vv.setVersion(new MockVersion(0));
+ map.put(key, vv);
+ return new Result<Version>(Code.OK, new MockVersion(0));
+ }
+ if (Version.NEW == version) {
+ return new Result<Version>(Code.KeyExists, null);
+ }
+ if (Version.Occurred.CONCURRENTLY != vv.getVersion().compare(version)) {
+ return new Result<Version>(Code.BadVersion, null);
+ }
+ vv.setVersion(((MockVersion)vv.getVersion()).incrementVersion());
+ vv.setValue(vv.getValue().merge(value));
+ return new Result<Version>(Code.OK, new MockVersion((MockVersion)vv.getVersion()));
+ }
+
+ private synchronized Result<MetastoreCursor> mockOpenCursor(
+ String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order, Set<String> fields) {
+ if (0 == map.size()) {
+ return new Result<MetastoreCursor>(Code.OK, MetastoreCursor.EMPTY_CURSOR);
+ }
+
+ boolean isLegalCursor = false;
+ NavigableMap<String, Versioned<Value>> myMap = null;
+ if (Order.ASC == order) {
+ myMap = map;
+ if (EMPTY_END_KEY == lastKey ||
+ lastKey.compareTo(myMap.lastKey()) > 0) {
+ lastKey = myMap.lastKey();
+ lastInclusive = true;
+ }
+ if (EMPTY_START_KEY == firstKey ||
+ firstKey.compareTo(myMap.firstKey()) < 0) {
+ firstKey = myMap.firstKey();
+ firstInclusive = true;
+ }
+ if (firstKey.compareTo(lastKey) <= 0) {
+ isLegalCursor = true;
+ }
+ } else if (Order.DESC == order) {
+ myMap = map.descendingMap();
+ if (EMPTY_START_KEY == lastKey ||
+ lastKey.compareTo(myMap.lastKey()) < 0) {
+ lastKey = myMap.lastKey();
+ lastInclusive = true;
+ }
+ if (EMPTY_END_KEY == firstKey ||
+ firstKey.compareTo(myMap.firstKey()) > 0) {
+ firstKey = myMap.firstKey();
+ firstInclusive = true;
+ }
+ if (firstKey.compareTo(lastKey) >= 0) {
+ isLegalCursor = true;
+ }
+ }
+
+ if (!isLegalCursor || null == myMap) {
+ return new Result<MetastoreCursor>(Code.IllegalOp, null);
+ }
+ MetastoreCursor cursor = new MockMetastoreCursor(
+ myMap.subMap(firstKey, firstInclusive, lastKey, lastInclusive), fields, scheduler);
+ return new Result<MetastoreCursor>(Code.OK, cursor);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+}
View
59 ...est/java/org/apache/bookkeeper/metastore/MetastoreScannableTableAsyncToSyncConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.Set;
+
+import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
+
+public class MetastoreScannableTableAsyncToSyncConverter extends
+ MetastoreTableAsyncToSyncConverter {
+
+ private MetastoreScannableTable scannableTable;
+
+ public MetastoreScannableTableAsyncToSyncConverter(
+ MetastoreScannableTable table) {
+ super(table);
+ this.scannableTable = table;
+ }
+
+ public MetastoreCursor openCursor(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order)
+ throws MSException {
+ HeldValue<MetastoreCursor> retValue = new HeldValue<MetastoreCursor>();
+ // make the actual async call
+ this.scannableTable.openCursor(firstKey, firstInclusive, lastKey, lastInclusive,
+ order, retValue, null);
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+ public MetastoreCursor openCursor(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order, Set<String> fields)
+ throws MSException {
+ HeldValue<MetastoreCursor> retValue = new HeldValue<MetastoreCursor>();
+ // make the actual async call
+ this.scannableTable.openCursor(firstKey, firstInclusive, lastKey, lastInclusive,
+ order, fields, retValue, null);
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+}
View
131 ...ver/src/test/java/org/apache/bookkeeper/metastore/MetastoreTableAsyncToSyncConverter.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.metastore.MetastoreCallback;
+import org.apache.bookkeeper.metastore.MetastoreTable;
+import org.apache.bookkeeper.metastore.MSException;
+import org.apache.bookkeeper.metastore.MSException.Code;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+// Converts async calls to sync calls for MetastoreTable. Currently not
+// intended to be used other than for simple functional tests, however,
+// could be developed into a sync API.
+
+public class MetastoreTableAsyncToSyncConverter {
+
+ static class HeldValue<T> implements MetastoreCallback<T> {
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+ private int code;
+ private T value = null;
+
+ void waitCallback() throws MSException {
+ try {
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ throw MSException.create(Code.InterruptedException);
+ }
+
+ if (Code.OK.getCode() != code) {
+ throw MSException.create(Code.get(code));
+ }
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ @Override
+ public void complete(int rc, T value, Object ctx) {
+ this.code = rc;
+ this.value = value;
+ countDownLatch.countDown();
+ }
+ }
+
+ protected MetastoreTable table;
+
+ public MetastoreTableAsyncToSyncConverter(MetastoreTable table) {
+ this.table = table;
+ }
+
+ public Versioned<Value> get(String key) throws MSException {
+ HeldValue<Versioned<Value>> retValue =
+ new HeldValue<Versioned<Value>>();
+
+ // make the actual async call
+ this.table.get(key, retValue, null);
+
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+ public Versioned<Value> get(String key, Set<String> fields)
+ throws MSException {
+ HeldValue<Versioned<Value>> retValue =
+ new HeldValue<Versioned<Value>>();
+
+ // make the actual async call
+ this.table.get(key, fields, retValue, null);
+
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+ public void remove(String key, Version version) throws MSException {
+ HeldValue<Void> retValue = new HeldValue<Void>();
+
+ // make the actual async call
+ this.table.remove(key, version, retValue, null);
+
+ retValue.waitCallback();
+ }
+
+ public Version put(String key, Value value, Version version)
+ throws MSException {
+ HeldValue<Version> retValue = new HeldValue<Version>();
+
+ // make the actual async call
+ this.table.put(key, value, version, retValue, null);
+
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+ public MetastoreCursor openCursor() throws MSException {
+ HeldValue<MetastoreCursor> retValue = new HeldValue<MetastoreCursor>();
+ // make the actual async call
+ this.table.openCursor(retValue, null);
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+ public MetastoreCursor openCursor(Set<String> fields) throws MSException {
+ HeldValue<MetastoreCursor> retValue = new HeldValue<MetastoreCursor>();
+ // make the actual async call
+ this.table.openCursor(fields, retValue, null);
+ retValue.waitCallback();
+ return retValue.getValue();
+ }
+
+}
View
649 bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
@@ -0,0 +1,649 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_END_KEY;
+import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_START_KEY;
+import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
+import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.metastore.MSException.Code;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
+import org.apache.bookkeeper.metastore.mock.MockMetaStore;
+import org.apache.bookkeeper.metastore.mock.MockMetastoreTable.MockVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestMetaStore extends TestCase {
+ final static Logger logger = LoggerFactory.getLogger(TestMetaStore.class);
+
+ protected final static String TABLE = "myTable";
+ protected final static String RECORDID = "test";
+ protected final static String FIELD_NAME = "name";
+ protected final static String FIELD_COUNTER = "counter";
+
+ protected String getFieldFromValue(Value value, String field) {
+ byte[] v = value.getField(field);
+ return v == null ? null : new String(v);
+ }
+
+ protected static Value makeValue(String name, Integer counter) {
+ Value data = new Value();
+
+ if (name != null) {
+ data.setField(FIELD_NAME, name.getBytes());
+ }
+
+ if (counter != null) {
+ data.setField(FIELD_COUNTER, counter.toString().getBytes());
+ }
+
+ return data;
+ }
+
+ protected class Record {
+ String name;
+ Integer counter;
+ Version version;
+
+ public Record() {
+ }
+
+ public Record(String name, Integer counter, Version version) {
+ this.name = name;
+ this.counter = counter;
+ this.version = version;
+ }
+
+ public Record(Versioned<Value> vv) {
+ version = vv.getVersion();
+
+ Value value = vv.getValue();
+ if (value == null) {
+ return;
+ }
+
+ name = getFieldFromValue(value, FIELD_NAME);
+ String c = getFieldFromValue(value, FIELD_COUNTER);
+ if (c != null) {
+ counter = new Integer(c);
+ }
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public Value getValue() {
+ return TestMetaStore.makeValue(name, counter);
+ }
+
+ public Versioned<Value> getVersionedValue() {
+ return new Versioned<Value>(getValue(), version);
+ }
+
+ public void merge(String name, Integer counter, Version version) {
+ if (name != null) {
+ this.name = name;
+ }
+ if (counter != null) {
+ this.counter = counter;
+ }
+ if (version != null) {
+ this.version = version;
+ }
+ }
+
+ public void merge(Record record) {
+ merge(record.name, record.counter, record.version);
+ }
+
+ public void checkEqual(Versioned<Value> vv) {
+ Version v = vv.getVersion();
+ Value value = vv.getValue();
+
+ assertEquals(name, getFieldFromValue(value, FIELD_NAME));
+
+ String c = getFieldFromValue(value, FIELD_COUNTER);
+ if (counter == null) {
+ assertNull(c);
+ } else {
+ assertEquals(counter.toString(), c);
+ }
+
+ assertTrue(isEqualVersion(version, v));
+ }
+
+ }
+
+ protected MetaStore metastore;
+ protected MetastoreScannableTable myActualTable;
+ protected MetastoreScannableTableAsyncToSyncConverter myTable;
+
+ protected String getMetaStoreName() {
+ return MockMetaStore.class.getName();
+ }
+
+ protected Configuration getConfiguration() {
+ return new CompositeConfiguration();
+ }
+
+ protected Version newBadVersion() {
+ return new MockVersion(-1);
+ }
+
+ protected Version nextVersion(Version version) {
+ if (Version.NEW == version) {
+ return new MockVersion(0);
+ }
+ if (Version.ANY == version) {
+ return Version.ANY;
+ }
+ assertTrue(version instanceof MockVersion);
+ return new MockVersion(((MockVersion) version).incrementVersion());
+ }
+
+ private void checkVersion(Version v) {
+ assertNotNull(v);
+ if (v != Version.NEW && v != Version.ANY) {
+ assertTrue(v instanceof MockVersion);
+ }
+ }
+
+ protected boolean isEqualVersion(Version v1, Version v2) {
+ checkVersion(v1);
+ checkVersion(v2);
+ return v1.compare(v2) == Version.Occurred.CONCURRENTLY;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ metastore = MetastoreFactory.createMetaStore(getMetaStoreName());
+ Configuration config = getConfiguration();
+ metastore.init(config, metastore.getVersion());
+
+ myActualTable = metastore.createScannableTable(TABLE);
+ myTable = new MetastoreScannableTableAsyncToSyncConverter(myActualTable);
+
+ // setup a clean environment
+ clearTable();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ // also clear table after test
+ clearTable();
+
+ myActualTable.close();
+ metastore.close();
+ }
+
+ void checkExpectedValue(Versioned<Value> vv, String expectedName,
+ Integer expectedCounter, Version expectedVersion) {
+ Record expected = new Record(expectedName, expectedCounter, expectedVersion);
+ expected.checkEqual(vv);
+ }
+
+ protected Integer getRandom() {
+ return (int)Math.random()*65536;
+ }
+
+ protected Versioned<Value> getRecord(String recordId) throws Exception {
+ try {
+ return myTable.get(recordId);
+ } catch (MSException.NoKeyException nke) {
+ return null;
+ }
+ }
+
+ /**
+ * get record with specific fields, assume record EXIST!
+ */
+ protected Versioned<Value> getExistRecordFields(String recordId, Set<String> fields)
+ throws Exception {
+ Versioned<Value> retValue = myTable.get(recordId, fields);
+ return retValue;
+ }
+
+ /**
+ * put and check fields
+ */
+ protected void putAndCheck(String recordId, String name,
+ Integer counter, Version version,
+ Record expected, Code expectedCode)
+ throws Exception {
+ Version retVersion = null;
+ Code code = Code.OperationFailure;
+ try {
+ retVersion = myTable.put(recordId, makeValue(name, counter), version);
+ code = Code.OK;
+ } catch (MSException.BadVersionException bve) {
+ code = Code.BadVersion;
+ } catch (MSException.NoKeyException nke) {
+ code = Code.NoKey;
+ } catch (MSException.KeyExistsException kee) {
+ code = Code.KeyExists;
+ }
+ assertEquals(expectedCode, code);
+
+ // get and check all fields of record
+ if (Code.OK == code) {
+ assertTrue(isEqualVersion(retVersion, nextVersion(version)));
+ expected.merge(name, counter, retVersion);
+ }
+
+ Versioned<Value> existedVV = getRecord(recordId);
+ if (null == expected) {
+ assertNull(existedVV);
+ } else {
+ expected.checkEqual(existedVV);
+ }
+ }
+
+ protected void clearTable() throws Exception {
+ MetastoreCursor cursor = myTable.openCursor();
+ if (!cursor.hasMoreEntries()) {
+ return;
+ }
+ while (cursor.hasMoreEntries()) {
+ Iterator<MetastoreTableItem> iter = cursor.readEntries(99);
+ while (iter.hasNext()) {
+ MetastoreTableItem item = iter.next();
+ String key = item.getKey();
+ myTable.remove(key, Version.ANY);
+ }
+ }
+ cursor.close();
+ }
+
+ /**
+ * Test (get, get partial field, remove) on non-existent element.
+ */
+ @Test
+ public void testNonExistent() throws Exception {
+ // get
+ try {
+ myTable.get(RECORDID);
+ fail("Should fail to get a non-existent key");
+ } catch (MSException.NoKeyException nke) {
+ }
+
+ // get partial field
+ Set<String> fields =
+ new HashSet<String>(Arrays.asList(new String[] { FIELD_COUNTER }));
+ try {
+ myTable.get(RECORDID, fields);
+ fail("Should fail to get a non-existent key with specified fields");
+ } catch (MSException.NoKeyException nke) {
+ }
+
+ // remove
+ try {
+ myTable.remove(RECORDID, Version.ANY);
+ fail("Should fail to delete a non-existent key");
+ } catch (MSException.NoKeyException nke) {
+ }
+ }
+
+ /**
+ * Test usage of get operation on (full and partial) fields.
+ */
+ @Test
+ public void testGet() throws Exception {
+ Versioned<Value> vv;
+
+ final Set<String> fields =
+ new HashSet<String>(Arrays.asList(new String[] { FIELD_NAME }));
+
+ final String name = "get";
+ final Integer counter = getRandom();
+
+ // put test item
+ Version version = myTable.put(RECORDID, makeValue(name, counter), Version.NEW);
+ assertNotNull(version);
+
+ // fetch with all fields
+ vv = getExistRecordFields(RECORDID, ALL_FIELDS);
+ checkExpectedValue(vv, name, counter, version);
+
+ // partial get name
+ vv = getExistRecordFields(RECORDID, fields);
+ checkExpectedValue(vv, name, null, version);
+
+ // non fields
+ vv = getExistRecordFields(RECORDID, NON_FIELDS);
+ checkExpectedValue(vv, null, null, version);
+
+ // get null key should fail
+ try {
+ getExistRecordFields(null, NON_FIELDS);
+ fail("Should fail to get null key with NON fields");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ try {
+ getExistRecordFields(null, ALL_FIELDS);
+ fail("Should fail to get null key with ALL fields.");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ try {
+ getExistRecordFields(null, fields);
+ fail("Should fail to get null key with fields " + fields);
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ }
+
+ /**
+ * Test usage of put operation with (full and partial) fields.
+ */
+ @Test
+ public void testPut() throws Exception {
+ final Integer counter = getRandom();
+ final String name = "put";
+
+ Version version;
+
+ /**
+ * test correct version put
+ */
+ // put test item
+ version = myTable.put(RECORDID, makeValue(name, counter), Version.NEW);
+ assertNotNull(version);
+ Record expected = new Record(name, counter, version);
+
+ // correct version put with only name field changed
+ putAndCheck(RECORDID, "name1", null, expected.getVersion(), expected, Code.OK);
+
+ // correct version put with only counter field changed
+ putAndCheck(RECORDID, null, counter + 1, expected.getVersion(), expected, Code.OK);
+
+ // correct version put with all fields filled
+ putAndCheck(RECORDID, "name2", counter + 2, expected.getVersion(), expected, Code.OK);
+
+ // test put exist entry with Version.ANY
+ checkPartialPut("put exist entry with Version.ANY", Version.ANY, expected, Code.OK);
+
+ /**
+ * test bad version put
+ */
+ // put to existed entry with Version.NEW
+ badVersionedPut(Version.NEW, Code.KeyExists);
+ // put to existed entry with bad version
+ badVersionedPut(newBadVersion(), Code.BadVersion);
+
+ // remove the entry
+ myTable.remove(RECORDID, Version.ANY);
+
+ // put to non-existent entry with bad version
+ badVersionedPut(newBadVersion(), Code.NoKey);
+ // put to non-existent entry with Version.ANY
+ badVersionedPut(Version.ANY, Code.NoKey);
+
+ /**
+ * test illegal arguments
+ */
+ illegalPut(null, Version.NEW);
+ illegalPut(makeValue("illegal value", getRandom()), null);
+ illegalPut(null, null);
+ }
+
+ protected void badVersionedPut(Version badVersion, Code expectedCode) throws Exception {
+ Versioned<Value> vv = getRecord(RECORDID);
+ Record expected = null;
+
+ if (expectedCode != Code.NoKey) {
+ assertNotNull(vv);
+ expected = new Record(vv);
+ }
+
+ checkPartialPut("badVersionedPut", badVersion, expected, expectedCode);
+ }
+
+ protected void checkPartialPut(String name, Version version, Record expected, Code expectedCode)
+ throws Exception {
+ Integer counter;
+
+ // bad version put with all fields filled
+ counter = getRandom();
+ putAndCheck(RECORDID, name + counter, counter, version, expected, expectedCode);
+
+ // bad version put with only name field changed
+ counter = getRandom();
+ putAndCheck(RECORDID, name + counter, null, version, expected, expectedCode);
+
+ // bad version put with only counter field changed
+ putAndCheck(RECORDID, null, counter, version, expected, expectedCode);
+ }
+
+ protected void illegalPut(Value value, Version version) throws MSException {
+ try {
+ myTable.put(RECORDID, value, version);
+ fail("Should fail to do versioned put with illegal arguments");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ }
+
+ /**
+ * Test usage of (unconditional remove, BadVersion remove, CorrectVersion
+ * remove) operation.
+ */
+ @Test
+ public void testRemove() throws Exception {
+ final Integer counter = getRandom();
+ final String name = "remove";
+ Version version;
+
+ // insert test item
+ version = myTable.put(RECORDID, makeValue(name, counter), Version.NEW);
+ assertNotNull(version);
+
+ // test unconditional remove
+ myTable.remove(RECORDID, Version.ANY);
+
+ // insert test item
+ version = myTable.put(RECORDID, makeValue(name, counter), Version.NEW);
+ assertNotNull(version);
+
+ // test remove with bad version
+ try {
+ myTable.remove(RECORDID, Version.NEW);
+ fail("Should fail to remove a given key with bad version");
+ } catch (MSException.BadVersionException bve) {
+ }
+ try {
+ myTable.remove(RECORDID, newBadVersion());
+ fail("Should fail to remove a given key with bad version");
+ } catch (MSException.BadVersionException bve) {
+ }
+
+ // test remove with correct version
+ myTable.remove(RECORDID, version);
+ }
+
+ protected void openCursorTest(MetastoreCursor cursor, Map<String, Value> expectedValues,
+ int numEntriesPerScan) throws Exception {
+ try {
+ Map<String, Value> entries = Maps.newHashMap();
+ while (cursor.hasMoreEntries()) {
+ Iterator<MetastoreTableItem> iter = cursor.readEntries(numEntriesPerScan);
+ while (iter.hasNext()) {
+ MetastoreTableItem item = iter.next();
+ entries.put(item.getKey(), item.getValue().getValue());
+ }
+ }
+ MapDifference<String, Value> diff = Maps.difference(expectedValues, entries);
+ assertTrue(diff.areEqual());
+ } finally {
+ cursor.close();
+ }
+ }
+
+ void openRangeCursorTest(String firstKey, boolean firstInclusive,
+ String lastKey, boolean lastInclusive,
+ Order order, Set<String> fields,
+ Iterator<Map.Entry<String, Value>> expectedValues,
+ int numEntriesPerScan) throws Exception {
+ MetastoreCursor cursor = myTable.openCursor(firstKey, firstInclusive,
+ lastKey, lastInclusive,
+ order, fields);
+ try {
+ while (cursor.hasMoreEntries()) {
+ Iterator<MetastoreTableItem> iter = cursor.readEntries(numEntriesPerScan);
+ while (iter.hasNext()) {
+ assertTrue(expectedValues.hasNext());
+ MetastoreTableItem item = iter.next();
+ Map.Entry<String, Value> expectedItem = expectedValues.next();
+ assertEquals(expectedItem.getKey(), item.getKey());
+ assertEquals(expectedItem.getValue(), item.getValue().getValue());
+ }
+ }
+ assertFalse(expectedValues.hasNext());
+ } finally {
+ cursor.close();
+ }
+ }
+
+ /**
+ * Test usage of (scan) operation on (full and partial) fields.
+ */
+ @Test
+ public void testOpenCursor() throws Exception {
+
+ TreeMap<String, Value> allValues = Maps.newTreeMap();
+ TreeMap<String, Value> partialValues = Maps.newTreeMap();
+ TreeMap<String, Value> nonValues = Maps.newTreeMap();
+
+ Set<String> counterFields = Sets.newHashSet(FIELD_COUNTER);
+
+ for (int i=5; i<24; i++) {
+ char c = (char)('a' + i);
+ String key = String.valueOf(c);
+ Value v = makeValue("value" + i, i);
+ Value cv = v.project(counterFields);
+ Value nv = v.project(NON_FIELDS);
+
+ myTable.put(key, new Value(v), Version.NEW);
+ allValues.put(key, v);
+ partialValues.put(key, cv);
+ nonValues.put(key, nv);
+ }
+
+ // test open cursor
+ MetastoreCursor cursor = myTable.openCursor(ALL_FIELDS);
+ openCursorTest(cursor, allValues, 7);
+
+ cursor = myTable.openCursor(counterFields);
+ openCursorTest(cursor, partialValues, 7);
+
+ cursor = myTable.openCursor(NON_FIELDS);
+ openCursorTest(cursor, nonValues, 7);
+
+ // test order inclusive exclusive
+ Iterator<Map.Entry<String, Value>> expectedIterator;
+
+ expectedIterator = allValues.subMap("l", true, "u", true).entrySet().iterator();
+ openRangeCursorTest("l", true, "u", true, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.descendingMap().subMap("u", true, "l", true)
+ .entrySet().iterator();
+ openRangeCursorTest("u", true, "l", true, Order.DESC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.subMap("l", false, "u", false).entrySet().iterator();
+ openRangeCursorTest("l", false, "u", false, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.descendingMap().subMap("u", false, "l", false)
+ .entrySet().iterator();
+ openRangeCursorTest("u", false, "l", false, Order.DESC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.subMap("l", true, "u", false).entrySet().iterator();
+ openRangeCursorTest("l", true, "u", false, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.descendingMap().subMap("u", true, "l", false)
+ .entrySet().iterator();
+ openRangeCursorTest("u", true, "l", false, Order.DESC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.subMap("l", false, "u", true).entrySet().iterator();
+ openRangeCursorTest("l", false, "u", true, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.descendingMap().subMap("u", false, "l", true)
+ .entrySet().iterator();
+ openRangeCursorTest("u", false, "l", true, Order.DESC, ALL_FIELDS, expectedIterator, 7);
+
+ // test out of range
+ String firstKey = "f";
+ String lastKey = "x";
+ expectedIterator = allValues.subMap(firstKey, true, lastKey, true)
+ .entrySet().iterator();
+ openRangeCursorTest("a", true, "z", true, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.subMap("l", true, lastKey, true).entrySet().iterator();
+ openRangeCursorTest("l", true, "z", true, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ expectedIterator = allValues.subMap(firstKey, true, "u", true).entrySet().iterator();
+ openRangeCursorTest("a", true, "u", true, Order.ASC, ALL_FIELDS, expectedIterator, 7);
+
+ // test EMPTY_START_KEY and EMPTY_END_KEY
+ expectedIterator = allValues.subMap(firstKey, true, "u", true).entrySet().iterator();
+ openRangeCursorTest(EMPTY_START_KEY, true, "u", true, Order.ASC, ALL_FIELDS,
+ expectedIterator, 7);
+
+ expectedIterator = allValues.descendingMap().subMap(lastKey, true, "l", true)
+ .entrySet().iterator();
+ openRangeCursorTest(EMPTY_END_KEY, true, "l", true, Order.DESC, ALL_FIELDS,
+ expectedIterator, 7);
+
+ // test illegal arguments
+ try {
+ myTable.openCursor("a", true, "z", true, Order.DESC, ALL_FIELDS);
+ fail("Should fail with wrong range");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ try {
+ myTable.openCursor("z", true, "a", true, Order.ASC, ALL_FIELDS);
+ fail("Should fail with wrong range");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ try {
+ myTable.openCursor("a", true, "z", true, null, ALL_FIELDS);
+ fail("Should fail with null order");
+ } catch (MSException.IllegalOpException ioe) {
+ }
+ }
+
+}

0 comments on commit 9b7b788

Please sign in to comment.