Skip to content
Browse files

Merged Robey's fork

  • Loading branch information...
2 parents f368fcf + 5d192f5 commit 98a49e1aa243e3e5393054882e834b78ce70d8ee @szegedi szegedi committed Apr 5, 2011
View
2 .gitignore
@@ -5,4 +5,4 @@ target
*.settings
*.DS_Store
*.class
-
+jredis.tmproj
View
16 core/api/src/main/java/org/jredis/JRedisFuture.java
@@ -347,22 +347,22 @@
* @param listkey
* @param value
*/
- public Future<ResponseStatus> rpush (String listkey, byte[] value);
- public Future<ResponseStatus> rpush (String listkey, String stringValue);
- public Future<ResponseStatus> rpush (String listkey, Number numberValue);
+ public Future<Long> rpush (String listkey, byte[] value);
+ public Future<Long> rpush (String listkey, String stringValue);
+ public Future<Long> rpush (String listkey, Number numberValue);
public <T extends Serializable>
- Future<ResponseStatus> rpush (String listkey, T object);
+ Future<Long> rpush (String listkey, T object);
/**
* @Redis LPUSH
* @param listkey
* @param value
*/
- public Future<ResponseStatus> lpush (String listkey, byte[] value);
- public Future<ResponseStatus> lpush (String listkey, String stringValue);
- public Future<ResponseStatus> lpush (String listkey, Number numberValue);
+ public Future<Long> lpush (String listkey, byte[] value);
+ public Future<Long> lpush (String listkey, String stringValue);
+ public Future<Long> lpush (String listkey, Number numberValue);
public <T extends Serializable>
- Future<ResponseStatus> lpush (String listkey, T object);
+ Future<Long> lpush (String listkey, T object);
/**
* @Redis LSET
View
10 core/api/src/main/java/org/jredis/protocol/Command.java
@@ -61,6 +61,7 @@
// "Commands operating on the key space"
KEYS (RequestType.KEY, ResponseType.MULTI_BULK),
+ KEYSTOLIST (RequestType.KEY_KEY, ResponseType.NUMBER),
RANDOMKEY (RequestType.NO_ARG, ResponseType.BULK),
RENAME (RequestType.KEY_KEY, ResponseType.STATUS),
RENAMENX (RequestType.KEY_KEY, ResponseType.BOOLEAN),
@@ -70,8 +71,11 @@
TTL (RequestType.KEY, ResponseType.NUMBER),
// Commands operating on lists
- RPUSH (RequestType.KEY_VALUE, ResponseType.STATUS),
- LPUSH (RequestType.KEY_VALUE, ResponseType.STATUS),
+ RPUSH (RequestType.KEY_VALUE, ResponseType.NUMBER),
+ RPUSHX (RequestType.KEY_VALUE, ResponseType.NUMBER),
+ LPUSH (RequestType.KEY_VALUE, ResponseType.NUMBER),
+ LPUSHX (RequestType.KEY_VALUE, ResponseType.NUMBER),
+ LINSERT (RequestType.BULK_SET, ResponseType.NUMBER),
LLEN (RequestType.KEY, ResponseType.NUMBER),
LRANGE (RequestType.KEY_NUM_NUM, ResponseType.MULTI_BULK),
LTRIM (RequestType.KEY_NUM_NUM, ResponseType.STATUS),
@@ -463,4 +467,4 @@ static final public int bitclear(int bitset, Flag...flags){
{NULL,NULL,0,0,NULL,0,0,0}
};
-*/
+*/
View
73 core/ri/src/main/java/org/jredis/ri/alphazero/JRedisFutureSupport.java
@@ -179,17 +179,59 @@ public FutureStatus rename(String oldkey, String newkey) {
Future<Response> futureResponse = this.queueRequest(Command.RENAMENX, oldkeydata, newkeydata);
return new FutureBoolean(futureResponse);
}
- public FutureStatus rpush(String key, byte[] value) {
+ public FutureLong rpush(String key, byte[] value) {
byte[] keybytes = null;
if((keybytes = JRedisSupport.getKeyBytes(key)) == null)
throw new IllegalArgumentException ("invalid key => ["+key+"]");
if(value == null)
throw new IllegalArgumentException ("null value");
- return new FutureStatus(this.queueRequest(Command.RPUSH, keybytes, value));
+ return new FutureLong(this.queueRequest(Command.RPUSH, keybytes, value));
}
+ public FutureLong rpushx(String key, byte[] value) {
+ byte[] keybytes = null;
+ if ((keybytes = getKeyBytes(key)) == null)
+ throw new IllegalArgumentException ("invalid key => ["+key+"]");
+
+ if (value == null)
+ throw new IllegalArgumentException ("null value");
+
+ return new FutureLong(this.queueRequest(Command.RPUSHX, keybytes, value));
+ }
+
+ public FutureLong lpushx(String key, byte[] value) {
+ byte[] keybytes = null;
+ if ((keybytes = getKeyBytes(key)) == null)
+ throw new IllegalArgumentException ("invalid key => ["+key+"]");
+
+ if (value == null)
+ throw new IllegalArgumentException ("null value");
+
+ return new FutureLong(this.queueRequest(Command.LPUSHX, keybytes, value));
+ }
+
+ public FutureLong linsert(String key, boolean after, byte[] oldvalue, byte[] newvalue) {
+ byte[] keybytes = null;
+ if ((keybytes = getKeyBytes(key)) == null)
+ throw new IllegalArgumentException ("invalid key => ["+key+"]");
+ byte[][] bulk = new byte[4][];
+ bulk[0] = keybytes;
+ bulk[1] = (after ? "AFTER" : "BEFORE").getBytes();
+ bulk[2] = oldvalue;
+ bulk[3] = newvalue;
+ return new FutureLong(this.queueRequest(Command.LINSERT, bulk));
+ }
+
+ public FutureLong linsertAfter(String key, byte[] oldvalue, byte[] newvalue) {
+ return linsert(key, true, oldvalue, newvalue);
+ }
+
+ public FutureLong linsertBefore(String key, byte[] oldvalue, byte[] newvalue) {
+ return linsert(key, false, oldvalue, newvalue);
+ }
+
// @Override
public FutureByteArray rpoplpush (String srcList, String destList) {
byte[] srckeybytes = null;
@@ -203,16 +245,16 @@ public FutureByteArray rpoplpush (String srcList, String destList) {
return new FutureByteArray(futureResponse);
}
// @Override
- public FutureStatus rpush(String key, String value) {
+ public FutureLong rpush(String key, String value) {
// rpush(key, DefaultCodec.encode(value));
return rpush(key, DefaultCodec.encode(value));
}
// @Override
- public FutureStatus rpush(String key, Number value) {
+ public FutureLong rpush(String key, Number value) {
return rpush(key, String.valueOf(value).getBytes());
}
// @Override
- public <T extends Serializable> FutureStatus rpush (String key, T value)
+ public <T extends Serializable> FutureLong rpush (String key, T value)
{
return rpush(key, DefaultCodec.encode(value));
}
@@ -821,6 +863,17 @@ public FutureStatus mset(KeyValueSet.Numbers keyValueMap){
return new FutureKeyList(futureResponse);
}
+ public Future<Long> keystolist(String pattern, String listname) {
+ byte[] keydata = null;
+ if((keydata = getKeyBytes(pattern)) == null)
+ throw new IllegalArgumentException ("null key.");
+ byte[] listnamedata = null;
+ if((listnamedata = getKeyBytes(listname)) == null)
+ throw new IllegalArgumentException ("null list name.");
+
+ return new FutureLong(this.queueRequest(Command.KEYSTOLIST, keydata, listnamedata));
+ }
+
// @Override
public Future<List<byte[]>> lrange(String key, long from, long to) {
byte[] keybytes = null;
@@ -1112,7 +1165,7 @@ public FutureStatus sdiffstore(String dest, String... sets) {
// @Override
- public FutureStatus lpush(String key, byte[] value) {
+ public FutureLong lpush(String key, byte[] value) {
byte[] keybytes = null;
if((keybytes = JRedisSupport.getKeyBytes(key)) == null)
throw new IllegalArgumentException ("invalid key => ["+key+"]");
@@ -1121,18 +1174,18 @@ public FutureStatus lpush(String key, byte[] value) {
throw new IllegalArgumentException ("null value");
- return new FutureStatus(this.queueRequest(Command.LPUSH, keybytes, value));
+ return new FutureLong(this.queueRequest(Command.LPUSH, keybytes, value));
}
// @Override
- public FutureStatus lpush(String key, String value) {
+ public FutureLong lpush(String key, String value) {
return lpush(key, DefaultCodec.encode(value));
}
// @Override
- public FutureStatus lpush(String key, Number value) {
+ public FutureLong lpush(String key, Number value) {
return lpush(key, String.valueOf(value).getBytes());
}
// @Override
- public <T extends Serializable> FutureStatus lpush (String key, T value)
+ public <T extends Serializable> FutureLong lpush (String key, T value)
{
return lpush(key, DefaultCodec.encode(value));
}
View
31 core/ri/src/main/java/org/jredis/ri/alphazero/connection/ConnectionBase.java
@@ -118,21 +118,17 @@ protected ConnectionBase (ConnectionSpec spec)
catch (Exception e) {
throw new ProviderException("Unexpected error on initialize -- BUG", e);
}
- // TODO: problematic in constructor.
- if(spec.getConnectionFlag(Flag.CONNECT_IMMEDIATELY)) {
- connect ();
- }
+
+ if (spec.getConnectionFlag(Flag.CONNECT_IMMEDIATELY)) {
+ try {
+ connect ();
+ } catch (ClientRuntimeException e) {
+ // if connecting failed, clean up on our way out.
+ cleanup();
+ throw e;
+ }
+ }
}
- /*
- * TDOD: this is the right way but breaks some assumptions in various impls.
- * - need to add INITIALIZED state and go from there.
- */
-// @Override
-// public void initialize() throws ClientRuntimeException, ProviderException {
-// if(spec.getConnectionFlag(Flag.CONNECT_IMMEDIATELY)) {
-// connect ();
-// }
-// }
// ------------------------------------------------------------------------
// Interface
@@ -212,6 +208,12 @@ protected void initializeComponents () {
}
}
+ protected void cleanup () {
+ if (heartbeat != null) {
+ heartbeat.exit();
+ }
+ }
+
/**
* Extension point -- callback on this method when {@link ConnectionBase} has connected to server.
* <b>It is important to note that the extension must call super.notifyConnected</b> if reliable service (using
@@ -374,6 +376,7 @@ protected final void disconnect () throws IllegalStateException {
socketClose();
isConnected = false;
+ cleanup();
notifyDisconnected();
Log.debug ("DISCONNECTED | conn: %s", toString());
}
View
12 core/ri/src/main/java/org/jredis/ri/alphazero/connection/PipelineConnectionBase.java
@@ -127,6 +127,12 @@ protected void initializeComponents () {
}
@Override
+ protected void cleanup () {
+ super.cleanup();
+ respHandlerThread.interrupt();
+ }
+
+ @Override
protected void notifyConnected () {
super.notifyConnected();
Log.log("Pipeline <%s> connected", this);
@@ -336,7 +342,9 @@ public void run () {
}
}
catch (InterruptedException e1) {
- e1.printStackTrace();
+ Log.log("Pipeline thread interrupted.");
+ break;
+ //e1.printStackTrace();
}
}
Log.log("Pipeline <%s> thread for <%s> stopped.", Thread.currentThread().getName(), PipelineConnectionBase.this);
@@ -392,4 +400,4 @@ public void onEvent (Event event) {
}
}
}
-}
+}
View
2 core/ri/src/main/java/org/jredis/ri/alphazero/protocol/ProtocolBase.java
@@ -228,7 +228,7 @@ public Request createRequest(Command cmd, byte[]... args) throws ProviderExcepti
break;
case BULK_SET:
- Assert.isTrue(cmd == Command.MSET || cmd == Command.MSETNX , "Only MSET/NX bulk commands are supported", NotSupportedException.class);
+ Assert.isTrue(cmd == Command.MSET || cmd == Command.MSETNX || cmd == Command.LINSERT, "Only MSET, MSETNX, LINSERT bulk commands are supported", NotSupportedException.class);
byte[] setCmdLenBytes = Convert.toBytes(cmd.bytes.length);
byte[] bulkSetLineCntBytes = Convert.toBytes(args.length+1);
View
13 core/ri/src/main/java/org/jredis/ri/alphazero/support/Log.java
@@ -16,7 +16,8 @@
package org.jredis.ri.alphazero.support;
-import org.apache.commons.logging.LogFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
*
@@ -26,7 +27,7 @@
*
*/
public class Log {
- public static org.apache.commons.logging.Log logger = LogFactory.getLog("JREDIS");
+ public static Logger logger = Logger.getLogger("org.jredis.JRedis");
public enum Category { INFO, DEBUG, ERROR, PROBLEM, BUG }
// the various 'just FYI's ...
@@ -36,7 +37,7 @@ public static final void log (String format, Object...args) {
}
public static final void debug (String msg) { debug(msg, (Object[])null); }
public static final void debug (String format, Object...args) {
- logger.debug(String.format(format, args));
+ logger.log(Level.FINE, String.format(format, args));
}
// the various 'error! run for covers', ...
@@ -52,8 +53,8 @@ public static final void debug (String format, Object...args) {
private static final void _error (Category cat, String msg, Object...args) {
msg = String.format(msg, args);
if(cat.equals(Category.ERROR))
- logger.error(String.format("%s", msg));
+ logger.severe(String.format("%s", msg));
else
- logger.error(String.format("%s: %s", cat, msg));
+ logger.log(Level.WARNING, String.format("%s: %s", cat, msg));
}
-}
+}
View
11 pom.xml
@@ -72,17 +72,6 @@
<scope>test</scope>
<classifier>jdk15</classifier>
</dependency>
- <!-- logging -->
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.12</version>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- </dependency>
</dependencies>
</dependencyManagement>

0 comments on commit 98a49e1

Please sign in to comment.
Something went wrong with that request. Please try again.