Permalink
Browse files

add Memcached::ATimeoutOccurred to handle timeout exceptions

  • Loading branch information...
1 parent 57b0b27 commit 793f342b8a62363adb60c75a6add4d28e9789e68 @flyerhzm flyerhzm committed Aug 20, 2012
View
@@ -3,7 +3,7 @@
describe Memcached do
context "localhost" do
before(:all) { @memcached = Memcached.new(["127.0.0.1:11211"]) }
- after(:all) { @memcached.shutdown }
+ after(:all) { @memcached.quit }
it "should get all servers" do
@memcached.set "foo", "bar"
@@ -124,14 +124,6 @@
@memcached.delete "key" rescue nil
lambda { @memcached.delete "key" }.should raise_error(Memcached::NotFound)
end
-
- #context "incr/decr" do
- #it "should incr key" do
- #@memcached.incr "intkey"
- #@memcached.incr "intkey"
- #@memcached.get("intkey").should == 1
- #end
- #end
end
context "flush" do
@@ -163,12 +155,6 @@
@memcached.get("jrubykey").should == "value"
end
- #it "should incr/decr with prefix_key" do
- #@prefix_memcached.incr "intkey"
- #@prefix_memcached.decr "intkey"
- #@memcached.get("jrubyintkey").should == 0
- #end
-
it "should add/replace with prefix_key" do
@prefix_memcached.add "newkey", "value"
@prefix_memcached.replace "newkey", "new_value"
@@ -188,5 +174,40 @@
end
end
end
+
+ context "timeout" do
+ before(:all) do
+ @memcached = Memcached.new("127.0.0.1:11211")
+ @timeout_memcached = Memcached.new("127.0.0.1:11211", :timeout => 1, :exception_retry_limit => 0)
+ end
+ after(:all) do
+ @timeout_memcached.quit
+ @memcached.quit
+ end
+
+ it "should set timeout" do
+ lambda { @timeout_memcached.set "key", "new_value" }.should raise_error(Memcached::ATimeoutOccurred)
+ end
+
+ it "should add timeout" do
+ @memcached.delete "key" rescue nil
+ lambda { @timeout_memcached.add "key", "new_value" }.should raise_error(Memcached::ATimeoutOccurred)
+ end
+
+ it "should replace timeout" do
+ @memcached.set "key", "value"
+ lambda { @timeout_memcached.replace "key", "new_value" }.should raise_error(Memcached::ATimeoutOccurred)
+ end
+
+ it "should delete timeout" do
+ @memcached.set "key", "value"
+ lambda { @timeout_memcached.delete "key" }.should raise_error(Memcached::ATimeoutOccurred)
+ end
+
+ it "should get timeout" do
+ @memcached.set "key", "value"
+ lambda { @timeout_memcached.get "key" }.should raise_error(Memcached::ATimeoutOccurred)
+ end
+ end
end
end
@@ -39,12 +39,18 @@
private int ttl;
+ private int timeout;
+
+ private int exceptionRetryLimit;
+
private String prefixKey;
public Memcached(final Ruby ruby, RubyClass rubyClass) {
super(ruby, rubyClass);
ttl = 604800;
+ timeout = -1;
+ exceptionRetryLimit = 5;
prefixKey = "";
}
@@ -90,19 +96,39 @@ public IRubyObject add(ThreadContext context, IRubyObject[] args) {
Ruby ruby = context.getRuntime();
String key = getFullKey(args[0].toString());
IRubyObject value = args[1];
- int timeout = getTimeout(args);
- try {
- boolean result = client.add(key, timeout, value, transcoder).get();
- if (result == false) {
- throw Error.newNotStored(ruby, "not stored");
+ int expiry = getExpiry(args);
+ int retry = 0;
+ while (true) {
+ try {
+ boolean result = client.add(key, expiry, value, transcoder).get();
+ if (result == false) {
+ throw Error.newNotStored(ruby, "not stored");
+ }
+ return context.nil;
+ } catch (ExecutionException e) {
+ if ("net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw ruby.newRuntimeError(e.getLocalizedMessage());
+ }
+ } catch (RuntimeException e) {
+ if (e.getCause() != null &&
+ "net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw e;
+ }
+ } catch (InterruptedException e) {
+ throw ruby.newThreadError(e.getLocalizedMessage());
}
- return context.nil;
- } catch (OperationTimeoutException e) {
- throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
- } catch (ExecutionException e) {
- throw ruby.newRuntimeError(e.getLocalizedMessage());
- } catch (InterruptedException e) {
- throw ruby.newThreadError(e.getLocalizedMessage());
}
}
@@ -111,19 +137,39 @@ public IRubyObject replace(ThreadContext context, IRubyObject [] args) {
Ruby ruby = context.getRuntime();
String key = getFullKey(args[0].toString());
IRubyObject value = args[1];
- int timeout = getTimeout(args);
- try {
- boolean result = client.replace(key, timeout, value, transcoder).get();
- if (result == false) {
- throw Error.newNotStored(ruby, "not stored");
+ int expiry = getExpiry(args);
+ int retry = 0;
+ while (true) {
+ try {
+ boolean result = client.replace(key, expiry, value, transcoder).get();
+ if (result == false) {
+ throw Error.newNotStored(ruby, "not stored");
+ }
+ return context.nil;
+ } catch (ExecutionException e) {
+ if ("net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw ruby.newRuntimeError(e.getLocalizedMessage());
+ }
+ } catch (RuntimeException e) {
+ if (e.getCause() != null &&
+ "net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw e;
+ }
+ } catch (InterruptedException e) {
+ throw ruby.newThreadError(e.getLocalizedMessage());
}
- return context.nil;
- } catch (OperationTimeoutException e) {
- throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
- } catch (ExecutionException e) {
- throw ruby.newRuntimeError(e.getLocalizedMessage());
- } catch (InterruptedException e) {
- throw ruby.newThreadError(e.getLocalizedMessage());
}
}
@@ -132,19 +178,39 @@ public IRubyObject set(ThreadContext context, IRubyObject[] args) {
Ruby ruby = context.getRuntime();
String key = getFullKey(args[0].toString());
IRubyObject value = args[1];
- int timeout = getTimeout(args);
- try {
- boolean result = client.set(key, timeout, value, transcoder).get();
- if (result == false) {
- throw Error.newNotStored(ruby, "not stored");
+ int expiry = getExpiry(args);
+ int retry = 0;
+ while (true) {
+ try {
+ boolean result = client.set(key, expiry, value, transcoder).get();
+ if (result == false) {
+ throw Error.newNotStored(ruby, "not stored");
+ }
+ return context.nil;
+ } catch (ExecutionException e) {
+ if ("net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw ruby.newRuntimeError(e.getLocalizedMessage());
+ }
+ } catch (RuntimeException e) {
+ if (e.getCause() != null &&
+ "net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw e;
+ }
+ } catch (InterruptedException e) {
+ throw ruby.newThreadError(e.getLocalizedMessage());
}
- return context.nil;
- } catch (OperationTimeoutException e) {
- throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
- } catch (ExecutionException e) {
- throw ruby.newRuntimeError(e.getLocalizedMessage());
- } catch (InterruptedException e) {
- throw ruby.newThreadError(e.getLocalizedMessage());
}
}
@@ -153,11 +219,15 @@ public IRubyObject get(ThreadContext context, IRubyObject[] args) {
Ruby ruby = context.getRuntime();
IRubyObject keys = args[0];
if (keys instanceof RubyString) {
- IRubyObject value = client.get(getFullKey(keys.toString()), transcoder);
- if (value == null) {
- throw Error.newNotFound(ruby, "not found");
+ try {
+ IRubyObject value = client.get(getFullKey(keys.toString()), transcoder);
+ if (value == null) {
+ throw Error.newNotFound(ruby, "not found");
+ }
+ return value;
+ } catch (OperationTimeoutException e) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
}
- return value;
} else if (keys instanceof RubyArray) {
RubyHash results = RubyHash.newHash(ruby);
@@ -177,8 +247,8 @@ public IRubyObject incr(ThreadContext context, IRubyObject[] args) {
Ruby ruby = context.getRuntime();
String key = getFullKey(args[0].toString());
int by = getIncrDecrBy(args);
- int timeout = getTimeout(args);
- long result = client.incr(key, by, 1, timeout);
+ int expiry = getExpiry(args);
+ long result = client.incr(key, by, 1, expiry);
return ruby.newFixnum(result);
}
@@ -187,26 +257,46 @@ public IRubyObject decr(ThreadContext context, IRubyObject[] args) {
Ruby ruby = context.getRuntime();
String key = getFullKey(args[0].toString());
int by = getIncrDecrBy(args);
- int timeout = getTimeout(args);
- long result = client.decr(key, by, 0, timeout);
+ int expiry = getExpiry(args);
+ long result = client.decr(key, by, 0, expiry);
return ruby.newFixnum(result);
}
- @JRubyMethod
+ @JRubyMethod(name = "delete")
public IRubyObject delete(ThreadContext context, IRubyObject key) {
Ruby ruby = context.getRuntime();
- try {
- boolean result = client.delete(getFullKey(key.toString())).get();
- if (result == false) {
- throw Error.newNotFound(ruby, "not found");
+ int retry = 0;
+ while (true) {
+ try {
+ boolean result = client.delete(getFullKey(key.toString())).get();
+ if (result == false) {
+ throw Error.newNotFound(ruby, "not found");
+ }
+ return context.nil;
+ } catch (ExecutionException e) {
+ if ("net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw ruby.newRuntimeError(e.getLocalizedMessage());
+ }
+ } catch (RuntimeException e) {
+ if (e.getCause() != null &&
+ "net.spy.memcached.internal.CheckedOperationTimeoutException".equals(e.getCause().getClass().getName())) {
+ if (retry == exceptionRetryLimit) {
+ throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
+ }
+ retry++;
+ continue;
+ } else {
+ throw e;
+ }
+ } catch (InterruptedException e) {
+ throw ruby.newThreadError(e.getLocalizedMessage());
}
- return context.nil;
- } catch (OperationTimeoutException e) {
- throw Error.newATimeoutOccurred(ruby, e.getLocalizedMessage());
- } catch (ExecutionException e) {
- throw ruby.newRuntimeError(e.getLocalizedMessage());
- } catch (InterruptedException e) {
- throw ruby.newThreadError(e.getLocalizedMessage());
}
}
@@ -278,6 +368,12 @@ protected IRubyObject init(ThreadContext context, List<String> servers, RubyHash
if (opts.containsKey(ruby.newSymbol("default_ttl"))) {
ttl = Integer.parseInt(opts.get(ruby.newSymbol("default_ttl")).toString());
}
+ if (opts.containsKey(ruby.newSymbol("timeout"))) {
+ timeout = Integer.parseInt(opts.get(ruby.newSymbol("timeout")).toString());
+ }
+ if (opts.containsKey(ruby.newSymbol("exception_retry_limit"))) {
+ exceptionRetryLimit = Integer.parseInt(opts.get(ruby.newSymbol("exception_retry_limit")).toString());
+ }
if (opts.containsKey(ruby.newSymbol("namespace"))) {
prefixKey = opts.get(ruby.newSymbol("namespace")).toString();
}
@@ -321,6 +417,9 @@ protected IRubyObject init(ThreadContext context, List<String> servers, RubyHash
builder.setShouldOptimize(true);
}
+ if (timeout != -1) {
+ builder.setOpTimeout(timeout);
+ }
builder.setDaemon(true);
client = new MemcachedClient(builder.build(), addresses);
@@ -336,7 +435,7 @@ protected IRubyObject init(ThreadContext context, List<String> servers, RubyHash
return context.nil;
}
- private int getTimeout(IRubyObject[] args) {
+ private int getExpiry(IRubyObject[] args) {
if (args.length > 2) {
return (int) args[2].convertToInteger().getLongValue();
}
Binary file not shown.

0 comments on commit 793f342

Please sign in to comment.