Permalink
Browse files

Prototype adaptation of borrowed minimalist socket library from the w…

…ebsockets package
  • Loading branch information...
1 parent 9f714e0 commit 1427f5c50a6921ce7ec729ea37ef211089fa3e87 @bwlewis committed Sep 7, 2013
Showing with 55 additions and 60 deletions.
  1. +5 −14 R/controlCMD.R
  2. +1 −1 R/libsock.R
  3. +49 −45 R/redis-internal.R
View
@@ -28,20 +28,11 @@ function(host='localhost', port=6379, password=NULL,
returnRef=FALSE, timeout=2678399L)
{
.redisEnv$current <- new.env()
-# WARNING R nonblocking connections are flaky, especially on Windows, see
-# for example:
-# http://www.mail-archive.com/r-devel@r-project.org/msg16420.html.
-# So, we use only blocking connections.
-#
-# We track the file descriptor of the new connection in a sneaky way
- fds <- rownames(showConnections(all=TRUE))
- con <- socketConnection(host,port,open='a+b',blocking=TRUE,timeout=timeout)
- fd <- rownames(showConnections(all=TRUE))
- fd <- as.integer(setdiff(fd,fds))
+# con <- socketConnection(host,port,open='a+b',blocking=TRUE,timeout=timeout)
+ con <- .openConnection(host=host, port=port)
# Stash state in the redis enivronment describing this connection:
assign('con',con,envir=.redisEnv$current)
- assign('fd',fd,envir=.redisEnv$current)
assign('host',host,envir=.redisEnv$current)
assign('port',port,envir=.redisEnv$current)
assign('pipeline',FALSE,envir=.redisEnv$current)
@@ -52,13 +43,13 @@ function(host='localhost', port=6379, password=NULL,
if (!is.null(password)) tryCatch(redisAuth(password),
error=function(e) {
cat(paste('Error: ',e,'\n'))
- close(con);
+ .closeConnection(con);
rm(list='con',envir=.redisEnv$current)
})
tryCatch(.redisPP(),
error=function(e) {
cat(paste('Error: ',e,'\n'))
- close(con);
+ .closeConnection(con);
rm(list='con',envir=.redisEnv$current)
})
if(returnRef) return(.redisEnv$current)
@@ -70,7 +61,7 @@ function(e)
{
if(missing(e)) e = .redisEnv$current
con <- .redis(e)
- close(con)
+ .closeConnection(con)
remove(list='con',envir=e)
}
View
@@ -29,7 +29,7 @@
.SOCK_RECV_N = function(socket, N)
{
- .Call('SOCK_RECV_HTTP_HEAD', as.integer(socket), as.integer(N), PACKAGE='rredis')
+ .Call('SOCK_RECV_N', as.integer(socket), as.integer(N), PACKAGE='rredis')
}
# We trap the possibility of a SIGPIPE signal error during SOCK_SEND.
View
@@ -12,17 +12,32 @@
e$con
}
+.openConnection <- function(host, port, mode="a+b", timeout=0)
+{
+# socketConnection(env$host, env$port,open=mode,
+# blocking=FALSE, timeout=timeout)
+# mode, timeout no longer used
+ .SOCK_CONNECT(host, port)
+}
+
+.closeConnection <- function(s)
+{
+# close(s)
+ .SOCK_CLOSE(s)
+}
+
# .redisError may be called by any function when a serious error occurs.
# It will print an indicated error message, attempt to reset the current
# Redis server connection, and signal the error.
.redisError <- function(msg, e=NULL)
{
env <- .redisEnv$current
con <- .redis()
- close(con)
+ .closeConnection(con)
# May stop with an error here on connect fail
- con <- socketConnection(env$host, env$port,open='a+b',
- blocking=FALSE, timeout=env$timeout)
+# con <- socketConnection(env$host, env$port,open='a+b',
+# blocking=FALSE, timeout=env$timeout)
+ con <- .openConnection(env$host, env$port)
assign('con',con,envir=env)
if(!is.null(e)) print(as.character(e))
stop(msg)
@@ -44,8 +59,9 @@
.burn <- function(e)
{
con <- .redis()
- while(socketSelect(list(con),timeout=1L))
- readBin(con, raw(), 1000000L)
+# while(socketSelect(list(con),timeout=1L))
+# readBin(con, raw(), 1000000L)
+ .SOCK_RECV(con)
.redisError("Interrupted communincation with Redis",e)
}
@@ -54,7 +70,9 @@
#
.raw <- function(word)
{
- tryCatch(charToRaw(word),warning=function(w) stop(w), error=function(e) stop(e))
+ tryCatch(charToRaw(word),
+ warning=function(w) stop(w),
+ error=function(e) stop(e))
}
# Expose the basic Redis interface to the user
@@ -96,7 +114,8 @@ redisCmd <- function(CMD, ..., raw=FALSE)
f <- match.call()
n <- length(f) - 1
hdr <- paste('*', as.character(n), '\r\n',sep='')
- writeBin(.raw(hdr), con)
+# writeBin(.raw(hdr), con)
+ .SOCK_SEND(con, .raw(hdr))
tryCatch({
for(j in seq_len(n)) {
if(j==1)
@@ -106,9 +125,12 @@ redisCmd <- function(CMD, ..., raw=FALSE)
if(!is.raw(v)) v <- .cerealize(v)
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
- writeBin(.raw(hdr), con)
- writeBin(v, con)
- writeBin(.raw('\r\n'), con)
+# writeBin(.raw(hdr), con)
+# writeBin(v, con)
+# writeBin(.raw('\r\n'), con)
+ .SOCK_SEND(con, .raw(hdr))
+ .SOCK_SEND(con, v)
+ .SOCK_SEND(con, .raw("\r\n"))
}
},
error=function(e) {.redisError("Invalid agrument");invisible()},
@@ -135,7 +157,8 @@ redisCmd <- function(CMD, ..., raw=FALSE)
# Check to see if a rename list exists and use it if it does...we also
rep = c()
if(exists("rename",envir=.redisEnv)) rep = get("rename",envir=.redisEnv)
- cat(hdr, file=con)
+# cat(hdr, file=con)
+ .SOCK_SEND(con, hdr)
tryCatch({
for(j in seq_len(n)) {
if(j==1)
@@ -145,9 +168,12 @@ tryCatch({
if(!is.raw(v)) v <- .cerealize(v)
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
- cat(hdr, file=con)
- writeBin(v, con)
- cat('\r\n', file=con)
+# cat(hdr, file=con)
+# writeBin(v, con)
+# cat('\r\n', file=con)
+ .SOCK_SEND(con, hdr)
+ .SOCK_SEND(con, v)
+ .SOCK_SEND(con, "\r\n")
}
},
error=function(e) {.redisError("Invalid agrument");invisible()},
@@ -159,7 +185,7 @@ interrupt=function(e) .burn(e)
.renameCommand <- function(x, rep)
{
if(is.null(rep)) return(x)
- v = rawToChar(x)
+ v <- rawToChar(x)
if(v %in% names(rep)) return(charToRaw(rep[[v]]))
x
}
@@ -170,7 +196,8 @@ interrupt=function(e) .burn(e)
env <- .redisEnv$current
tryCatch({
con <- .redis()
- l <- readLines(con=con, n=1)
+# l <- readLines(con=con, n=1)
+ l <- .SOCK_GETLINE(con)
if(length(l)==0) .burn("Empty")
l = l[[1]]
tryCatch(
@@ -194,44 +221,21 @@ tryCatch({
if (n < 0) {
return(NULL)
}
- dat <- tryCatch(readBin(con, 'raw', n=n),
+# dat <- tryCatch(readBin(con, 'raw', n=n),
+# error=function(e) .redisError(e$message))
+ dat <- tryCatch(.SOCK_RECV_N(con, N=n),
error=function(e) .redisError(e$message))
m <- length(dat)
if(m==n) {
- l <- readLines(con,n=1) # Trailing \r\n
+# l <- readLines(con,n=1) # Trailing \r\n
+ l <- .SOCK_GETLINE(con) # Trailing \r\n
if(raw)
return(dat)
else
return(tryCatch(unserialize(dat),
error=function(e) rawToChar(dat)))
}
-# The message was not fully recieved in one pass.
-# We allocate a list to hold incremental messages and then concatenate it.
-# This perfromance enhancement was adapted from the Rbig server package,
-# written by Steve Weston and Pat Shields.
- rlen <- 50
- j <- 1
- r <- vector('list',rlen)
- r[j] <- list(dat)
- while(m<n) {
-# Short read; we need to retrieve the rest of this message.
- dat <- tryCatch(readBin(con, 'raw', n=(n-m)),
- error=function (e) .redisError(e$message))
- j <- j + 1
- if(j>rlen) {
- rlen <- 2*rlen
- length(r) <- rlen
- }
- r[j] <- list(dat)
- m <- m + length(dat)
- }
- l <- readLines(con,n=1) # Trailing \r\n
- length(r) <- j
- if(raw)
- do.call(c,r)
- else
- tryCatch(unserialize(do.call(c,r)),
- error=function(e) rawToChar(do.call(c,r)))
+ .burn("Truncated response")
},
'*' = {
numVars <- as.integer(substr(l,2,nchar(l)))

0 comments on commit 1427f5c

Please sign in to comment.