Skip to content

Commit

Permalink
initial sample working
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim McGrath authored and Stefan Plantikow committed Oct 1, 2010
1 parent 3b68ea6 commit eceb576
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 48 deletions.
3 changes: 2 additions & 1 deletion Makefile
Expand Up @@ -4,9 +4,10 @@ PKGDIR=$(GOROOT)/pkg/$(GOOS)_$(GOARCH)

TARG=zmq
CGOFILES=zmq.go
CGO_CFLAGS=-I. -I "$(GOROOT)/include"
CGO_CFLAGS=-I. -I "$(GOROOT)/include" -I/usr/local/include
CGO_LDFLAGS=-lzmq
GOFMT=$(GOROOT)/bin/gofmt -tabwidth=2 -spaces=true -tabindent=false -w
#GOFMT=$(GOROOT)/bin/gofmt -w

include $(GOROOT)/src/Make.pkg

Expand Down
4 changes: 2 additions & 2 deletions clsrv.go
Expand Up @@ -14,7 +14,7 @@ func Server(ctx Context, ch chan bool, bchan chan bool, addr string) {

defer func() { ch <- true }()

srv, err := ctx.NewSocket(ZmqP2P)
srv, err := ctx.NewSocket(ZmqPair)
OkIf(err != nil, err)
defer srv.Close()

Expand Down Expand Up @@ -51,7 +51,7 @@ func Client(ctx Context, ch chan bool, tout int, addr string) {

defer func() { ch <- true }()

cl, err := ctx.NewSocket(ZmqP2P)
cl, err := ctx.NewSocket(ZmqPair)
OkIf(err != nil, err)
defer cl.Close()

Expand Down
92 changes: 47 additions & 45 deletions zmq.go
Expand Up @@ -19,24 +19,23 @@ import "C"
// ******** Global ZMQ Constants ********

const (
ZmqPoll = C.ZMQ_POLL
ZmqP2P = C.ZMQ_P2P // About to be renamed as ZMQ_PAIR
ZmqPair = C.ZMQ_P2P
ZmqPub = C.ZMQ_PUB
ZmqSub = C.ZMQ_SUB
ZmqReq = C.ZMQ_REQ
ZmqRep = C.ZMQ_REP
ZmqXReq = C.ZMQ_XREQ
ZmqXRep = C.ZMQ_XREP
ZmqUpstream = C.ZMQ_UPSTREAM
ZmqDownstream = C.ZMQ_DOWNSTREAM
ZmqNoBlock = C.ZMQ_NOBLOCK
ZmqNoFlush = C.ZMQ_NOFLUSH
ZmqPollIn = C.ZMQ_POLLIN
ZmqPollOut = C.ZMQ_POLLOUT
ZmqPollErr = C.ZMQ_POLLERR
ZmqHWM = C.ZMQ_HWM
ZmqLWM = C.ZMQ_LWM
ZmqPair = C.ZMQ_PAIR
ZmqPub = C.ZMQ_PUB
ZmqSub = C.ZMQ_SUB
ZmqReq = C.ZMQ_REQ
ZmqRep = C.ZMQ_REP
ZmqXReq = C.ZMQ_XREQ
ZmqXRep = C.ZMQ_XREP
ZmqUpstream = C.ZMQ_PULL
ZmqDownstream = C.ZMQ_PUSH
ZmqNoBlock = C.ZMQ_NOBLOCK
ZmqSndMore = C.ZMQ_SNDMORE
// ZmqNoFlush = C.ZMQ_NOFLUSH // coming back?
ZmqPollIn = C.ZMQ_POLLIN
ZmqPollOut = C.ZMQ_POLLOUT
ZmqPollErr = C.ZMQ_POLLERR
ZmqHWM = C.ZMQ_HWM
// ZmqLWM = C.ZMQ_LWM
ZmqSwap = C.ZMQ_SWAP
ZmqAffinity = C.ZMQ_AFFINITY
ZmqIdentitiy = C.ZMQ_IDENTITY
Expand All @@ -45,7 +44,7 @@ const (
ZmqUnsubscribe = C.ZMQ_UNSUBSCRIBE
ZmqRecoveryIVL = C.ZMQ_RECOVERY_IVL
ZmqMCastLoop = C.ZMQ_MCAST_LOOP
ZmqSendBuf = C.ZMQ_SNDBUF
ZmqSendBuf = C.ZMQ_SNDBUF // remove dups?
ZmqSndBuf = C.ZMQ_SNDBUF
ZmqRecvBuf = C.ZMQ_RCVBUF
ZmqRcvBuf = C.ZMQ_RCVBUF
Expand All @@ -60,11 +59,11 @@ type Provider interface {
NewContext(initArgs InitArgs) (Context, os.Error)
NewMessage() Message

StartWatch() Watch
// StartWatch() Watch

Version() (major int, minor int, pl int)

Sleep(secs int)
//Sleep(secs int)
}

type Provided interface {
Expand All @@ -73,15 +72,12 @@ type Provided interface {

// Arguments to New Context
type InitArgs struct {
AppThreads int
IoThreads int
Flags int
IoThreads int
}

// Sensible default init args
// AppThreads = EnvGOMAXPROCS(), IoThreads = 2, Flags = 0 /*no polling! */
// Sensible default init args as per most recent zmq docs - use one I/O thread by default.
func DefaultInitArgs() InitArgs {
return InitArgs{AppThreads: EnvGOMAXPROCS(), IoThreads: 2, Flags: 0}
return InitArgs{IoThreads: 1}
}

// Integer value of environment variable GOMAXPROCS if > 1, 1 otherwise
Expand All @@ -108,6 +104,10 @@ type Context interface {
Terminate() os.Error
}

func Crap() {
C.free_mem_coffer(0, 0)
}

type PollItem C.zmq_pollitem_t

type ProcFdFun func(fd int, events int8, revents int8)
Expand Down Expand Up @@ -193,7 +193,7 @@ type Socket interface {

SetPollItem(pi *PollItem, revents int8) os.Error

Flush() os.Error
// Flush() os.Error
}

// Watch interface
Expand Down Expand Up @@ -228,9 +228,7 @@ type lzmqContext uintptr
// with a live referene to the context.
func (p libZmqProvider) NewContext(args InitArgs) (Context, os.Error) {
contextPtr := C.zmq_init(
C.int(args.AppThreads),
C.int(args.IoThreads),
C.int(args.Flags))
C.int(args.IoThreads))

if IsCNullPtr(uintptr(contextPtr)) {
return nil, p.GetError()
Expand All @@ -245,10 +243,10 @@ func (p *libZmqProvider) NewMessage() Message {
return msg
}

func (p *libZmqProvider) StartWatch() Watch {
/*func (p *libZmqProvider) StartWatch() Watch {
watch := uintptr(C.zmq_stopwatch_start())
return lzmqWatch(watch)
}
}*/

// Type of error codes used by LibZmq
//
Expand All @@ -272,12 +270,13 @@ func (p *libZmqProvider) ErrorIf(cond bool) os.Error {
}
return nil
}
func (p *libZmqProvider) Sleep(secs int) {
if secs < 0 {
return
}
C.zmq_sleep(C.int(secs))
}

/*func (p *libZmqProvider) Sleep(secs int) {
if secs < 0 {
return
}
C.zmq_sleep(C.int(secs))
}*/

func (p *libZmqProvider) Version() (major int, minor int, pl int) {
C.zmq_version((*C.int)(unsafe.Pointer(&major)), (*C.int)(unsafe.Pointer(&minor)), (*C.int)(unsafe.Pointer(&pl)))
Expand Down Expand Up @@ -418,7 +417,8 @@ func (p *lzmqMessage) GetData(coffer *PtrCoffer) os.Error {
func (p *lzmqMessage) SetData(coffer *MemCoffer) os.Error {
data := unsafe.Pointer(coffer.GetBaseAddr())
// Unsure if this is correct
return p.Provider().OkIf(C.zmq_msg_init_data((*C.zmq_msg_t)(p), data, C.size_t(coffer.Cap()), &C.free_mem_coffer, unsafe.Pointer(coffer)) == 0)
// return p.Provider().OkIf(C.zmq_msg_init_data((*C.zmq_msg_t)(p), data, C.size_t(coffer.Cap()), &C.free_mem_coffer, unsafe.Pointer(coffer)) == 0)
return p.Provider().OkIf(C.zmq_msg_init_data((*C.zmq_msg_t)(p), data, C.size_t(coffer.Cap()), (*[0]uint8)(CCallableFreeFuncPtr()), unsafe.Pointer(coffer)) == 0)
}

func (p *lzmqMessage) ptr() *C.zmq_msg_t {
Expand Down Expand Up @@ -479,7 +479,7 @@ func (p lzmqSocket) Bind(address string) os.Error {
// Connect client socket
func (p lzmqSocket) Connect(address string) os.Error {
ptr := unsafe.Pointer(p)
// apparantly freed by zmq
// apparently freed by zmq
c_addr := C.CString(address)
return p.Provider().OkIf(C.zmq_connect(ptr, c_addr) == 0)
}
Expand Down Expand Up @@ -516,9 +516,11 @@ func (p lzmqSocket) Send(msg Message, flags int) os.Error {
return ret
}

func (p lzmqSocket) Flush() os.Error {
return p.Provider().OkIf(C.zmq_flush(unsafe.Pointer(p)) == 0)
// keep this
/*func (p lzmqSocket) Flush() os.Error {
return p.Provider().OkIf(C.zmq_flush(unsafe.Pointer(p)) == 0)
}
*/

// Closes this socket
//
Expand All @@ -531,12 +533,12 @@ func (p lzmqSocket) Close() os.Error {

// ******** Watches ********

type lzmqWatch uintptr
/*type lzmqWatch uintptr
func (p lzmqWatch) Stop() uint64 {
return uint64(C.zmq_stopwatch_stop(unsafe.Pointer(p)))
}

*/
//export free_mem_coffer
func freeMemCoffer(base uintptr, hint uintptr) {
((*MemCoffer)(unsafe.Pointer(hint))).Close()
Expand Down

0 comments on commit eceb576

Please sign in to comment.