Permalink
Browse files

CRAN Version 1.6.7; all nonblocking now, added redisBRPopLPush, added…

… connection timeout fixing problems with blocking ops, added redisEval
  • Loading branch information...
1 parent 715463b commit b653314f1d4dc798b5355e9012898ee2348b02a3 @bwlewis committed Jul 16, 2012
Showing with 113 additions and 40 deletions.
  1. +2 −2 DESCRIPTION
  2. +11 −1 NEWS
  3. +6 −8 R/controlCMD.R
  4. +11 −7 R/hashCMD.R
  5. +5 −0 R/listCMD.R
  6. +23 −21 R/redis-internal.R
  7. +53 −0 man/redisBRPopLPush.Rd
  8. +2 −1 man/redisConnect.Rd
View
@@ -1,8 +1,8 @@
Package: rredis
Type: Package
Title: Redis client for R
-Version: 1.6.5
-Date: 2012-05-14
+Version: 1.6.7
+Date: 2012-07-14
Author: B. W. Lewis
Maintainer: B. W. Lewis <blewis@illposed.net>
Description: An R client for the Redis persistent key-value database available from http://redis.io.
View
@@ -1,4 +1,14 @@
-1.6.4:
+1.6.7:
+- Nonblocking connections in R are problematic, we switched
+ to blocking mode.
+- Added redisBRPopLPush
+- Added connection timeout option to redisConnect
+- Added 'eval' lua scripting functions
+
+1.6.6:
+- Fixed a bug in HMSet.
+
+1.6.5:
- Added missing HMGet function.
- Added generic redis function.
View
@@ -18,22 +18,20 @@
}
`redisConnect` <-
-function(host='localhost', port=6379, returnRef=FALSE)
+function(host='localhost', port=6379, returnRef=FALSE, timeout=2147483647L)
{
.redisEnv$current <- new.env()
-# R Windows appears to suffer from a serious problem affecting non-blocking
-# connections and readBin with raw data, see:
+# R nonblocking connections are flaky, especially on Windows, see
+# for example:
# http://www.mail-archive.com/r-devel@r-project.org/msg16420.html.
-# We force blocking connections on Windows systems to work around this.
- if(Sys.info()[[1]] == "Windows")
- con <- socketConnection(host, port, open='a+b', blocking=TRUE)
- else
- con <- socketConnection(host, port, open='a+b')
+# So, we use blocking connections now.
+ con <- socketConnection(host, port, open='a+b', blocking=TRUE, timeout=timeout)
# Stash state in the redis enivronment describing this connection:
assign('con',con,envir=.redisEnv$current)
assign('host',host,envir=.redisEnv$current)
assign('port',port,envir=.redisEnv$current)
assign('block',TRUE,envir=.redisEnv$current)
+ assign('timeout',timeout,envir=.redisEnv$current)
# Count is for nonblocking communication, it keeps track of the number of
# getResponse calls that are pending.
assign('count',0,envir=.redisEnv$current)
View
@@ -15,13 +15,6 @@ redisHSet <- function(key, field, value, NX=FALSE) {
1 == .redisCmd(.raw(cmd), .raw(key), .raw(field), value)
}
-redisHMSet <- function(key, values) {
- a <- c(alist(),list(.raw('HMSET')))
- fieldnames <- lapply(names(values), charToRaw)
- a <- c(a, Map(list, fieldnames, values, USE.NAMES=FALSE))
- do.call('.redisCmd', a)
-}
-
redisHIncrBy <- function(key, field, value)
{
.redisCmd(.raw('HINCRBY'),.raw(key),.raw(field),.raw(as.character(value)))
@@ -75,3 +68,14 @@ redisHMGet <- function(key, fields) {
if(length(retval) == length(fields)) names(retval) <- fields
retval
}
+
+redisHMSet <- function(key, values) {
+ a <- c(alist(),list(.raw('HMSET')),list(.raw(key)))
+ fieldnames <- lapply(names(values), charToRaw)
+ for(j in 1:length(values)) {
+ a <- c(a, fieldnames[j])
+ if(is.character(values[[j]])) a <- c(a, list(charToRaw(values[[j]])))
+ else(a <- c(a,list(.cerealize(values[[j]]))))
+ }
+ do.call('.redisCmd', a)
+}
View
@@ -73,3 +73,8 @@ redisBLPop <- function(keys, timeout=0) {
}
x
}
+
+redisBRPopLPush <- function(src, dest, timeout=0) {
+ tout <- as.character(timeout)
+ .redisCmd(.raw('BRPOPLPUSH'), .raw(src), .raw(dest), .raw(tout))
+}
View
@@ -14,14 +14,15 @@
# .redisError may be called by any function when a serious error occurs.
# It will print an indicated error message, attempt to reset the current
# Redis server connection, and signal the error.
-.redisError <- function(msg)
+.redisError <- function(msg, e=NULL)
{
env <- .redisEnv$current
con <- .redis()
close(con)
# May stop with an error here on connect fail
- con <- socketConnection(env$host, env$port,open='a+b')
+ con <- socketConnection(env$host, env$port,open='a+b', blocking=TRUE, timeout=env$timeout)
assign('con',con,envir=env)
+ if(!is.null(e)) print(as.character(e))
stop(msg)
}
@@ -38,21 +39,22 @@
}
# Burn data in the RX buffer, used after interrupt conditions
-.burn <- function()
+.burn <- function(e)
{
con <- .redis()
while(socketSelect(list(con),timeout=1L))
readBin(con, raw(), 1000000L)
- .redisError("Interrupted communincation with Redis")
+ .redisError("Interrupted communincation with Redis",e)
}
.getResponse <- function(raw=FALSE)
{
env <- .redisEnv$current
tryCatch({
con <- .redis()
- socketSelect(list(con))
+# socketSelect(list(con), timeout=10L)
l <- readLines(con=con, n=1)
+ if(length(l)==0) .burn("Empty")
tryCatch(
env$count <- max(env$count - 1,0),
error = function(e) assign('count', 0, envir=env)
@@ -63,7 +65,7 @@ tryCatch({
# '+' is a valid retrun message on at least one cmd (RANDOMKEY)
return('')
}
- .redisError('Message garbled')
+ .burn("Invalid")
}
switch(s,
'-' = stop(substr(l,2,nchar(l))),
@@ -74,12 +76,12 @@ tryCatch({
if (n < 0) {
return(NULL)
}
- socketSelect(list(con))
+# socketSelect(list(con),timeout=10L)
dat <- tryCatch(readBin(con, 'raw', n=n),
error=function(e) .redisError(e$message))
m <- length(dat)
if(m==n) {
- socketSelect(list(con))
+# socketSelect(list(con),timeout=10L)
l <- readLines(con,n=1) # Trailing \r\n
if(raw)
return(dat)
@@ -97,7 +99,7 @@ tryCatch({
r[j] <- list(dat)
while(m<n) {
# Short read; we need to retrieve the rest of this message.
- socketSelect(list(con))
+# socketSelect(list(con),timeout=10L)
dat <- tryCatch(readBin(con, 'raw', n=(n-m)),
error=function (e) .redisError(e$message))
j <- j + 1
@@ -108,7 +110,7 @@ tryCatch({
r[j] <- list(dat)
m <- m + length(dat)
}
- socketSelect(list(con))
+# socketSelect(list(con),timeout=10L)
l <- readLines(con,n=1) # Trailing \r\n
length(r) <- j
if(raw)
@@ -124,7 +126,7 @@ tryCatch({
} else NULL
},
stop('Unknown message type'))
-}, interrupt=function(e) .burn()
+}, interrupt=function(e) .burn(e)
)
}
@@ -169,7 +171,7 @@ redisCmd <- function(CMD, ..., raw=FALSE)
f <- match.call()
n <- length(f) - 1
hdr <- paste('*', as.character(n), '\r\n',sep='')
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
# cat(hdr, file=con)
writeBin(.raw(hdr), con)
tryCatch({
@@ -178,18 +180,18 @@ tryCatch({
if(!is.raw(v)) v <- .cerealize(v)
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
# cat(hdr, file=con)
writeBin(.raw(hdr), con)
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
writeBin(v, con)
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
# cat('\r\n', file=con)
writeBin(.raw('\r\n'), con)
}
},
error=function(e) {.redisError("Invalid agrument");invisible()},
-interrupt=function(e) .burn()
+interrupt=function(e) .burn(e)
)
block <- TRUE
@@ -209,24 +211,24 @@ interrupt=function(e) .burn()
f <- match.call()
n <- length(f) - 1
hdr <- paste('*', as.character(n), '\r\n',sep='')
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
cat(hdr, file=con)
tryCatch({
for(j in seq_len(n)) {
v <- eval(f[[j+1]],envir=sys.frame(-1))
if(!is.raw(v)) v <- .cerealize(v)
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
cat(hdr, file=con)
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
writeBin(v, con)
- socketSelect(list(con), write=TRUE)
+# socketSelect(list(con),timeout=10L, write=TRUE)
cat('\r\n', file=con)
}
},
error=function(e) {.redisError("Invalid agrument");invisible()},
-interrupt=function(e) .burn()
+interrupt=function(e) .burn(e)
)
.getResponse(raw=TRUE)
}
@@ -0,0 +1,53 @@
+\name{redisBRPopLPush}
+\alias{redisBRPopLPush}
+\title{Remove the tail from a list, blocking if it does not exist, pushing to another.}
+\description{
+Atomically return and remove the last (tail) element of the src list, blocking
+if the element does not exist, and
+push the element as the first (head) element of the dst list.
+}
+\usage{
+redisBRPopLPush(src, dest, timeout = 0)
+}
+\arguments{
+ \item{src}{A key corresponding to the source list.}
+ \item{dest}{A key corresponding to the destination list.}
+ \item{timeout}{Block for at most timeout seconds. Set to zero to block indefinitely.}
+}
+\details{
+Atomically return and remove the last (tail) element of the src list, blocking
+until the element exists, and
+push the element as the first (head) element of the dst list. For example if
+the source list contains the elements "a","b","c" and the destination list
+contains the elements "foo","bar" after a \code{redisRPopLPush}
+command the content of the
+two lists will be "a","b" and "c","foo","bar".
+
+If the key does not exist or the list is already empty the special value NULL
+is returned. If the srckey and dstkey are the same the operation is equivalent
+to removing the last element from the list and pusing it as first element of
+the list, so it's a "list rotation" command.
+
+See the Redis reference below for programming examples and discussion.
+}
+\value{
+The value moved or rotated across lists, or NULL if the source key does
+not exist or corresponds to an empty list. An error is thrown if either
+of the keys does not correspond to a value of 'list' type.
+}
+\references{
+http://redis.io/commands
+}
+\author{
+B. W. Lewis
+}
+\seealso{
+\code{\link{redisRPopLPush}}
+}
+\examples{
+\dontrun{
+redisConnect()
+redisLPush('x',1)
+redisBRPopLPush('x','x')
+}
+}
View
@@ -3,12 +3,13 @@
\title{Connect to a Redis server.}
\description{Connect to an available Redis server on the specified port.}
\usage{
-redisConnect(host = "localhost", port = 6379, returnRef = FALSE)
+redisConnect(host = "localhost", port = 6379, returnRef = FALSE, timeout = 2147483647L)
}
\arguments{
\item{host}{The Redis server host name or inet address (optional, character)}
\item{port}{The Redis port number (optional, numeric or integer)}
\item{returnRef}{Set returnRef=TRUE to return a list describing the Redis connection (not presently useful).}
+ \item{timeout}{Optional TCP connection timeout value in seconds (integer).}
}
\details{A running instance of a Redis server is required.}

0 comments on commit b653314

Please sign in to comment.