Skip to content

Commit

Permalink
BDB Cache partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Jul 13, 2012
1 parent ffff0f7 commit 986c8f2
Show file tree
Hide file tree
Showing 32 changed files with 1,553 additions and 164 deletions.
99 changes: 94 additions & 5 deletions clients/python/voldemort/protocol/voldemort_admin_pb2.py

Large diffs are not rendered by default.

Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
Expand All @@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
Expand All @@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -4,11 +4,14 @@

import krati.core.segment.MappedSegmentFactory;
import krati.core.segment.SegmentFactory;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;
import voldemort.utils.Props;
import voldemort.utils.ReflectUtils;
Expand Down Expand Up @@ -42,16 +45,16 @@ public KratiStorageConfiguration(VoldemortConfig config) {

public void close() {}

public StorageEngine<ByteArray, byte[], byte[]> getStore(String storeName) {
public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
synchronized(lock) {
File storeDir = new File(dataDirectory, storeName);
File storeDir = new File(dataDirectory, storeDef.getName());
if(!storeDir.exists()) {
logger.info("Creating krati data directory '" + storeDir.getAbsolutePath() + "'.");
storeDir.mkdirs();
}

SegmentFactory segmentFactory = (SegmentFactory) ReflectUtils.callConstructor(factoryClass);
return new KratiStorageEngine(storeName,
return new KratiStorageEngine(storeDef.getName(),
segmentFactory,
segmentFileSizeMb,
lockStripes,
Expand All @@ -65,4 +68,8 @@ public String getType() {
return TYPE_NAME;
}

public void update(StoreDefinition storeDef) {
throw new VoldemortException("Storage config updates not permitted for "
+ this.getClass().getCanonicalName());
}
}
Expand Up @@ -291,7 +291,7 @@ public KratiClosableIterator(List<Pair<ByteArray, Versioned<byte[]>>> list) {
}

public void close() {
// Nothing to close here
// Nothing to close here
}

public boolean hasNext() {
Expand Down
18 changes: 16 additions & 2 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -220,6 +220,10 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("version")
.ofType(Long.class);
parser.accepts("reserve-memory", "Memory in MB to reserve for the store")
.withRequiredArg()
.describedAs("size-in-mb")
.ofType(Long.class);

OptionSet options = parser.parse(args);

Expand All @@ -236,7 +240,7 @@ public static void main(String[] args) throws Exception {
|| options.has("ro-metadata") || options.has("set-metadata")
|| options.has("get-metadata") || options.has("check-metadata") || options.has("key-distribution"))
|| options.has("truncate") || options.has("clear-rebalancing-metadata")
|| options.has("async") || options.has("native-backup") || options.has("rollback"))) {
|| options.has("async") || options.has("native-backup") || options.has("rollback") || options.has("reserve-memory"))) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(1);
Expand Down Expand Up @@ -311,11 +315,17 @@ public static void main(String[] args) throws Exception {
}
ops += "o";
}
if(options.has("reserve-memory")) {
if(!options.has("stores")) {
Utils.croak("Specify the list of stores to reserve memory");
}
ops += "f";
}
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
+ "repair-job, native-backup) must be specified");
+ "repair-job, native-backup, rollback, reserve-memory) must be specified");
}

List<String> storeNames = null;
Expand Down Expand Up @@ -480,6 +490,10 @@ public static void main(String[] args) throws Exception {
long pushVersion = (Long) options.valueOf("version");
executeRollback(nodeId, storeName, pushVersion, adminClient);
}
if(ops.contains("f")) {
long reserveMB = (Long) options.valueOf("reserve-memory");
adminClient.reserveMemory(nodeId, storeNames, reserveMB);
}
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
Expand Down
39 changes: 39 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -2448,4 +2449,42 @@ public void nativeBackup(int nodeId,
int asyncId = response.getRequestId();
waitForCompletion(nodeId, asyncId, timeOut, TimeUnit.MINUTES);
}

/**
* Reserve memory for the stores
*
* @param nodeId The node id to reserve, -1 for entire cluster
* @param stores list of stores for which to reserve
* @param sizeInMB size of reservation
*/
public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {

List<Integer> reserveNodes = new ArrayList<Integer>();
if(nodeId == -1) {
// if no node is specified send it to the entire cluster
for(Node node: currentCluster.getNodes())
reserveNodes.add(node.getId());
} else {
reserveNodes.add(nodeId);
}
for(String storeName: stores) {
for(Integer reserveNodeId: reserveNodes) {

VAdminProto.ReserveMemoryRequest reserveRequest = VAdminProto.ReserveMemoryRequest.newBuilder()
.setStoreName(storeName)
.setSizeInMb(sizeInMB)
.build();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setReserveMemory(reserveRequest)
.setType(VAdminProto.AdminRequestType.RESERVE_MEMORY)
.build();
VAdminProto.ReserveMemoryResponse.Builder response = sendAndReceive(reserveNodeId,
adminRequest,
VAdminProto.ReserveMemoryResponse.newBuilder());
if(response.hasError())
throwException(response.getError());
}
logger.info("Finished reserving memory for store : " + storeName);
}
}
}

0 comments on commit 986c8f2

Please sign in to comment.