Skip to content

Commit

Permalink
Fixed the merge issues from master. Fixed a bug in SocketStoreClientF…
Browse files Browse the repository at this point in the history
…actoryMbeanTest
  • Loading branch information
Chinmay Soman committed Sep 20, 2012
2 parents 78ae5d5 + c54100d commit be12a28
Show file tree
Hide file tree
Showing 92 changed files with 4,472 additions and 625 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -13,3 +13,4 @@ server.state
.version
.temp
.idea
data/
126 changes: 87 additions & 39 deletions bin/generate_cluster_xml.py
@@ -1,42 +1,90 @@
#!/usr/bin/python

import sys
import random
import argparse

# Get a random seed
rseed = int(random.randint(00000000001,99999999999))

# Setup and argument parser
parser = argparse.ArgumentParser(description='Build a voldemort cluster.xml.')
# Add supported arguments
parser.add_argument('-N', '--name', type=str, default='voldemort', dest='name',
help='the name you want to give the cluster')
parser.add_argument('-n', '--nodes', type=int, default=2, dest='nodes',
help='the number of nodes in the cluster')
parser.add_argument('-p', '--partitions', type=int, default=300,
dest='partitions', help='number of partitions per node')
parser.add_argument('-s', '--socket-port', type=int, default=6666,
dest='sock_port', help='socket port number')
parser.add_argument('-a', '--admin-port', type=int, default=6667,
dest='admin_port', help='admin port number')
parser.add_argument('-H', '--http-port', type=int, default=6665,
dest='http_port', help='http port number')
genType = parser.add_mutually_exclusive_group()
genType.add_argument('-S', '--seed', type=int, default=rseed, dest='seed',
help='seed for randomizing partition distribution')
genType.add_argument('-l', '--loops', type=int, default=1000, dest='loops',
help='loop n times, using a different random seed every \
time (Note: not currently supported)')
parser.add_argument('-z', '--zones', type=int, dest='zones',
help='if using zones, the number of zones you will have\
(Note: you must add your own <zone> fields \
manually)')

# Parse arguments
args = parser.parse_args()

# Check args
if args.zones:
zones = args.zones
if (args.nodes % zones) != 0:
print "Number of nodes must be evenly divisible by number of zones"
sys.exit(1)

# Store arguments
nodes = args.nodes
partitions = args.partitions
name = args.name
http_port = args.http_port
sock_port = args.sock_port
admin_port = args.admin_port
seed = args.seed

# Generate the full list of partition IDs
part_ids = range(nodes * partitions)
# Generate full list of zone IDs
if args.zones:
zone_ids = range(zones)
zone_id = 0

# Shuffle up the partitions
random.seed(seed)
random.shuffle(part_ids)

# Printing cluster.xml
print "<!-- Partition distribution generated using seed [%d] -->" % seed
print "<cluster>"
print " <name>%s</name>" % name

for i in xrange(nodes):
node_partitions = ", ".join(str(p) for p in sorted(part_ids[i*partitions:(i+1)*partitions]))

print " <server>"
print " <id>%d</id>" % i
print " <host>host%d</host>" % i
print " <http-port>%d</http-port>" % http_port
print " <socket-port>%d</socket-port>" % sock_port
print " <admin-port>%d</admin-port>" % admin_port
print " <partitions>%s</partitions>" % node_partitions
# If zones are being used, assign a zone-id
if args.zones:
print " <zone-id>%d</zone-id>" % zone_id
if zone_id == (zones - 1):
zone_id = 0
else:
zone_id += 1
print " </server>"

if len(sys.argv) != 3:
print >> sys.stderr, "USAGE: python generate_cluster_xml.py <nodes_file> <partitions_per_node>"
sys.exit()

FORMAT_WIDTH = 10

nodes = 0
for line in open(sys.argv[1],'r'):
nodes+=1

partitions = int(sys.argv[2])

ids = range(nodes * partitions)

# use known seed so this is repeatable
random.seed(92873498274)
random.shuffle(ids)

print '<cluster>'
print '<name>prodcluster</name>'
id = 0
for host in open(sys.argv[1],'r'):
print '<server>'
print " <id>%d</id>" % id
print " <host>%s</host>" % host.strip()
print ' <http-port>8081</http-port>'
print ' <socket-port>6666</socket-port>'
print ' <partitions>',
node_ids = sorted(ids[id*partitions:(id+1)*partitions])
for j in xrange(len(node_ids)):
print str(node_ids[j]) + ',',
if j % FORMAT_WIDTH == FORMAT_WIDTH - 1:
print ' ',
print ' </partitions>'
print '</server>'
id += 1
print '</cluster>'


