Skip to content

Commit

Permalink
HDDS-748. Use strongly typed metadata Table implementation. Contribut…
Browse files Browse the repository at this point in the history
…ed by Elek Marton.
  • Loading branch information
bharatviswa504 committed Dec 2, 2018
1 parent 99e201d commit d15dc43
Show file tree
Hide file tree
Showing 18 changed files with 765 additions and 136 deletions.
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;

import java.util.Arrays;

import org.apache.hadoop.utils.db.Table.KeyValue;

/**
* Key value for raw Table implementations.
*/
public final class ByteArrayKeyValue implements KeyValue<byte[], byte[]> {
private byte[] key;
private byte[] value;

private ByteArrayKeyValue(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}

/**
* Create a KeyValue pair.
*
* @param key - Key Bytes
* @param value - Value bytes
* @return KeyValue object.
*/
public static ByteArrayKeyValue create(byte[] key, byte[] value) {
return new ByteArrayKeyValue(key, value);
}

/**
* Return key.
*
* @return byte[]
*/
public byte[] getKey() {
byte[] result = new byte[key.length];
System.arraycopy(key, 0, result, 0, key.length);
return result;
}

/**
* Return value.
*
* @return byte[]
*/
public byte[] getValue() {
byte[] result = new byte[value.length];
System.arraycopy(value, 0, result, 0, value.length);
return result;
}
}
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;

