Permalink
Browse files

About to change to new socket library

  • Loading branch information...
1 parent 065724e commit 5586ac8cd52a4f6b6a8c6be7c7b5d0af3417e964 @bwlewis committed Sep 7, 2013
Showing with 620 additions and 101 deletions.
  1. +1 −0 NAMESPACE
  2. +26 −11 R/controlCMD.R
  3. +58 −0 R/libsock.R
  4. +89 −90 R/redis-internal.R
  5. +407 −0 src/libsock.c
  6. +39 −0 src/libsock.h
  7. BIN src/libsock.o
  8. BIN src/rredis.so
View
@@ -1 +1,2 @@
+useDynLib(rredis)
exportPattern("^[[:alpha:]]+")
View
@@ -1,6 +1,6 @@
# This file contains various control functions.
-# Basic response handler, only really useful in nonblocking cases
+# Basic response handler, only really useful in pipelined cases
# all function argument is left in for backward compatibility,
# it is not used.
`redisGetResponse` <- function(all=TRUE)
@@ -10,29 +10,43 @@
replicate(.redisEnv$current$count, .getResponse(), simplify=FALSE)
}
+# Maintained for compatability.
`redisSetBlocking` <- function(value=TRUE)
{
+ warning("redisSetBlocking is deprecated. Use redisSetPipeline instead.")
+ redisSetPipeline(!value)
+}
+`redisSetPipeline` <- function(value=FALSE)
+{
value <- as.logical(value)
if(is.na(value)) stop("logical value required")
- assign('block',value,envir=.redisEnv$current)
+ assign('pipeline',value,envir=.redisEnv$current)
}
`redisConnect` <-
-function(host='localhost', port=6379, returnRef=FALSE, timeout=2678399L, password=NULL)
+function(host='localhost', port=6379, password=NULL,
+ returnRef=FALSE, timeout=2678399L)
{
.redisEnv$current <- new.env()
-# R nonblocking connections are flaky, especially on Windows, see
+# WARNING R nonblocking connections are flaky, especially on Windows, see
# for example:
# http://www.mail-archive.com/r-devel@r-project.org/msg16420.html.
-# So, we use blocking connections now.
- con <- socketConnection(host, port, open='a+b', blocking=TRUE, timeout=timeout)
+# So, we use only blocking connections.
+#
+# We track the file descriptor of the new connection in a sneaky way
+ fds <- rownames(showConnections(all=TRUE))
+ con <- socketConnection(host,port,open='a+b',blocking=TRUE,timeout=timeout)
+ fd <- rownames(showConnections(all=TRUE))
+ fd <- as.integer(setdiff(fd,fds))
+
# Stash state in the redis enivronment describing this connection:
assign('con',con,envir=.redisEnv$current)
+ assign('fd',fd,envir=.redisEnv$current)
assign('host',host,envir=.redisEnv$current)
assign('port',port,envir=.redisEnv$current)
- assign('block',TRUE,envir=.redisEnv$current)
+ assign('pipeline',FALSE,envir=.redisEnv$current)
assign('timeout',timeout,envir=.redisEnv$current)
-# Count is for nonblocking communication, it keeps track of the number of
+# Count is for pipelined communication, it keeps track of the number of
# getResponse calls that are pending.
assign('count',0,envir=.redisEnv$current)
if (!is.null(password)) tryCatch(redisAuth(password),
@@ -52,11 +66,12 @@ function(host='localhost', port=6379, returnRef=FALSE, timeout=2678399L, passwor
}
`redisClose` <-
-function()
+function(e)
{
- con <- .redis()
+ if(missing(e)) e = .redisEnv$current
+ con <- .redis(e)
close(con)
- remove(list='con',envir=.redisEnv$current)
+ remove(list='con',envir=e)
}
`redisAuth` <-
View
@@ -0,0 +1,58 @@
+# Minimalist socket functions
+.SOCK_POLLIN = 1L
+.SOCK_POLLPRI = 2L
+.SOCK_POLLOUT = 4L
+
+# Return vector of sockets matching requested events. Negative
+# socket numbers in returned vector indicates sockets with error
+# conditions.
+.SOCK_POLL = function(fds, timeout=1000L, events=.SOCK_POLLIN)
+{
+ x = .Call('SOCK_POLL', as.integer(fds), as.integer(timeout), as.integer(events), PACKAGE='rredis')
+ c(fds[x == events], -fds[x>4])
+}
+
+.SOCK_CLOSE = function(socket)
+{
+ .Call('SOCK_CLOSE', as.integer(socket), PACKAGE='rredis')
+}
+
+.SOCK_RECV = function(socket, external_pointer=FALSE, buf_size=8192, max_buffer_size=2^24)
+{
+ .Call('SOCK_RECV', as.integer(socket), as.integer(external_pointer), as.integer(buf_size), as.numeric(max_buffer_size), PACKAGE='rredis')
+}
+
+.SOCK_GETLINE = function(socket)
+{
+ .Call('SOCK_GETLINE', as.integer(socket), PACKAGE='rredis')
+}
+
+.SOCK_RECV_N = function(socket, N)
+{
+ .Call('SOCK_RECV_HTTP_HEAD', as.integer(socket), as.integer(N), PACKAGE='rredis')
+}
+
+# We trap the possibility of a SIGPIPE signal error during SOCK_SEND.
+.SOCK_SEND = function(socket, msg)
+{
+ if(is.raw(msg)) return(
+ tryCatch(.Call('SOCK_SEND', socket, msg, PACKAGE='rredis'),
+ error=function(e) -1, interrupt=function(e) -1))
+ if(is.character(msg))
+ return(
+ tryCatch(
+ .Call('SOCK_SEND', socket, charToRaw(msg), PACKAGE='rredis'),
+ error=function(e) -1, interrupt=function(e) -1))
+ stop("msg must be of data type 'Raw'")
+}
+
+.SOCK_GETSOCKNAME = function(socket)
+{
+ .Call('SOCK_NAME', as.integer(socket), PACKAGE='rredis')
+}
+
+.SOCK_CONNECT = function(host, port)
+{
+ .Call('SOCK_CONNECT', as.character(host), as.integer(port),
+ PACKAGE='rredis')
+}
View
@@ -4,11 +4,12 @@
.redisEnv <- new.env()
.redisEnv$current <- .redisEnv
-.redis <- function()
+.redis <- function(e)
{
- if(!exists('con',envir=.redisEnv$current))
+ if(missing(e)) e = .redisEnv$current
+ if(!exists('con',envir=e))
stop('Not connected, try using redisConnect()')
- .redisEnv$current$con
+ e$con
}
# .redisError may be called by any function when a serious error occurs.
@@ -20,7 +21,8 @@
con <- .redis()
close(con)
# May stop with an error here on connect fail
- con <- socketConnection(env$host, env$port,open='a+b', blocking=TRUE, timeout=env$timeout)
+ con <- socketConnection(env$host, env$port,open='a+b',
+ blocking=FALSE, timeout=env$timeout)
assign('con',con,envir=env)
if(!is.null(e)) print(as.character(e))
stop(msg)
@@ -47,89 +49,6 @@
.redisError("Interrupted communincation with Redis",e)
}
-.getResponse <- function(raw=FALSE)
-{
- env <- .redisEnv$current
-tryCatch({
- con <- .redis()
-# socketSelect(list(con), timeout=10L)
- l <- readLines(con=con, n=1)
- if(length(l)==0) .burn("Empty")
- tryCatch(
- env$count <- max(env$count - 1,0),
- error = function(e) assign('count', 0, envir=env)
- )
- s <- substr(l, 1, 1)
- if (nchar(l) < 2) {
- if(s == '+') {
- # '+' is a valid retrun message on at least one cmd (RANDOMKEY)
- return('')
- }
- .burn("Invalid")
- }
- switch(s,
- '-' = stop(substr(l,2,nchar(l))),
- '+' = substr(l,2,nchar(l)),
- ':' = as.numeric(substr(l,2,nchar(l))),
- '$' = {
- n <- as.numeric(substr(l,2,nchar(l)))
- if (n < 0) {
- return(NULL)
- }
-# socketSelect(list(con),timeout=10L)
- dat <- tryCatch(readBin(con, 'raw', n=n),
- error=function(e) .redisError(e$message))
- m <- length(dat)
- if(m==n) {
-# socketSelect(list(con),timeout=10L)
- l <- readLines(con,n=1) # Trailing \r\n
- if(raw)
- return(dat)
- else
- return(tryCatch(unserialize(dat),
- error=function(e) rawToChar(dat)))
- }
-# The message was not fully recieved in one pass.
-# We allocate a list to hold incremental messages and then concatenate it.
-# This perfromance enhancement was adapted from the Rbig server package,
-# written by Steve Weston and Pat Shields.
- rlen <- 50
- j <- 1
- r <- vector('list',rlen)
- r[j] <- list(dat)
- while(m<n) {
-# Short read; we need to retrieve the rest of this message.
-# socketSelect(list(con),timeout=10L)
- dat <- tryCatch(readBin(con, 'raw', n=(n-m)),
- error=function (e) .redisError(e$message))
- j <- j + 1
- if(j>rlen) {
- rlen <- 2*rlen
- length(r) <- rlen
- }
- r[j] <- list(dat)
- m <- m + length(dat)
- }
-# socketSelect(list(con),timeout=10L)
- l <- readLines(con,n=1) # Trailing \r\n
- length(r) <- j
- if(raw)
- do.call(c,r)
- else
- tryCatch(unserialize(do.call(c,r)),
- error=function(e) rawToChar(do.call(c,r)))
- },
- '*' = {
- numVars <- as.integer(substr(l,2,nchar(l)))
- if(numVars > 0L) {
- replicate(numVars, .getResponse(raw=raw), simplify=FALSE)
- } else NULL
- },
- stop('Unknown message type'))
-}, interrupt=function(e) .burn(e)
-)
-}
-
#
# .raw is just a shorthand wrapper for charToRaw:
#
@@ -196,9 +115,9 @@ redisCmd <- function(CMD, ..., raw=FALSE)
interrupt=function(e) .burn(e)
)
- block <- TRUE
- if(exists('block',envir=env)) block <- get('block',envir=env)
- if(block)
+ pipeline <- FALSE
+ if(exists('pipeline',envir=env)) pipeline <- get('pipeline',envir=env)
+ if(!pipeline)
return(.getResponse())
tryCatch(
env$count <- env$count + 1,
@@ -244,3 +163,83 @@ interrupt=function(e) .burn(e)
if(v %in% names(rep)) return(charToRaw(rep[[v]]))
x
}
+
+
+.getResponse <- function(raw=FALSE)
+{
+ env <- .redisEnv$current
+tryCatch({
+ con <- .redis()
+ l <- readLines(con=con, n=1)
+ if(length(l)==0) .burn("Empty")
+ l = l[[1]]
+ tryCatch(
+ env$count <- max(env$count - 1,0),
+ error = function(e) assign('count', 0, envir=env)
+ )
+ s <- substr(l, 1, 1)
+ if (nchar(l) < 2) {
+ if(s == "+") {
+ # '+' is a valid retrun message for at least one cmd (RANDOMKEY)
+ return("")
+ }
+ .burn("Invalid")
+ }
+ switch(s,
+ '-' = stop(substr(l,2,nchar(l))),
+ '+' = substr(l,2,nchar(l)),
+ ':' = as.numeric(substr(l,2,nchar(l))),
+ '$' = {
+ n <- as.numeric(substr(l,2,nchar(l)))
+ if (n < 0) {
+ return(NULL)
+ }
+ dat <- tryCatch(readBin(con, 'raw', n=n),
+ error=function(e) .redisError(e$message))
+ m <- length(dat)
+ if(m==n) {
+ l <- readLines(con,n=1) # Trailing \r\n
+ if(raw)
+ return(dat)
+ else
+ return(tryCatch(unserialize(dat),
+ error=function(e) rawToChar(dat)))
+ }
+# The message was not fully recieved in one pass.
+# We allocate a list to hold incremental messages and then concatenate it.
+# This perfromance enhancement was adapted from the Rbig server package,
+# written by Steve Weston and Pat Shields.
+ rlen <- 50
+ j <- 1
+ r <- vector('list',rlen)
+ r[j] <- list(dat)
+ while(m<n) {
+# Short read; we need to retrieve the rest of this message.
+ dat <- tryCatch(readBin(con, 'raw', n=(n-m)),
+ error=function (e) .redisError(e$message))
+ j <- j + 1
+ if(j>rlen) {
+ rlen <- 2*rlen
+ length(r) <- rlen
+ }
+ r[j] <- list(dat)
+ m <- m + length(dat)
+ }
+ l <- readLines(con,n=1) # Trailing \r\n
+ length(r) <- j
+ if(raw)
+ do.call(c,r)
+ else
+ tryCatch(unserialize(do.call(c,r)),
+ error=function(e) rawToChar(do.call(c,r)))
+ },
+ '*' = {
+ numVars <- as.integer(substr(l,2,nchar(l)))
+ if(numVars > 0L) {
+ replicate(numVars, .getResponse(raw=raw), simplify=FALSE)
+ } else NULL
+ },
+ stop('Unknown message type'))
+}, interrupt=function(e) .burn(e)
+)
+}
Oops, something went wrong.

0 comments on commit 5586ac8

Please sign in to comment.