Skip to content
This repository has been archived by the owner on Apr 8, 2023. It is now read-only.

Commit

Permalink
Better context switching
Browse files Browse the repository at this point in the history
  • Loading branch information
B. W. Lewis committed Oct 15, 2011
1 parent 45a64a6 commit 64865ee
Show file tree
Hide file tree
Showing 99 changed files with 192 additions and 158 deletions.
4 changes: 4 additions & 0 deletions NEWS
Expand Up @@ -2,6 +2,10 @@ Significant changes in version 1.6:
- Transactions are supported. - Transactions are supported.
- Pub/sub is supported. - Pub/sub is supported.
- Better support for raw value types across the board. - Better support for raw value types across the board.
1.6.3:
- Support for new redisInfo format
- Improved interrupt handling during communication
- Minor man page fixes


Significant changes in version 1.4: Significant changes in version 1.4:
- Many bugs were fixed in the sorted sets (redisZ*) functions. - Many bugs were fixed in the sorted sets (redisZ*) functions.
Expand Down
3 changes: 1 addition & 2 deletions R/allValCMD.R
Expand Up @@ -70,8 +70,7 @@ redisRandomKey <- function()
redisRename <- function(old, new, NX=FALSE) redisRename <- function(old, new, NX=FALSE)
{ {
if (NX) cmd <- 'RENAMENX' else cmd <- 'RENAME' if (NX) cmd <- 'RENAMENX' else cmd <- 'RENAME'
ret <- .redisCmd(.raw(cmd),.raw(old),.raw(new)) .redisCmd(.raw(cmd),.raw(old),.raw(new))
if (NX) 1==ret else ret
} }


redisExpire <- function(key, seconds) redisExpire <- function(key, seconds)
Expand Down
72 changes: 36 additions & 36 deletions R/controlCMD.R
@@ -1,51 +1,49 @@
# This file contains various control functions. # This file contains various control functions.


# Basic response handler, only really useful in nonblocking cases # Basic response handler, only really useful in nonblocking cases
`redisGetResponse` <- function() # all function argument is left in for backward compatibility,
# it is not used.
`redisGetResponse` <- function(all=TRUE)
{ {
if(!exists('count',where=.redisEnv)) return(.getResponse()) if(!exists('count',where=.redisEnv$current)) return(.getResponse())
if(.redisEnv$count < 1) return(NULL) if(.redisEnv$current$count < 1) return(NULL)
replicate(.redisEnv$count, .getResponse(), simplify=FALSE) replicate(.redisEnv$current$count, .getResponse(), simplify=FALSE)
} }


`redisSetBlocking` <- function(value=TRUE) `redisSetBlocking` <- function(value=TRUE)
{ {
value <- as.logical(value) value <- as.logical(value)
if(is.na(value)) stop("logical value required") if(is.na(value)) stop("logical value required")
assign('block',value,envir=.redisEnv) assign('block',value,envir=.redisEnv$current)
} }


`redisConnect` <- `redisConnect` <-
function(host='localhost', port=6379, returnRef=FALSE) function(host='localhost', port=6379, returnRef=FALSE)
{ {
connect <- FALSE .redisEnv$current <- new.env()
if(!exists("con",envir=.redisEnv)) connect <- TRUE
else connect <- tryCatch(!isOpen(.redisEnv$con), error=function(e) TRUE)
if(connect)
{
# R Windows appears to suffer from a serious problem affecting non-blocking # R Windows appears to suffer from a serious problem affecting non-blocking
# connections and readBin with raw data, see: # connections and readBin with raw data, see:
# http://www.mail-archive.com/r-devel@r-project.org/msg16420.html. # http://www.mail-archive.com/r-devel@r-project.org/msg16420.html.
# We force blocking connections on Windows systems to work around this. # We force blocking connections on Windows systems to work around this.
if(Sys.info()[[1]] == "Windows") if(Sys.info()[[1]] == "Windows")
con <- socketConnection(host, port, open='a+b', blocking=TRUE) con <- socketConnection(host, port, open='a+b', blocking=TRUE)
else else
con <- socketConnection(host, port, open='a+b') con <- socketConnection(host, port, open='a+b')
# Stash state in the redis enivronment describing this connection: # Stash state in the redis enivronment describing this connection:
assign('con',con,envir=.redisEnv) assign('con',con,envir=.redisEnv$current)
assign('host',host,envir=.redisEnv) assign('host',host,envir=.redisEnv$current)
assign('port',port,envir=.redisEnv) assign('port',port,envir=.redisEnv$current)
assign('block',TRUE,envir=.redisEnv$current)
# Count is for nonblocking communication, it keeps track of the number of # Count is for nonblocking communication, it keeps track of the number of
# getResponse calls that are pending. # getResponse calls that are pending.
assign('count',0,envir=.redisEnv) assign('count',0,envir=.redisEnv$current)
tryCatch(.redisPP(), tryCatch(.redisPP(),
error=function(e) { error=function(e) {
cat(paste('Error: ',e,'\n')) cat(paste('Error: ',e,'\n'))
close(con); close(con);
rm(list='con',envir=.redisEnv) rm(list='con',envir=.redisEnv$current)
}) })
} if(returnRef) return(.redisEnv$current)
if(returnRef) return(.redisEnv)
invisible() invisible()
} }