/**
* Codec interface to marshall/unmarshall data to/from a byte[] based
* key/value store.
*
* @param <T> Unserialized type
*/
public interface Codec<T> {

/**
* Convert object to raw persisted format.
*/
byte[] toPersistedFormat(T object);

/**
* Convert object from raw persisted format.
*/
T fromPersistedFormat(byte[] rawData);
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;

import java.util.HashMap;
import java.util.Map;

/**
* Collection of available codecs.
*/
public class CodecRegistry {

private Map<Class, Codec<?>> valueCodecs;

public CodecRegistry() {
valueCodecs = new HashMap<>();
valueCodecs.put(String.class, new StringCodec());
}

/**
* Convert raw value to strongly typed value/key with the help of a codec.
*
* @param rawData original byte array from the db.
* @param format Class of the return value
* @param <T> Type of the return value.
* @return the object with the parsed field data
*/
public <T> T asObject(byte[] rawData, Class<T> format) {
if (valueCodecs.containsKey(format)) {
return (T) valueCodecs.get(format).fromPersistedFormat(rawData);
} else {
throw new IllegalStateException(
"Codec is not registered for type: " + format);
}
}

/**
* Convert strongly typed object to raw data to store it in the kv store.
*
* @param object typed object.
* @param <T> Type of the typed object.
* @return byte array to store it ini the kv store.
*/
public <T> byte[] asRawData(T object) {
Class<T> format = (Class<T>) object.getClass();
if (valueCodecs.containsKey(format)) {
Codec<T> codec = (Codec<T>) valueCodecs.get(format);
return codec.toPersistedFormat(object);
} else {
throw new IllegalStateException(
"Codec is not registered for type: " + format);
}
}
}
Expand Up @@ -41,7 +41,17 @@ public interface DBStore extends AutoCloseable {
* @return - TableStore. * @return - TableStore.
* @throws IOException on Failure * @throws IOException on Failure
*/ */
Table getTable(String name) throws IOException; Table<byte[], byte[]> getTable(String name) throws IOException;

/**
* Gets an existing TableStore with implicit key/value conversion.
*
* @param name - Name of the TableStore to get
* @return - TableStore.
* @throws IOException on Failure
*/
<KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException;


/** /**
* Lists the Known list of Tables in a DB. * Lists the Known list of Tables in a DB.
Expand Down
Expand Up @@ -55,6 +55,7 @@ public class RDBStore implements DBStore {
private final File dbLocation; private final File dbLocation;
private final WriteOptions writeOptions; private final WriteOptions writeOptions;
private final DBOptions dbOptions; private final DBOptions dbOptions;
private final CodecRegistry codecRegistry;
private final Hashtable<String, ColumnFamilyHandle> handleTable; private final Hashtable<String, ColumnFamilyHandle> handleTable;
private ObjectName statMBeanName; private ObjectName statMBeanName;


Expand All @@ -64,7 +65,7 @@ public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families)
Preconditions.checkNotNull(families); Preconditions.checkNotNull(families);
Preconditions.checkArgument(families.size() > 0); Preconditions.checkArgument(families.size() > 0);
handleTable = new Hashtable<>(); handleTable = new Hashtable<>();

codecRegistry = new CodecRegistry();
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(); new ArrayList<>();
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
Expand Down Expand Up @@ -254,14 +255,21 @@ protected ObjectName getStatMBeanName() {
} }


@Override @Override
public Table getTable(String name) throws IOException { public Table<byte[], byte[]> getTable(String name) throws IOException {
ColumnFamilyHandle handle = handleTable.get(name); ColumnFamilyHandle handle = handleTable.get(name);
if (handle == null) { if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name); throw new IOException("No such table in this DB. TableName : " + name);
} }
return new RDBTable(this.db, handle, this.writeOptions); return new RDBTable(this.db, handle, this.writeOptions);
} }


@Override
public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException {
return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
valueType);
}

@Override @Override
public ArrayList<Table> listTables() throws IOException { public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>(); ArrayList<Table> returnList = new ArrayList<>();
Expand Down
Expand Up @@ -19,17 +19,17 @@


package org.apache.hadoop.utils.db; package org.apache.hadoop.utils.db;


import org.apache.hadoop.utils.db.Table.KeyValue;
import org.rocksdb.RocksIterator;

import java.io.IOException; import java.io.IOException;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.function.Consumer; import java.util.function.Consumer;


import org.rocksdb.RocksIterator;

/** /**
* RocksDB store iterator. * RocksDB store iterator.
*/ */
public class RDBStoreIterator implements TableIterator<KeyValue> { public class RDBStoreIterator
implements TableIterator<byte[], ByteArrayKeyValue> {


private RocksIterator rocksDBIterator; private RocksIterator rocksDBIterator;


Expand All @@ -39,7 +39,8 @@ public RDBStoreIterator(RocksIterator iterator) {
} }


@Override @Override
public void forEachRemaining(Consumer<? super KeyValue> action) { public void forEachRemaining(
Consumer<? super ByteArrayKeyValue> action) {
while(hasNext()) { while(hasNext()) {
action.accept(next()); action.accept(next());
} }
Expand All @@ -51,9 +52,10 @@ public boolean hasNext() {
} }


@Override @Override
public Table.KeyValue next() { public ByteArrayKeyValue next() {
if (rocksDBIterator.isValid()) { if (rocksDBIterator.isValid()) {
KeyValue value = KeyValue.create(rocksDBIterator.key(), rocksDBIterator ByteArrayKeyValue value =
ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
.value()); .value());
rocksDBIterator.next(); rocksDBIterator.next();
return value; return value;
Expand All @@ -72,10 +74,10 @@ public void seekToLast() {
} }


@Override @Override
public KeyValue seek(byte[] key) { public ByteArrayKeyValue seek(byte[] key) {
rocksDBIterator.seek(key); rocksDBIterator.seek(key);
if (rocksDBIterator.isValid()) { if (rocksDBIterator.isValid()) {
return KeyValue.create(rocksDBIterator.key(), return ByteArrayKeyValue.create(rocksDBIterator.key(),
rocksDBIterator.value()); rocksDBIterator.value());
} }
return null; return null;
Expand Down
Expand Up @@ -35,7 +35,8 @@
/** /**
* RocksDB implementation of ozone metadata store. * RocksDB implementation of ozone metadata store.
*/ */
public class RDBTable implements Table { public class RDBTable implements Table<byte[], byte[]> {



private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RDBTable.class); LoggerFactory.getLogger(RDBTable.class);
Expand Down Expand Up @@ -108,7 +109,7 @@ public void putWithBatch(BatchOperation batch, byte[] key, byte[] value)


@Override @Override
public boolean isEmpty() throws IOException { public boolean isEmpty() throws IOException {
try (TableIterator<KeyValue> keyIter = iterator()) { try (TableIterator<byte[], ByteArrayKeyValue> keyIter = iterator()) {
keyIter.seekToFirst(); keyIter.seekToFirst();
return !keyIter.hasNext(); return !keyIter.hasNext();
} }
Expand Down Expand Up @@ -145,7 +146,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key)
} }


@Override @Override
public TableIterator<KeyValue> iterator() { public TableIterator<byte[], ByteArrayKeyValue> iterator() {
ReadOptions readOptions = new ReadOptions(); ReadOptions readOptions = new ReadOptions();
return new RDBStoreIterator(db.newIterator(handle, readOptions)); return new RDBStoreIterator(db.newIterator(handle, readOptions));
} }
Expand Down
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;

import org.apache.hadoop.hdfs.DFSUtil;

/**
* Codec to convert String to/from byte array.
*/
public class StringCodec implements Codec<String> {

@Override
public byte[] toPersistedFormat(String object) {
if (object != null) {
return DFSUtil.string2Bytes(object);
} else {
return null;
}
}

@Override
public String fromPersistedFormat(byte[] rawData) {
if (rawData != null) {
return DFSUtil.bytes2String(rawData);
} else {
return null;
}
}
}

0 comments on commit d15dc43

Please sign in to comment.