Permalink
Browse files

Prototype new TCP stack

  • Loading branch information...
1 parent a560d65 commit c79532885e464887bd331bbe5b82a6df976420d7 @bwlewis committed Sep 9, 2013
Showing with 86 additions and 79 deletions.
  1. +3 −4 R/controlCMD.R
  2. +2 −2 R/libsock.R
  3. +40 −58 R/redis-internal.R
  4. +38 −12 src/libsock.c
  5. +3 −3 src/libsock.h
View
@@ -25,18 +25,17 @@
`redisConnect` <-
function(host='localhost', port=6379, password=NULL,
- returnRef=FALSE, timeout=2678399L)
+ returnRef=FALSE, nodelay=FALSE)
{
.redisEnv$current <- new.env()
-# con <- socketConnection(host,port,open='a+b',blocking=TRUE,timeout=timeout)
- con <- .openConnection(host=host, port=port)
+ con <- .openConnection(host=host, port=port, nodelay=nodelay)
# Stash state in the redis enivronment describing this connection:
assign('con',con,envir=.redisEnv$current)
assign('host',host,envir=.redisEnv$current)
assign('port',port,envir=.redisEnv$current)
assign('pipeline',FALSE,envir=.redisEnv$current)
- assign('timeout',timeout,envir=.redisEnv$current)
+ assign('nodelay',nodelay,envir=.redisEnv$current)
# Count is for pipelined communication, it keeps track of the number of
# getResponse calls that are pending.
assign('count',0,envir=.redisEnv$current)
View
@@ -51,8 +51,8 @@
.Call('SOCK_NAME', as.integer(socket), PACKAGE='rredis')
}
-.SOCK_CONNECT = function(host, port)
+.SOCK_CONNECT = function(host, port, nodelay=0)
{
.Call('SOCK_CONNECT', as.character(host), as.integer(port),
- PACKAGE='rredis')
+ as.integer(nodelay), PACKAGE='rredis')
}
View
@@ -12,17 +12,16 @@
e$con
}
-.openConnection <- function(host, port, mode="a+b", timeout=0)
+.openConnection <- function(host, port, nodelay=FALSE)
{
-# socketConnection(env$host, env$port,open=mode,
-# blocking=FALSE, timeout=timeout)
-# mode, timeout no longer used
- .SOCK_CONNECT(host, port)
+ stopifnot(typeof(host)=="character")
+ stopifnot(class(port)=="numeric")
+ stopifnot(typeof(nodelay)=="logical")
+ .SOCK_CONNECT(host, port, as.integer(nodelay))
}
.closeConnection <- function(s)
{
-# close(s)
.SOCK_CLOSE(s)
}
@@ -35,9 +34,7 @@
con <- .redis()
.closeConnection(con)
# May stop with an error here on connect fail
-# con <- socketConnection(env$host, env$port,open='a+b',
-# blocking=FALSE, timeout=env$timeout)
- con <- .openConnection(env$host, env$port)
+ con <- .openConnection(env$host, env$port, env$nodelay)
assign('con',con,envir=env)
if(!is.null(e)) print(as.character(e))
stop(msg)
@@ -59,8 +56,6 @@
.burn <- function(e)
{
con <- .redis()
-# while(socketSelect(list(con),timeout=1L))
-# readBin(con, raw(), 1000000L)
.SOCK_RECV(con)
.redisError("Interrupted communincation with Redis",e)
}
@@ -114,7 +109,6 @@ redisCmd <- function(CMD, ..., raw=FALSE)
f <- match.call()
n <- length(f) - 1
hdr <- paste('*', as.character(n), '\r\n',sep='')
-# writeBin(.raw(hdr), con)
.SOCK_SEND(con, .raw(hdr))
tryCatch({
for(j in seq_len(n)) {
@@ -125,9 +119,6 @@ redisCmd <- function(CMD, ..., raw=FALSE)
if(!is.raw(v)) v <- .cerealize(v)
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
-# writeBin(.raw(hdr), con)
-# writeBin(v, con)
-# writeBin(.raw('\r\n'), con)
.SOCK_SEND(con, .raw(hdr))
.SOCK_SEND(con, v)
.SOCK_SEND(con, .raw("\r\n"))
@@ -157,28 +148,24 @@ redisCmd <- function(CMD, ..., raw=FALSE)
# Check to see if a rename list exists and use it if it does...we also
rep = c()
if(exists("rename",envir=.redisEnv)) rep = get("rename",envir=.redisEnv)
-# cat(hdr, file=con)
.SOCK_SEND(con, hdr)
-tryCatch({
- for(j in seq_len(n)) {
+ tryCatch({
+ for(j in seq_len(n)) {
if(j==1)
v <- .renameCommand(eval(f[[j+1]],envir=sys.frame(-1)), rep)
else
v <- eval(f[[j+1]],envir=sys.frame(-1))
- if(!is.raw(v)) v <- .cerealize(v)
- l <- length(v)
- hdr <- paste('$', as.character(l), '\r\n', sep='')
-# cat(hdr, file=con)
-# writeBin(v, con)
-# cat('\r\n', file=con)
- .SOCK_SEND(con, hdr)
- .SOCK_SEND(con, v)
- .SOCK_SEND(con, "\r\n")
- }
-},
-error=function(e) {.redisError("Invalid agrument");invisible()},
-interrupt=function(e) .burn(e)
-)
+ if(!is.raw(v)) v <- .cerealize(v)
+ l <- length(v)
+ hdr <- paste('$', as.character(l), '\r\n', sep='')
+ .SOCK_SEND(con, hdr)
+ .SOCK_SEND(con, v)
+ .SOCK_SEND(con, "\r\n")
+ }
+ },
+ error=function(e) {.redisError("Invalid agrument");invisible()},
+ interrupt=function(e) .burn(e)
+ )
.getResponse(raw=TRUE)
}
@@ -190,30 +177,27 @@ interrupt=function(e) .burn(e)
x
}
-
.getResponse <- function(raw=FALSE)
{
env <- .redisEnv$current
-tryCatch({
- con <- .redis()
-browser()
-# l <- readLines(con=con, n=1)
- l <- .SOCK_GETLINE(con)
- if(length(l)==0) .burn("Empty")
- l = l[[1]]
- tryCatch(
- env$count <- max(env$count - 1,0),
- error = function(e) assign('count', 0, envir=env)
- )
- s <- substr(l, 1, 1)
- if (nchar(l) < 2) {
- if(s == "+") {
- # '+' is a valid retrun message for at least one cmd (RANDOMKEY)
- return("")
+ tryCatch({
+ con <- .redis()
+ l <- .SOCK_GETLINE(con)
+
+ if(length(l)==0) .burn("Empty")
+ tryCatch(
+ env$count <- max(env$count - 1,0),
+ error = function(e) assign('count', 0, envir=env)
+ )
+ s <- substr(l, 1, 1)
+ if (nchar(l) < 2) {
+ if(s == "+") {
+ # '+' is a valid retrun message for at least one cmd (RANDOMKEY)
+ return("")
+ }
+ .burn("Invalid")
}
- .burn("Invalid")
- }
- switch(s,
+ switch(s,
'-' = stop(substr(l,2,nchar(l))),
'+' = substr(l,2,nchar(l)),
':' = as.numeric(substr(l,2,nchar(l))),
@@ -222,13 +206,10 @@ browser()
if (n < 0) {
return(NULL)
}
-# dat <- tryCatch(readBin(con, 'raw', n=n),
-# error=function(e) .redisError(e$message))
dat <- tryCatch(.SOCK_RECV_N(con, N=n),
error=function(e) .redisError(e$message))
m <- length(dat)
if(m==n) {
-# l <- readLines(con,n=1) # Trailing \r\n
l <- .SOCK_GETLINE(con) # Trailing \r\n
if(raw)
return(dat)
@@ -244,7 +225,8 @@ browser()
replicate(numVars, .getResponse(raw=raw), simplify=FALSE)
} else NULL
},
- stop('Unknown message type'))
-}, interrupt=function(e) .burn(e)
-)
+ stop('Unknown message type')
+ )
+ }, interrupt=function(e) .burn(e)
+ )
}
View
@@ -6,8 +6,10 @@
#else
#include <stdlib.h>
#include <string.h>
+#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h> // TCP_NODELAY
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
@@ -52,12 +54,14 @@ R_unload_rredis(DllInfo * info)
/* tcpconnect
* connect to the specified host and port, returning a socket
+ * If nodelay=1, then disable Nagle, otherwise keep it.
*/
int
-tcpconnect (char *host, int port)
+tcpconnect (char *host, int port, int nodelay)
{
struct hostent *h;
struct sockaddr_in sa;
+ int j;
#ifdef WIN32
SOCKET s;
#else
@@ -96,6 +100,12 @@ tcpconnect (char *host, int port)
}
signal(SIGPIPE, SIG_IGN);
#endif
+ if(nodelay==1)
+ {
+ j = 1;
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &j, sizeof(j));
+ if(j!=1) warning("Unable to set TCP_NODELAY");
+ }
return (int)s;
}
@@ -129,11 +139,12 @@ SEXP SOCK_NAME(SEXP S)
return ScalarInteger(ntohs(sin.sin_port));
}
-SEXP SOCK_CONNECT(SEXP HOST, SEXP PORT)
+SEXP SOCK_CONNECT(SEXP HOST, SEXP PORT, SEXP NAGLE)
{
char *host = (char *)CHAR(STRING_ELT(HOST, 0));
int port = INTEGER(PORT)[0];
- return ScalarInteger(tcpconnect(host, port));
+ int nagle = INTEGER(NAGLE)[0];
+ return ScalarInteger(tcpconnect(host, port, nagle));
}
SEXP SOCK_POLL (SEXP FDS, SEXP TIMEOUT, SEXP EVENTS)
@@ -340,20 +351,24 @@ SEXP SOCK_GETLINE(SEXP S)
#else
int s = INTEGER(S)[0];
#endif
- int bufsize = MBUF;
- buf = (char *)malloc(MBUF);
+ int bufsize = 512;
+ buf = (char *)malloc(512);
if(!buf) return ans;
k = 0;
- pfds.fd = s;
- pfds.events = POLLIN;
- h = poll(&pfds, 1, 50);
- while(h>0)
+// pfds.fd = s;
+// pfds.events = POLLIN;
+// h = poll(&pfds, 1, 50);
+ while(1)
{
j = recv(s, &c, 1, 0);
- if(j<1) break;
+ if(j<1)
+ {
+ if(j<0 && errno != EAGAIN) error("Redis read error");
+ continue;
+ }
buf[k] = c;
k++;
- if(k>2 && buf[k-1]==10 && buf[k-2]==13)
+ if(k>=2 && buf[k-1]==10 && buf[k-2]==13)
{
buf[k-2]=0;
buf[k-1]=0;
@@ -364,7 +379,7 @@ SEXP SOCK_GETLINE(SEXP S)
bufsize = bufsize + MBUF;
buf = (char *)realloc(buf, bufsize);
}
- h = poll(&pfds, 1, 50);
+// h = poll(&pfds, 1, 50);
}
PROTECT(ans=allocVector(STRSXP,1));
SET_STRING_ELT(ans, 0, mkChar(buf));
@@ -407,3 +422,14 @@ SEXP SOCK_RECV_N(SEXP S, SEXP N)
UNPROTECT(1);
return ans;
}
+
+SEXP SOCK_NOOP(SEXP S)
+{
+ SEXP ans = R_NilValue;
+#ifdef WIN32
+ SOCKET s = (SOCKET)INTEGER(S)[0];
+#else
+ int s = INTEGER(S)[0];
+#endif
+ return ans;
+}
View
@@ -2,14 +2,14 @@
#define RXBUF 16384 /* TCP receive buffer */
#define MBUF 1048576 /* Message buffer base size */
-int tcpconnect (char *, int);
+int tcpconnect (char *, int, int);
SEXP SOCK_CLOSE (SEXP);
-SEXP SOCK_CONNECT (SEXP, SEXP);
+SEXP SOCK_CONNECT (SEXP, SEXP, SEXP);
SEXP SOCK_GETLINE(SEXP);
SEXP SOCK_NAME (SEXP);
SEXP SOCK_POLL (SEXP, SEXP, SEXP);
SEXP SOCK_RECV (SEXP, SEXP, SEXP, SEXP);
-SEXP_SOCK_RECV_N(SEXP, SEXP);
+SEXP SOCK_RECV_N(SEXP, SEXP);
SEXP SOCK_SEND (SEXP, SEXP);
#ifdef WIN32

0 comments on commit c795328

Please sign in to comment.