From e3737c9a423051b85655a9e967295e829370c583 Mon Sep 17 00:00:00 2001 From: Raphael Tiersch Date: Wed, 5 Sep 2018 12:57:02 +0200 Subject: [PATCH] First dirty fix. There are still a few methods I want to look into --- conn.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/conn.go b/conn.go index 2f7dfc5..a7d407c 100644 --- a/conn.go +++ b/conn.go @@ -28,6 +28,7 @@ type Conn struct { readTimeout time.Duration writeTimeout time.Duration closed bool + closeMutex *sync.Mutex options *connOptions } @@ -68,7 +69,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) writer := frame.NewWriter(conn) c := &Conn{ - conn: conn, + conn: conn, + closeMutex: &sync.Mutex{}, } options, err := newConnOptions(c, opts) @@ -296,6 +298,8 @@ func processLoop(c *Conn, writer *frame.Writer) { close(ch) } + c.closeMutex.Lock() + defer c.closeMutex.Unlock() c.closed = true c.conn.Close() @@ -366,6 +370,8 @@ func sendError(m map[string]chan *frame.Frame, err error) { // with the STOMP server is closed and any further attempt to write // to the server will fail. func (c *Conn) Disconnect() error { + c.closeMutex.Lock() + defer c.closeMutex.Unlock() if c.closed { return nil } @@ -389,6 +395,8 @@ func (c *Conn) Disconnect() error { // This method should be used only as last resort when there are fatal // network errors that prevent to do a proper disconnect from the server. func (c *Conn) MustDisconnect() error { + c.closeMutex.Lock() + defer c.closeMutex.Unlock() if c.closed { return nil } @@ -410,6 +418,8 @@ func (c *Conn) MustDisconnect() error { // Any number of options can be specified in opts. See the examples for usage. Options include whether // to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries. func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error { + c.closeMutex.Lock() + defer c.closeMutex.Unlock() if c.closed { return ErrAlreadyClosed } @@ -551,10 +561,13 @@ func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Fr } go sub.readLoop(ch) + // TODO is this safe? There is no check if writeCh is actually open. c.writeCh <- request return sub, nil } +// TODO check further for race conditions + // Ack acknowledges a message received from the STOMP server. // If the message was received on a subscription with AckMode == AckAuto, // then no operation is performed.