Expand All @@ -54,7 +52,7 @@ function()
{ {
con <- .redis() con <- .redis()
close(con) close(con)
remove(list='con',envir=.redisEnv) remove(list='con',envir=.redisEnv$current)
} }


`redisAuth` <- `redisAuth` <-
Expand Down Expand Up @@ -85,14 +83,16 @@ function()
function() function()
{ {
.redisCmd(.raw('SHUTDOWN')) .redisCmd(.raw('SHUTDOWN'))
remove(list='con',envir=.redisEnv) remove(list='con',envir=.redisEnv$current)
} }


`redisInfo` <- `redisInfo` <-
function() function()
{ {
x <- .redisCmd(.raw('INFO')) x <- .redisCmd(.raw('INFO'))
z <- strsplit(x,'\r\n') z <- strsplit(x,'\r\n')[[1]]
rj <- c(grep("^$",z), grep("^#",z))
if(length(rj)>0) z <- z[-rj]
w <- unlist(lapply(z,strsplit,':')) w <- unlist(lapply(z,strsplit,':'))
n <- length(w) n <- length(w)
e <- seq(from=2,to=n,by=2) e <- seq(from=2,to=n,by=2)
Expand Down Expand Up @@ -126,14 +126,14 @@ redisDBSize <- function() {
} }


redisGetContext <- function() { redisGetContext <- function() {
.redisEnv .redisEnv$current
} }


redisSetContext <- function(e=new.env()) redisSetContext <- function(e=NULL)
{ {
p <- environment(redisSetContext) if(is.null(e)) .redisEnv$current <- .redisEnv
unlockBinding('.redisEnv',p) else {
assign('.redisEnv',e,p) if(!is.environment(e)) stop("Invalid context")
lockBinding('.redisEnv',p) .redisEnv$current <- e
invisible() }
} }
51 changes: 39 additions & 12 deletions R/redis-internal.R
Expand Up @@ -2,21 +2,26 @@
# by the rredis package (not exported in the namespace). # by the rredis package (not exported in the namespace).


.redisEnv <- new.env() .redisEnv <- new.env()
.redisEnv$current <- .redisEnv


.redis <- function() .redis <- function()
{ {
tryCatch(get('con',envir=.redisEnv),error=function(e) stop('Not connected, try using redisConnect()')) if(!exists('con',envir=.redisEnv$current))
stop('Not connected, try using redisConnect()')
.redisEnv$current$con
} }


# .redisError may be called by any function when a serious error occurs. # .redisError may be called by any function when a serious error occurs.
# It will print an indicated error message, attempt to reset the current # It will print an indicated error message, attempt to reset the current
# Redis server connection, and signal the error. # Redis server connection, and signal the error.
.redisError <- function(msg) .redisError <- function(msg)
{ {
env <- .redisEnv$current
con <- .redis() con <- .redis()
close(con) close(con)
con <- socketConnection(.redisEnv$host, .redisEnv$port,open='a+b') # May stop with an error here on connect fail
assign('con',con,envir=.redisEnv) con <- socketConnection(env$host, env$port,open='a+b')
assign('con',con,envir=env)
stop(msg) stop(msg)
} }


Expand All @@ -32,14 +37,25 @@
else value else value
} }


# Burn data in the RX buffer, used after interrupt conditions
.burn <- function()
{
con <- .redis()
while(socketSelect(list(con),timeout=1L))
readBin(con, raw(), 1000000L)
.redisError("Interrupted communincation with Redis")
}