print "</cluster>"
2 changes: 1 addition & 1 deletion build.properties
Expand Up @@ -37,4 +37,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort

## Release
curr.release=0.90.1
curr.release=0.96
2 changes: 2 additions & 0 deletions build.xml
Expand Up @@ -27,6 +27,7 @@
</condition>

<path id="contrib-classpath">
<pathelement path="${resources.dir}" />
<fileset dir="${dist.dir}">
<include name="${name}-${curr.release}.jar" />
</fileset>
Expand All @@ -37,6 +38,7 @@
</path>

<path id="test-classpath">
<pathelement path="${resources.dir}" />
<pathelement path="${env.VOLD_TEST_JARS}" />
<path refid="main-classpath" />
<pathelement path="${testclasses.dir}" />
Expand Down
4 changes: 4 additions & 0 deletions clients/python/voldemort/client.py
Expand Up @@ -239,6 +239,9 @@ def _send_request(self, connection, req_bytes):
## read a response from the connection
def _receive_response(self, connection):
size_bytes = connection.recv(4)
if not size_bytes:
raise VoldemortException('Connection closed')

size = struct.unpack('>i', size_bytes)[0]

bytes_read = 0
Expand All @@ -252,6 +255,7 @@ def _receive_response(self, connection):
return ''.join(data)



## Bootstrap cluster metadata from a list of urls of nodes in the cluster.
## The urls are tuples in the form (host, port).
## A dictionary of node_id => node is returned.
Expand Down
99 changes: 94 additions & 5 deletions clients/python/voldemort/protocol/voldemort_admin_pb2.py

Large diffs are not rendered by default.

14 changes: 0 additions & 14 deletions config/single_node_cluster/config/stores.xml
Expand Up @@ -15,18 +15,4 @@
<type>string</type>
</value-serializer>
</store>
<view>
<name>test-view</name>
<view-of>test</view-of>
<owners> ron@hogwarts.edu </owners>
<view-class>
voldemort.store.views.UpperCaseView
</view-class>
<value-serializer>
<type>string</type>
</value-serializer>
<transforms-serializer>
<type>string</type>
</transforms-serializer>
</view>
</stores>
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
Expand All @@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
Expand All @@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -16,6 +16,7 @@

package voldemort.store.readonly.fetcher;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -283,18 +284,18 @@ private void copyFileWithCheckSum(FileSystem fs,
OutputStream output = null;
try {
input = fs.open(source);
output = new FileOutputStream(dest);
output = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[bufferSize];
while(true) {
int read = input.read(buffer);
if(read < 0) {
break;
} else if(read < bufferSize) {
buffer = ByteUtils.copy(buffer, 0, read);
} else {
output.write(buffer, 0, read);
}
output.write(buffer);

if(fileCheckSumGenerator != null)
fileCheckSumGenerator.update(buffer);
fileCheckSumGenerator.update(buffer, 0, read);
if(throttler != null)
throttler.maybeThrottle(read);
stats.recordBytes(read);
Expand Down
Expand Up @@ -4,11 +4,14 @@

import krati.core.segment.MappedSegmentFactory;
import krati.core.segment.SegmentFactory;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;
import voldemort.utils.Props;
import voldemort.utils.ReflectUtils;
Expand Down Expand Up @@ -42,16 +45,16 @@ public KratiStorageConfiguration(VoldemortConfig config) {

public void close() {}

public StorageEngine<ByteArray, byte[], byte[]> getStore(String storeName) {
public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
synchronized(lock) {
File storeDir = new File(dataDirectory, storeName);
File storeDir = new File(dataDirectory, storeDef.getName());
if(!storeDir.exists()) {
logger.info("Creating krati data directory '" + storeDir.getAbsolutePath() + "'.");
storeDir.mkdirs();
}

SegmentFactory segmentFactory = (SegmentFactory) ReflectUtils.callConstructor(factoryClass);
return new KratiStorageEngine(storeName,
return new KratiStorageEngine(storeDef.getName(),
segmentFactory,
segmentFileSizeMb,
lockStripes,
Expand All @@ -65,4 +68,8 @@ public String getType() {
return TYPE_NAME;
}

public void update(StoreDefinition storeDef) {
throw new VoldemortException("Storage config updates not permitted for "
+ this.getClass().getCanonicalName());
}
}
Expand Up @@ -291,7 +291,7 @@ public KratiClosableIterator(List<Pair<ByteArray, Versioned<byte[]>>> list) {
}

public void close() {
// Nothing to close here
// Nothing to close here
}

public boolean hasNext() {
Expand Down

0 comments on commit be12a28

Please sign in to comment.