Browse files

add memcached ConnectionPool

  • Loading branch information...
1 parent 1ecd549 commit 994536f8cb831918c7513dc55703525fa53d06e6 @flyerhzm flyerhzm committed Aug 29, 2012
View
132 src/main/java/com/openfeint/memcached/ConnectionPool.java
@@ -0,0 +1,132 @@
+package com.openfeint.memcached;
+
+import com.openfeint.memcached.error.Error;
+import com.openfeint.memcached.transcoder.MarshalTranscoder;
+import com.openfeint.memcached.transcoder.MarshalZlibTranscoder;
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionFactoryBuilder.Locator;
+import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.transcoders.Transcoder;
+import org.jruby.Ruby;
+import org.jruby.runtime.ThreadContext;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+class ConnectionPool {
+ private int poolSize;
+ private int currentIndex;
+ private MemcachedClient[] connections;
+
+ public ConnectionPool(ThreadContext context, List<String> servers, Map<String, String> options, int poolSize) {
+ this.poolSize = poolSize;
+ currentIndex = 0;
+ initConnections(context, servers, options);
+ }
+
+ public MemcachedClient getConnection() {
+ currentIndex = currentIndex++ % poolSize;
+ return connections[currentIndex];
+ }
+
+ public void shutdown() {
+ for (MemcachedClient connection : connections) {
+ connection.shutdown();
+ }
+ }
+
+ private void initConnections(ThreadContext context, List<String> servers, Map<String, String> options) {
+ connections = new MemcachedClient[poolSize];
+ for (int i = 0; i < poolSize; i++) {
+ connections[i] = initConnection(context, servers, options);
+ }
+ }
+
+ private MemcachedClient initConnection(ThreadContext context, List<String> servers, Map<String, String> options) {
+ Ruby ruby = context.getRuntime();
+ List<InetSocketAddress> addresses = AddrUtil.getAddresses(servers);
+ try {
+ ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder();
+
+ String distributionValue = "ketama";
+ String hashValue = "fnv1_32";
+ boolean binaryValue = false;
+ boolean shouldOptimize = false;
+ String transcoderValue = null;
+ if (!options.isEmpty()) {
+ if (options.containsKey("distribution")) {
+ distributionValue = options.get("distribution");
+ }
+ if (options.containsKey("hash")) {
+ hashValue = options.get("hash");
+ }
+ if (options.containsKey("binary_protocol")) {
+ binaryValue = Boolean.parseBoolean(options.get("binary_protocol"));
+ }
+ if (options.containsKey("should_optimize")) {
+ shouldOptimize = Boolean.parseBoolean(options.get("should_optimize"));
+ }
+ if (options.containsKey("transcoder")) {
+ transcoderValue = options.get("transcoder");
+ }
+ }
+
+ if ("array_mod".equals(distributionValue)) {
+ builder.setLocatorType(Locator.ARRAY_MOD);
+ } else if ("ketama".equals(distributionValue) || "consistent_ketama".equals(distributionValue)) {
+ builder.setLocatorType(Locator.CONSISTENT);
+ } else {
+ throw Error.newNotSupport(ruby, "distribution not support");
+ }
+ if ("native".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.NATIVE_HASH);
+ } else if ("crc".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.CRC_HASH);
+ } else if ("fnv1_64".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.FNV1_64_HASH);
+ } else if ("fnv1a_64".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH);
+ } else if ("fnv1_32".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.FNV1_32_HASH);
+ } else if ("fnv1a_32".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.FNV1A_32_HASH);
+ } else if ("ketama".equals(hashValue)) {
+ builder.setHashAlg(DefaultHashAlgorithm.KETAMA_HASH);
+ } else {
+ throw Error.newNotSupport(ruby, "hash not support");
+ }
+
+ if (binaryValue) {
+ builder.setProtocol(Protocol.BINARY);
+ }
+
+ if (shouldOptimize) {
+ builder.setShouldOptimize(true);
+ }
+
+ if (options.containsKey("timeout")) {
+ int timeout = Integer.parseInt(options.get("timeout"));
+ builder.setOpTimeout(timeout);
+ }
+
+ builder.setDaemon(true);
+
+ Transcoder transcoder;
+ if ("marshal_zlib".equals(transcoderValue)) {
+ transcoder = new MarshalZlibTranscoder(ruby);
+ } else {
+ transcoder = new MarshalTranscoder(ruby);
+ }
+ builder.setTranscoder(transcoder);
+
+ return new MemcachedClient(builder.build(), addresses);
+ } catch (IOException e) {
+ throw ruby.newIOErrorFromException(e);
+ }
+ }
+}
View
165 src/main/java/com/openfeint/memcached/Memcached.java
@@ -1,16 +1,7 @@
package com.openfeint.memcached;
import com.openfeint.memcached.error.Error;
-import com.openfeint.memcached.transcoder.MarshalTranscoder;
-import com.openfeint.memcached.transcoder.MarshalZlibTranscoder;
-import net.spy.memcached.AddrUtil;
-import net.spy.memcached.ConnectionFactoryBuilder;
-import net.spy.memcached.ConnectionFactoryBuilder.Locator;
-import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
-import net.spy.memcached.DefaultHashAlgorithm;
-import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
-import net.spy.memcached.transcoders.Transcoder;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBoolean;
@@ -24,8 +15,6 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
-import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -35,9 +24,7 @@
@JRubyClass(name = "Memcached")
public class Memcached extends RubyObject {
- private MemcachedClient client;
-
- private Transcoder transcoder;
+ private ConnectionPool pool;
private int ttl;
@@ -77,14 +64,16 @@ public IRubyObject initialize(ThreadContext context, IRubyObject[] args) {
if (servers.isEmpty()) {
servers.add("127.0.0.1:11211");
}
- return init(context, servers, options);
+ init(context, servers, options);
+
+ return context.nil;
}
@JRubyMethod
public IRubyObject servers(ThreadContext context) {
Ruby ruby = context.getRuntime();
List<IRubyObject> addresses = new ArrayList<IRubyObject>();
- for (SocketAddress address : client.getAvailableServers()) {
+ for (SocketAddress address : pool.getConnection().getAvailableServers()) {
String addressStr = address.toString();
if (addressStr.indexOf("/") == 0) {
addressStr = addressStr.replace("/", "");
@@ -103,7 +92,7 @@ public IRubyObject add(ThreadContext context, IRubyObject[] args) {
int retry = 0;
while (true) {
try {
- Boolean result = (Boolean) client.add(key, expiry, value, transcoder).get();
+ Boolean result = (Boolean) pool.getConnection().add(key, expiry, value).get();
if (!result) {
throw Error.newNotStored(ruby, "not stored");
}
@@ -144,7 +133,7 @@ public IRubyObject replace(ThreadContext context, IRubyObject [] args) {
int retry = 0;
while (true) {
try {
- Boolean result = (Boolean) client.replace(key, expiry, value, transcoder).get();
+ Boolean result = (Boolean) pool.getConnection().replace(key, expiry, value).get();
if (!result) {
throw Error.newNotStored(ruby, "not stored");
}
@@ -185,7 +174,7 @@ public IRubyObject set(ThreadContext context, IRubyObject[] args) {
int retry = 0;
while (true) {
try {
- Boolean result = (Boolean) client.set(key, expiry, value, transcoder).get();
+ Boolean result = (Boolean) pool.getConnection().set(key, expiry, value).get();
if (!result) {
throw Error.newNotStored(ruby, "not stored");
}
@@ -225,7 +214,7 @@ public IRubyObject get(ThreadContext context, IRubyObject[] args) {
while (true) {
try {
if (keys instanceof RubyString) {
- Object ret = client.get(getFullKey(keys.toString()), transcoder);
+ Object ret = pool.getConnection().get(getFullKey(keys.toString()));
if (ret == null) {
throw Error.newNotFound(ruby, "not found");
}
@@ -239,7 +228,7 @@ public IRubyObject get(ThreadContext context, IRubyObject[] args) {
} else if (keys instanceof RubyArray) {
RubyHash results = RubyHash.newHash(ruby);
- Map<String, IRubyObject> bulkResults = (Map<String, IRubyObject>) client.getBulk(getFullKeys(keys.convertToArray()), transcoder);
+ Map<String, Object> bulkResults = (Map<String, Object>) pool.getConnection().getBulk(getFullKeys(keys.convertToArray()));
for (String key : (List<String>) keys.convertToArray()) {
if (bulkResults.containsKey(getFullKey(key))) {
results.put(key, bulkResults.get(getFullKey(key)));
@@ -269,7 +258,7 @@ public IRubyObject incr(ThreadContext context, IRubyObject[] args) {
int retry = 0;
while (true) {
try {
- long result = client.incr(key, by, 1, expiry);
+ long result = pool.getConnection().incr(key, by, 1, expiry);
return ruby.newFixnum(result);
} catch (OperationTimeoutException e) {
if (retry == exceptionRetryLimit) {
@@ -289,7 +278,7 @@ public IRubyObject decr(ThreadContext context, IRubyObject[] args) {
int retry = 0;
while (true) {
try {
- long result = client.decr(key, by, 0, expiry);
+ long result = pool.getConnection().decr(key, by, 0, expiry);
return ruby.newFixnum(result);
} catch (OperationTimeoutException e) {
if (retry == exceptionRetryLimit) {
@@ -306,7 +295,7 @@ public IRubyObject delete(ThreadContext context, IRubyObject key) {
int retry = 0;
while (true) {
try {
- boolean result = client.delete(getFullKey(key.toString())).get();
+ boolean result = pool.getConnection().delete(getFullKey(key.toString())).get();
if (!result) {
throw Error.newNotFound(ruby, "not found");
}
@@ -342,7 +331,7 @@ public IRubyObject delete(ThreadContext context, IRubyObject key) {
public IRubyObject flush(ThreadContext context) {
Ruby ruby = context.getRuntime();
try {
- client.flush().get();
+ pool.getConnection().flush().get();
return context.nil;
} catch (OperationTimeoutException e) {
throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
@@ -357,7 +346,7 @@ public IRubyObject flush(ThreadContext context) {
public IRubyObject stats(ThreadContext context) {
Ruby ruby = context.getRuntime();
RubyHash results = RubyHash.newHash(ruby);
- for(Map.Entry<SocketAddress, Map<String, String>> entry : client.getStats().entrySet()) {
+ for(Map.Entry<SocketAddress, Map<String, String>> entry : pool.getConnection().getStats().entrySet()) {
RubyHash serverHash = RubyHash.newHash(ruby);
for(Map.Entry<String, String> server : entry.getValue().entrySet()) {
serverHash.op_aset(context, ruby.newString(server.getKey()), ruby.newString(server.getValue()));
@@ -369,7 +358,7 @@ public IRubyObject stats(ThreadContext context) {
@JRubyMethod(name = {"quit", "shutdown"})
public IRubyObject shutdown(ThreadContext context) {
- client.shutdown();
+ pool.shutdown();
return context.nil;
}
@@ -378,101 +367,6 @@ protected int getDefaultTTL() {
return ttl;
}
- protected IRubyObject init(ThreadContext context, List<String> servers, Map<String, String> opts) {
- Ruby ruby = context.getRuntime();
- List<InetSocketAddress> addresses = AddrUtil.getAddresses(servers);
- try {
- ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder();
-
- String distributionValue = "ketama";
- String hashValue = "fnv1_32";
- boolean binaryValue = false;
- boolean shouldOptimize = false;
- String transcoderValue = null;
- if (!opts.isEmpty()) {
- if (opts.containsKey("distribution")) {
- distributionValue = opts.get("distribution");
- }
- if (opts.containsKey("hash")) {
- hashValue = opts.get("hash");
- }
- if (opts.containsKey("binary_protocol")) {
- binaryValue = Boolean.parseBoolean(opts.get("binary_protocol"));
- }
- if (opts.containsKey("should_optimize")) {
- shouldOptimize = Boolean.parseBoolean(opts.get("should_optimize"));
- }
- if (opts.containsKey("default_ttl")) {
- ttl = Integer.parseInt(opts.get("default_ttl"));
- }
- if (opts.containsKey("timeout")) {
- timeout = Integer.parseInt(opts.get("timeout"));
- }
- if (opts.containsKey("exception_retry_limit")) {
- exceptionRetryLimit = Integer.parseInt(opts.get("exception_retry_limit"));
- }
- if (opts.containsKey("namespace")) {
- prefixKey = opts.get("namespace");
- }
- if (opts.containsKey("prefix_key")) {
- prefixKey = opts.get("prefix_key");
- }
- if (opts.containsKey("transcoder")) {
- transcoderValue = opts.get("transcoder");
- }
- }
-
- if ("array_mod".equals(distributionValue)) {
- builder.setLocatorType(Locator.ARRAY_MOD);
- } else if ("ketama".equals(distributionValue) || "consistent_ketama".equals(distributionValue)) {
- builder.setLocatorType(Locator.CONSISTENT);
- } else {
- throw Error.newNotSupport(ruby, "distribution not support");
- }
- if ("native".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.NATIVE_HASH);
- } else if ("crc".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.CRC_HASH);
- } else if ("fnv1_64".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.FNV1_64_HASH);
- } else if ("fnv1a_64".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH);
- } else if ("fnv1_32".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.FNV1_32_HASH);
- } else if ("fnv1a_32".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.FNV1A_32_HASH);
- } else if ("ketama".equals(hashValue)) {
- builder.setHashAlg(DefaultHashAlgorithm.KETAMA_HASH);
- } else {
- throw Error.newNotSupport(ruby, "hash not support");
- }
-
- if (binaryValue) {
- builder.setProtocol(Protocol.BINARY);
- }
- if (shouldOptimize) {
- builder.setShouldOptimize(true);
- }
-
- if (timeout != -1) {
- builder.setOpTimeout(timeout);
- }
- builder.setDaemon(true);
- if ("marshal_zlib".equals(transcoderValue)) {
- transcoder = new MarshalZlibTranscoder(ruby);
- } else {
- transcoder = new MarshalTranscoder(ruby);
- }
- builder.setTranscoder(transcoder);
-
- client = new MemcachedClient(builder.build(), addresses);
-
- return context.nil;
- } catch (IOException e) {
- throw ruby.newIOErrorFromException(e);
- }
- }
-
private int getExpiry(IRubyObject[] args) {
if (args.length > 2) {
return (int) args[2].convertToInteger().getLongValue();
@@ -498,4 +392,31 @@ private int getIncrDecrBy(IRubyObject[] args) {
private String getFullKey(String key) {
return prefixKey + key;
}
+
+ protected void init(ThreadContext context, List<String> servers, Map<String, String> options) {
+ if (options.containsKey("default_ttl")) {
+ ttl = Integer.parseInt(options.get("default_ttl"));
+ }
+
+ if (options.containsKey("exception_retry_limit")) {
+ exceptionRetryLimit = Integer.parseInt(options.get("exception_retry_limit"));
+ }
+
+ if (options.containsKey("timeout")) {
+ timeout = Integer.parseInt(options.get("timeout"));
+ }
+
+ if (options.containsKey("prefix_key")) {
+ prefixKey = options.get("prefix_key");
+ }
+ if (options.containsKey("namespace")) {
+ prefixKey = options.get("namespace");
+ }
+
+ int poolSize = 1;
+ if (options.containsKey("pool_size")) {
+ poolSize = Integer.parseInt(options.get("pool_size"));
+ }
+ pool = new ConnectionPool(context, servers, options, poolSize);
+ }
}
View
4 src/main/java/com/openfeint/memcached/Rails.java
@@ -64,7 +64,9 @@ public IRubyObject initialize(ThreadContext context, IRubyObject[] args) {
if (options.containsKey("string_return_types")) {
stringReturnTypes = true;
}
- return super.init(context, servers, options);
+ init(context, servers, options);
+
+ return context.nil;
}
@JRubyMethod(name = "active?")
View
BIN target/spymemcached-ext-0.0.1.jar
Binary file not shown.

0 comments on commit 994536f

Please sign in to comment.