.getResponse <- function(raw=FALSE) .getResponse <- function(raw=FALSE)
{ {
env <- .redisEnv$current
tryCatch({
con <- .redis() con <- .redis()
socketSelect(list(con)) socketSelect(list(con))
l <- readLines(con=con, n=1) l <- readLines(con=con, n=1)
tryCatch( tryCatch(
.redisEnv$count <- max(.redisEnv$count - 1,0), env$count <- max(env$count - 1,0),
error = function(e) assign('count', 0, envir=.redisEnv) error = function(e) assign('count', 0, envir=env)
) )
s <- substr(l, 1, 1) s <- substr(l, 1, 1)
if (nchar(l) < 2) { if (nchar(l) < 2) {
Expand Down Expand Up @@ -108,6 +124,8 @@
} else NULL } else NULL
}, },
stop('Unknown message type')) stop('Unknown message type'))
}, interrupt=function(e) .burn()
)
} }


# #
Expand All @@ -134,17 +152,16 @@
# copy (which, unfortunately, is limited to 2GB due to R indexing). # copy (which, unfortunately, is limited to 2GB due to R indexing).
.redisCmd <- function(...) .redisCmd <- function(...)
{ {
env <- .redisEnv$current
con <- .redis() con <- .redis()
f <- match.call() f <- match.call()
n <- length(f) - 1 n <- length(f) - 1
hdr <- paste('*', as.character(n), '\r\n',sep='') hdr <- paste('*', as.character(n), '\r\n',sep='')
socketSelect(list(con), write=TRUE) socketSelect(list(con), write=TRUE)
cat(hdr, file=con) cat(hdr, file=con)
tryCatch({
for(j in seq_len(n)) { for(j in seq_len(n)) {
tryCatch( v <- eval(f[[j+1]],envir=sys.frame(-1))
v <- eval(f[[j+1]],envir=sys.frame(-1)),
error=function(e) {.redisError("Invalid agrument");invisible()}
)
if(!is.raw(v)) v <- .cerealize(v) if(!is.raw(v)) v <- .cerealize(v)
l <- length(v) l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='') hdr <- paste('$', as.character(l), '\r\n', sep='')
Expand All @@ -155,13 +172,18 @@
socketSelect(list(con), write=TRUE) socketSelect(list(con), write=TRUE)
cat('\r\n', file=con) cat('\r\n', file=con)
} }
},
error=function(e) {.redisError("Invalid agrument");invisible()},
interrupt=function(e) .burn()
)

block <- TRUE block <- TRUE
if(exists('block',envir=.redisEnv)) block <- get('block',envir=.redisEnv) if(exists('block',envir=env)) block <- get('block',envir=env)
if(block) if(block)
return(.getResponse()) return(.getResponse())
tryCatch( tryCatch(
.redisEnv$count <- .redisEnv$count + 1, env$count <- env$count + 1,
error = function(e) assign('count', 1, envir=.redisEnv) error = function(e) assign('count', 1, envir=env)
) )
invisible() invisible()
} }
Expand All @@ -174,6 +196,7 @@
hdr <- paste('*', as.character(n), '\r\n',sep='') hdr <- paste('*', as.character(n), '\r\n',sep='')
socketSelect(list(con), write=TRUE) socketSelect(list(con), write=TRUE)
cat(hdr, file=con) cat(hdr, file=con)
tryCatch({
for(j in seq_len(n)) { for(j in seq_len(n)) {
v <- eval(f[[j+1]],envir=sys.frame(-1)) v <- eval(f[[j+1]],envir=sys.frame(-1))
if(!is.raw(v)) v <- .cerealize(v) if(!is.raw(v)) v <- .cerealize(v)
Expand All @@ -186,5 +209,9 @@
socketSelect(list(con), write=TRUE) socketSelect(list(con), write=TRUE)
cat('\r\n', file=con) cat('\r\n', file=con)
} }
},
error=function(e) {.redisError("Invalid agrument");invisible()},
interrupt=function(e) .burn()
)
.getResponse(raw=TRUE) .getResponse(raw=TRUE)
} }
2 changes: 1 addition & 1 deletion man/redis-package.Rd
Expand Up @@ -30,7 +30,7 @@ B. W. Lewis, with substantial assistance from Pat Sheilds
Maintainer: B. W. Lewis <blewis@illposed.net> Maintainer: B. W. Lewis <blewis@illposed.net>
} }
\references{ \references{
http://code.google.com/p/redis/ http://redis.io/commands
} }
\keyword{ package } \keyword{ package }
\examples{ \examples{
Expand Down
2 changes: 1 addition & 1 deletion man/redisAuth.Rd
Expand Up @@ -19,7 +19,7 @@ database, it's best not to use Redis for now.
TRUE if sueccessful, FALSE otherwise. TRUE if sueccessful, FALSE otherwise.
} }
\references{ \references{
http://code.google.com/p/redis/ http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisBLPop.Rd
Expand Up @@ -25,7 +25,7 @@ redisBLPop returns NULL after the timeout period, or a list containing:
\item{value}{The corresponding value.} \item{value}{The corresponding value.}
} }
\references{ \references{
http://code.google.com/p/redis/wiki/BlpopCommand http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisBRPop.Rd
Expand Up @@ -25,7 +25,7 @@ redisBRPop returns NULL after the timeout period, or a list containing:
\item{value}{The corresponding value.} \item{value}{The corresponding value.}
} }
\references{ \references{
http://code.google.com/p/redis/wiki/BlpopCommand http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisBgRewriteAOF.Rd
Expand Up @@ -22,7 +22,7 @@ references below.)
Nothing is returned. Nothing is returned.
} }
\references{ \references{
http://code.google.com/p/redis/ http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisBgSave.Rd
Expand Up @@ -12,7 +12,7 @@ redisBgSave()
save operation with the redisLastsave function. save operation with the redisLastsave function.
} }
\references{ \references{
http://code.google.com/p/redis/ http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
5 changes: 2 additions & 3 deletions man/redisClose.Rd
Expand Up @@ -6,13 +6,12 @@ server.}
\usage{ \usage{
redisClose() redisClose()
} }
\details{A running instance of a Redis server is required. See \details{A running instance of a Redis server is required. }
http://code.google.com/p/redis/ for details.}
\value{Nothing is returned. Errors are displayed if the function fails to \value{Nothing is returned. Errors are displayed if the function fails to
close the connection to the Redis server, or if the connection is invalid. close the connection to the Redis server, or if the connection is invalid.
} }
\references{ \references{
http://code.google.com/p/redis/ for details. http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
6 changes: 3 additions & 3 deletions man/redisConnect.Rd
Expand Up @@ -10,8 +10,8 @@ redisConnect(host = "localhost", port = 6379, returnRef = FALSE)
\item{port}{The Redis port number (optional, numeric or integer)} \item{port}{The Redis port number (optional, numeric or integer)}
\item{returnRef}{Set returnRef=TRUE to return a list describing the Redis connection (not presently useful).} \item{returnRef}{Set returnRef=TRUE to return a list describing the Redis connection (not presently useful).}
} }
\details{A running instance of a Redis server is required. See \details{A running instance of a Redis server is required.}
http://code.google.com/p/redis/ for details.}
\value{Nothing is returned by default. \value{Nothing is returned by default.
Errors are displayed if the function fails to Errors are displayed if the function fails to
connect to the specified Redis server. Disconnect from a connected server connect to the specified Redis server. Disconnect from a connected server
Expand All @@ -23,7 +23,7 @@ use this feature to support multiple Redis connections with
the \code{attachRedis} function. the \code{attachRedis} function.
} }
\references{ \references{
See http://code.google.com/p/redis/ for details. http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisDBSize.Rd
Expand Up @@ -15,7 +15,7 @@ among the available Redis databases.
The number of keys in the current database. The number of keys in the current database.
} }
\references{ \references{
http://code.google.com/p/redis/wiki/DbsizeCommand http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisDecr.Rd
Expand Up @@ -20,7 +20,7 @@ The new value of key after the decrement, returned as a character
string. string.
} }
\references{ \references{
http://code.google.com/p/redis/wiki/IncrCommand http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down
2 changes: 1 addition & 1 deletion man/redisDecrBy.Rd
Expand Up @@ -20,7 +20,7 @@ The new value of key after the decrement, returned as a character
string. string.
} }
\references{ \references{
http://code.google.com/p/redis/wiki/DecrCommand http://redis.io/commands
} }
\author{ \author{
B. W. Lewis B. W. Lewis
Expand Down

0 comments on commit 64865ee

Please sign in to comment.