/*
Summary: rmq passes msgpack2 messages over websockets between Golang and the R statistical language. It is an R package.
# rmq: R Messaging and Queuing
## Or: How to utilize Go libraries from R.
The much-anticipated Go 1.5 release brought strong support for building C-style shared libraries (.so files) from Go source code and libraries.
*This is huge*. It opens up many exciting new possibilities. In this project (rmq), we explore using this new capability to extend R with Go libraries.
Package rmq provides messaging based on msgpack and websockets. It demonstrates calling from R into Golang (Go) libraries to extend R with functionality available in Go.
## why msgpack
Msgpack is binary and self-describing. It can be extremely fast to parse. Moreover, I don't have to worry about where to get the schema .proto file. The thorny problem of how to *create* new types of objects when I'm inside R just goes away: the data is self-describing, and new structures can be created at run time.
Msgpack supports a forward-evolution/backward-compatibility strategy similar to that of protobufs: old code ignores new data fields, and new code uses defaults for missing fields when given old data. Hence msgpack, used as a protocol, allows incremental rolling upgrades of large compute clusters. That was the whole raison d'être of protobufs.
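A minimal sketch of that rule, using Go's standard encoding/json as a stand-in for msgpack because it decodes into structs the same way: unknown fields are dropped, and absent fields keep whatever value the struct held before decoding. The ConfigV1/ConfigV2 types and the 8080 default are hypothetical, and we assume (without showing it here) that the ugorji msgpack codec follows the same rules when decoding into Go structs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ConfigV1 is the schema known to "old" code (hypothetical type).
type ConfigV1 struct {
	Host string `json:"host"`
}

// ConfigV2 is the schema known to "new" code; it adds a Port field.
type ConfigV2 struct {
	Host string `json:"host"`
	Port int    `json:"port"`
}

// decodeV1 reads data with the old schema; unknown fields such as
// "port" are silently ignored.
func decodeV1(data []byte) (ConfigV1, error) {
	var c ConfigV1
	err := json.Unmarshal(data, &c)
	return c, err
}

// decodeV2 reads data with the new schema; a field missing from the
// input keeps the default assigned before decoding.
func decodeV2(data []byte) (ConfigV2, error) {
	c := ConfigV2{Port: 8080} // hypothetical default for old data
	err := json.Unmarshal(data, &c)
	return c, err
}

func main() {
	oldView, err := decodeV1([]byte(`{"host":"a","port":9000}`)) // new data, old code
	if err != nil {
		panic(err)
	}
	newView, err := decodeV2([]byte(`{"host":"b"}`)) // old data, new code
	if err != nil {
		panic(err)
	}
	fmt.Println(oldView.Host, newView.Host, newView.Port) // a b 8080
}
```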
Icing on the cake: msgpack (like websocket) is usable from JavaScript in the browser (unlike most alternatives). Because it is very simple, msgpack has massive cross-language support (55 bindings are listed at http://msgpack.org). Overall, msgpack is flexible while being fast, simple, and widely supported, making it a great fit for data exchange between interpreted and compiled environments.
## implementation
We use the codec package from the Go library https://github.com/ugorji/go for msgpack encoding and decoding. This is a high-performance implementation. We use it in a mode where it supports only the updated (current) msgpack 2 spec. This is critical for interoperability with other compiled languages that distinguish between UTF-8 strings and binary blobs (otherwise embedded '\0' zeros in blobs cause problems).
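To make the string/blob distinction concrete, here is a minimal hand-rolled sketch of the two msgpack 2 type families, with format bytes taken from the msgpack spec (in the older spec, text and blobs shared a single "raw" type). appendStr and appendBin are hypothetical helpers for illustration only; this is not how the ugorji codec is implemented.

```go
package main

import "fmt"

// appendStr appends s using the msgpack2 str family (fixstr, str8,
// str16, str32), which the spec reserves for UTF-8 text.
func appendStr(buf []byte, s string) []byte {
	n := len(s)
	switch {
	case n < 32:
		buf = append(buf, 0xa0|byte(n)) // fixstr: length in the low 5 bits
	case n < 1<<8:
		buf = append(buf, 0xd9, byte(n)) // str8
	case n < 1<<16:
		buf = append(buf, 0xda, byte(n>>8), byte(n)) // str16, big-endian length
	default: // str32; lengths above 2^32-1 are not handled in this sketch
		buf = append(buf, 0xdb, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	}
	return append(buf, s...)
}

// appendBin appends b using the msgpack2 bin family (bin8, bin16,
// bin32), which may carry arbitrary bytes, including zero bytes.
func appendBin(buf []byte, b []byte) []byte {
	n := len(b)
	switch {
	case n < 1<<8:
		buf = append(buf, 0xc4, byte(n)) // bin8
	case n < 1<<16:
		buf = append(buf, 0xc5, byte(n>>8), byte(n)) // bin16
	default: // bin32; lengths above 2^32-1 are not handled in this sketch
		buf = append(buf, 0xc6, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	}
	return append(buf, b...)
}

func main() {
	fmt.Printf("% x\n", appendStr(nil, "hi"))               // a2 68 69
	fmt.Printf("% x\n", appendBin(nil, []byte{0x00, 0x01})) // c4 02 00 01
}
```

A decoder that sees 0xa2 knows it has two bytes of text, while 0xc4 announces a length-prefixed blob, so a receiving language never has to guess whether a zero byte terminates a C string.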
For websockets, we use the terrific https://github.com/gorilla/websocket library, which supports securing your communication with TLS certificates. As time permits, we may add more features aimed at message queuing.
## status
Excellent. Tested on OSX and Linux. Documentation has been written and is available. The package is functionally complete for RPC over websockets and msgpack-based serialization. After interactive usage, I added SIGINT handling so that the web server can be stopped during development with a simple Ctrl-c at the R console. The client side blocks during calls (it does not poll back to R while waiting on the network), but it has a configurable timeout (default 5 seconds) that allows easy client-side error handling.
## structure of this repo
This repository is mainly structured as an R package. It is
designed to be built and installed into an R (statistical environment)
installation, using the standard tools for R.
This package doesn't directly create a reusable Go library. Instead
we target a C-shared library (rmq.so) that installs
into R using 'R CMD INSTALL rmq'. See 'make install', or 'make build' followed by
running `install.packages('./rmq_1.0.1.tar.gz', repos=NULL)`
from inside R (assuming the package tarball is in your current directory;
if not, adjust the ./ part of the package path).
The code also serves as an example of how to
use Go inside R.
## embedding R in Golang
While rmq is mainly designed to embed Go in R, it defines functions,
in particular SexpToIface(), that make embedding R in Go quite easy too. See the comments and
example in main() of the central rmq.go file (https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go)
for a demonstration.
*/
package rmq
/*
typedef int SEXP;
*/
import "C"
// FromMsgpack converts a serialized RAW vector of msgpack2
// encoded bytes into an R object. We use msgpack2 so that there is
// a difference between strings (utf8 encoded) and binary blobs
// which can contain '\0' zeros. The underlying msgpack2 library
// is the awesome https://github.com/ugorji/go/tree/master/codec
// library from Ugorji Nwoke.
func FromMsgpack(s C.SEXP) C.SEXP {
// This is a stub for documentation of API and search purposes.
// See the actual implementation here:
// https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go
return s
}
// ToMsgpack converts an R object into a serialized RAW vector
// of msgpack2 encoded bytes. We use msgpack2 so that there is
// a difference between strings (utf8 encoded) and binary blobs
// which can contain '\0' zeros. The underlying msgpack2 library
// is the awesome https://github.com/ugorji/go/tree/master/codec
// library from Ugorji Nwoke.
func ToMsgpack(s C.SEXP) C.SEXP {
// This is a stub for documentation of API and search purposes.
// See the actual implementation here:
// https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go
return s
}
// ListenAndServe is the server part that expects calls from clients
// in the form of RmqWebsocketCall() invocations.
// The underlying websocket library is the battle tested
// https://github.com/gorilla/websocket library from the
// Gorilla Web toolkit. http://www.gorillatoolkit.org/
//
// addr_ is a string in "ip:port" format. The server
// will bind this address and port on the local host.
//
// handler_ is an R function that takes a single argument.
// It will be called back each time the server receives
// an incoming message. The value returned by handler_
// becomes the reply to the client.
//
// rho_ is an R environment in which the handler_ callback
// will occur. The user-level wrapper rmq.server() provides
// a new environment for every callback by default, so
// most users won't need to worry about rho_.
//
// Return value: this is always R_NilValue.
//
// Semantics: ListenAndServe() will start a new
// web server every time it is called. If it exits
// due to a call into R_CheckUserInterrupt()
// or Rf_error(), then a background watchdog goroutine
// will notice the lack of heartbeating after 300ms,
// and will immediately shut down the listening
// websocket server goroutine. Hence cleanup
// is fairly automatic.
//
// Signal handling:
//
// SIGINT (ctrl-c) is noted by R, and since we
// regularly call R_CheckUserInterrupt(), the
// user can stop the server by pressing ctrl-c
// at the R console. The Go runtime, as embedded
// in the c-shared library, is not yet accustomed to
// being embedded, and so its (system) signal handling
// facilities (e.g. signal.Notify) should *not* be
// used. We go to great pains to preserve the
// signal handling that R sets up and expects;
// allowing the Go runtime to see any signals just
// creates heartache and crashes.
//
func ListenAndServe(addr_ C.SEXP, handler_ C.SEXP, rho_ C.SEXP) C.SEXP {
// This is a stub for documentation of API and search purposes.
// See the actual implementation here:
// https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go
return addr_
}
// RmqWebsocketCall() is the client part that talks to
// the server part waiting in ListenAndServe().
// The underlying websocket library is the battle tested
// https://github.com/gorilla/websocket library from the
// Gorilla Web toolkit. http://www.gorillatoolkit.org/
//
// addr_ is an "ip:port" string: where to find the server;
// it should match the addr_ the server was started with.
//
// msg_ is the R object to be sent to the server.
//
// timeout_msec_ is a numeric count of milliseconds to
// wait for a reply from the server. Timeouts are the
// only way we handle servers that accept our connect
// and then crash or take too long. Although a timeout
// of 0 will wait forever, this is not recommended.
// SIGINT (ctrl-c) will not interrupt a waiting client,
// so do be sure to give it some sane timeout. The
// default is 5000 msec (5 seconds).
//
func RmqWebsocketCall(addr_ C.SEXP, msg_ C.SEXP, timeout_msec_ C.SEXP) C.SEXP {
// This is a stub for documentation of API and search purposes.
// See the actual implementation here:
// https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go
return addr_
}
// SexpToIface() does the heavy lifting of converting from
// an R value to a Go value. Initially just a subroutine
// of the internal encodeRIntoMsgpack(), it is also useful
// on its own for doing things like embedding R inside Go.
//
// Currently VECSXP, REALSXP, INTSXP, RAWSXP, STRSXP, and
// LGLSXP are supported. In other words, we decode: lists,
// numeric vectors, integer vectors, raw byte vectors,
// string vectors, boolean vectors, and recursively defined
// list elements.
//
// If list elements are named, the named list is turned
// into a map in Go.
func SexpToIface(s C.SEXP) interface{} {
// This is a stub for documentation of API and search purposes.
// See the actual implementation here:
// https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go
return interface{}(nil)
}
// ReadMsgpackFrame reads the msgpack frame at byteOffset in rawStream,
// decodes the 2-5 byte header of a msgpack binary array (bin8, bin16,
// or bin32), and returns the decoded-into-R object and the next
// byteOffset to use. This is a helper for dealing with large byte
// streams of msgpack bytes.
func ReadMsgpackFrame(rawStream C.SEXP, byteOffset C.SEXP) C.SEXP {
return rawStream
}
// ReadNewlineDelimJson reads a json object at byteOffset in rawStream, expects
// it to be newline terminated (see http://jsonlines.org/), and returns the
// decoded-into-R object and the next byteOffset to use (the byte just after
// the terminating newline). This is a helper for dealing with large
// newline delimited streams of JSON text that are stored as raw byte arrays.
func ReadNewlineDelimJson(rawStream C.SEXP, byteOffset C.SEXP) C.SEXP {
return rawStream
}
// DecodeMsgpackBinArrayHeader parses the first 2-5 bytes of a
// msgpack-format serialized binary array and returns the
// headerSize, payloadSize and totalFrameSize for the frame
// that starts at p[0]. This is a utility function for decoding
// msgpack objects that are always wrapped in a 2-5 byte binary
// array header.
func DecodeMsgpackBinArrayHeader(p []byte) (headerSize int, payloadSize int, totalFrameSize int, err error) {
return
}
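/*
A minimal sketch of the header parsing that DecodeMsgpackBinArrayHeader describes, since the stub above only returns zero values. decodeBinHeader is a hypothetical stand-alone helper with the same result values; it follows the bin8 (0xc4), bin16 (0xc5), and bin32 (0xc6) layouts from the msgpack spec and is not the package's actual implementation. The main function walks a byte stream frame by frame, advancing the offset by totalFrameSize, which is how we assume ReadMsgpackFrame computes its next byteOffset.

```go
package main

import (
	"errors"
	"fmt"
)

// decodeBinHeader parses the msgpack bin8/bin16/bin32 header at p[0]
// and reports the header length, the payload length, and their sum.
func decodeBinHeader(p []byte) (headerSize, payloadSize, totalFrameSize int, err error) {
	if len(p) == 0 {
		return 0, 0, 0, errors.New("empty input")
	}
	switch p[0] {
	case 0xc4: // bin8: 1-byte length follows the marker
		headerSize = 2
	case 0xc5: // bin16: 2-byte big-endian length
		headerSize = 3
	case 0xc6: // bin32: 4-byte big-endian length
		headerSize = 5
	default:
		return 0, 0, 0, fmt.Errorf("0x%02x is not a bin8/bin16/bin32 marker", p[0])
	}
	if len(p) < headerSize {
		return 0, 0, 0, errors.New("truncated bin header")
	}
	for _, b := range p[1:headerSize] {
		payloadSize = payloadSize<<8 | int(b) // accumulate big-endian length
	}
	return headerSize, payloadSize, headerSize + payloadSize, nil
}

func main() {
	// Two frames back to back: a bin8 frame with 2 payload bytes,
	// then a bin16 frame with 1 payload byte.
	stream := []byte{0xc4, 0x02, 0xaa, 0xbb, 0xc5, 0x00, 0x01, 0xcc}
	for off := 0; off < len(stream); {
		h, n, total, err := decodeBinHeader(stream[off:])
		if err != nil {
			panic(err)
		}
		if off+total > len(stream) {
			panic("truncated payload")
		}
		fmt.Printf("offset %d: header %d bytes, payload % x\n", off, h, stream[off+h:off+h+n])
		off += total // the next frame starts right after this one
	}
}
```

On platforms with a 32-bit int, a bin32 length above 2^31-1 would overflow; a production version would accumulate into a uint32 or int64 instead.
*/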