From 9fa356e52e9a989fb7ed901af23bac0e3da363b6 Mon Sep 17 00:00:00 2001 From: Gernot Messow Date: Sun, 2 May 2021 21:54:40 +0200 Subject: [PATCH] Add server SendTo method; necessary changes and new test --- examples/basic_server/basic_server.go | 3 +- osc/osc.go | 91 ++++++++++++++++++++++----- osc/osc_test.go | 64 +++++++++++++++++-- 3 files changed, 136 insertions(+), 22 deletions(-) diff --git a/examples/basic_server/basic_server.go b/examples/basic_server/basic_server.go index 93aad96..8323df7 100644 --- a/examples/basic_server/basic_server.go +++ b/examples/basic_server/basic_server.go @@ -17,6 +17,7 @@ func main() { fmt.Println("Couldn't listen: ", err) } defer conn.Close() + server.SetConnection(conn) fmt.Println("### Welcome to go-osc receiver demo") fmt.Println("Press \"q\" to exit") @@ -25,7 +26,7 @@ func main() { fmt.Println("Start listening on", addr) for { - packet, err := server.ReceivePacket(conn) + packet, err := server.ReceivePacket() if err != nil { fmt.Println("Server error: " + err.Error()) os.Exit(1) diff --git a/osc/osc.go b/osc/osc.go index 7cb6f31..6df758a 100644 --- a/osc/osc.go +++ b/osc/osc.go @@ -24,13 +24,20 @@ const ( // Packet is the interface for Message and Bundle. type Packet interface { encoding.BinaryMarshaler + SenderAddr() net.Addr } // Message represents a single OSC message. An OSC message consists of an OSC // address pattern and zero or more arguments. type Message struct { - Address string - Arguments []interface{} + Address string + Arguments []interface{} + senderAddr net.Addr +} + +// SenderAddr returns the sender address of this message. Returns nil if no address is available +func (m *Message) SenderAddr() net.Addr { + return m.senderAddr } // Verify that Messages implements the Packet interface. @@ -41,9 +48,25 @@ var _ Packet = (*Message)(nil) // elements. The OSC-timetag is a 64-bit fixed point time tag. See // http://opensoundcontrol.org/spec-1_0 for more information. type Bundle struct { - Timetag Timetag - Messages []*Message - Bundles []*Bundle + Timetag Timetag + Messages []*Message + Bundles []*Bundle + senderAddr net.Addr +} + +// SenderAddr returns the sender address of this message. Returns nil if no address is available +func (b *Bundle) SenderAddr() net.Addr { + return b.senderAddr +} + +func (b *Bundle) setSenderAddr(src net.Addr) { + b.senderAddr = src + for _, m := range b.Messages { + m.senderAddr = src + } + for _, b := range b.Bundles { + b.setSenderAddr(src) + } } // Verify that Bundle implements the Packet interface. @@ -63,6 +86,7 @@ type Server struct { Addr string Dispatcher Dispatcher ReadTimeout time.Duration + conn net.PacketConn close func() error } @@ -536,22 +560,37 @@ func (s *Server) ListenAndServe() error { s.Dispatcher = NewStandardDispatcher() } + if s.conn == nil { + s.Listen() + } + + return s.Serve() +} + +// Listen creates the listening port and sets up the connection. +func (s *Server) Listen() error { ln, err := net.ListenPacket("udp", s.Addr) if err != nil { return err } + s.conn = ln s.close = ln.Close + return nil +} - return s.Serve(ln) +// SetConnection sets the connection to use for the server. This is for a case +// where you created the connection manually instead of using ListenAndServer. +func (s *Server) SetConnection(c net.PacketConn) { + s.conn = c } // Serve retrieves incoming OSC packets from the given connection and dispatches // retrieved OSC packets. If something goes wrong an error is returned. -func (s *Server) Serve(c net.PacketConn) error { +func (s *Server) Serve() error { var tempDelay time.Duration for { - msg, err := s.readFromConnection(c) + msg, err := s.readFromConnection() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { @@ -568,6 +607,7 @@ func (s *Server) Serve(c net.PacketConn) error { return err } tempDelay = 0 + go s.Dispatcher.Dispatch(msg) } } @@ -585,36 +625,50 @@ func (s *Server) CloseConnection() error { } // ReceivePacket listens for incoming OSC packets and returns the packet if one is received. -func (s *Server) ReceivePacket(c net.PacketConn) (Packet, error) { - return s.readFromConnection(c) +func (s *Server) ReceivePacket() (Packet, error) { + return s.readFromConnection() } // readFromConnection retrieves OSC packets. -func (s *Server) readFromConnection(c net.PacketConn) (Packet, error) { +func (s *Server) readFromConnection() (Packet, error) { if s.ReadTimeout != 0 { - if err := c.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil { + if err := s.conn.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil { return nil, err } } data := make([]byte, 65535) - n, _, err := c.ReadFrom(data) + n, src, err := s.conn.ReadFrom(data) if err != nil { return nil, err } var start int - p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n) + p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n, src) if err != nil { return nil, err } return p, nil } +// SendTo sends a message to the given address. The sender address will be the address and +// port the server is listening on. +func (s *Server) SendTo(packet Packet, addr net.Addr) error { + data, err := packet.MarshalBinary() + if err != nil { + return err + } + + if _, err = s.conn.WriteTo(data, addr); err != nil { + return err + } + return nil +} + // ParsePacket parses the given msg string and returns a Packet func ParsePacket(msg string) (Packet, error) { var start int - p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg)) + p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg), nil) if err != nil { return nil, err } @@ -622,7 +676,7 @@ func ParsePacket(msg string) (Packet, error) { } // receivePacket receives an OSC packet from the given reader. -func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { +func readPacket(reader *bufio.Reader, start *int, end int, src net.Addr) (Packet, error) { //var buf []byte buf, err := reader.Peek(1) if err != nil { @@ -635,6 +689,7 @@ func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { if err != nil { return nil, err } + packet.senderAddr = src return packet, nil } if buf[0] == '#' { // An OSC bundle starts with a '#' @@ -642,6 +697,8 @@ func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { if err != nil { return nil, err } + packet.setSenderAddr(src) + return packet, nil } @@ -681,7 +738,7 @@ func readBundle(reader *bufio.Reader, start *int, end int) (*Bundle, error) { } *start += 4 - p, err := readPacket(reader, start, end) + p, err := readPacket(reader, start, end, nil) if err != nil { return nil, err } diff --git a/osc/osc_test.go b/osc/osc_test.go index 80bf35b..bacc785 100644 --- a/osc/osc_test.go +++ b/osc/osc_test.go @@ -223,11 +223,12 @@ func TestServerMessageReceiving(t *testing.T) { return } defer c.Close() + server.SetConnection(c) // Start the client start <- true - packet, err := server.ReceivePacket(c) + packet, err := server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -320,10 +321,11 @@ func TestReadTimeout(t *testing.T) { t.Error(err) } defer c.Close() + server.SetConnection(c) // Start the client start <- true - p, err := server.ReceivePacket(c) + p, err := server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -334,13 +336,13 @@ func TestReadTimeout(t *testing.T) { } // Second receive should time out since client is delayed 150 milliseconds - if _, err = server.ReceivePacket(c); err == nil { + if _, err = server.ReceivePacket(); err == nil { t.Errorf("expected error") return } // Next receive should get it - p, err = server.ReceivePacket(c) + p, err = server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -614,6 +616,60 @@ func TestOscMessageMatch(t *testing.T) { } } +func TestServerSend(t *testing.T) { + targetServer := Server{ + Addr: "127.0.0.1:6677", + } + + go func() { + d := NewStandardDispatcher() + d.AddMsgHandler("/message/test", func(msg *Message) { + reply := NewMessage("/reply/test") + err := targetServer.SendTo(reply, msg.SenderAddr()) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + }) + targetServer.Dispatcher = d + targetServer.ListenAndServe() + }() + + time.Sleep(2 * time.Second) + + result := make(chan bool, 1) + + d := NewStandardDispatcher() + d.AddMsgHandler("/reply/test", func(msg *Message) { + result <- true + }) + clientServer := Server{ + Addr: "127.0.0.1:18536", + Dispatcher: d, + } + + err := clientServer.Listen() + if err != nil { + t.Errorf("Listen failed: %v", err) + } + go clientServer.Serve() + + msg := NewMessage("/message/test") + addr, _ := net.ResolveUDPAddr("udp", targetServer.Addr) + err = clientServer.SendTo(msg, addr) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + + select { + case r := <-result: + if !r { + t.Error("did not get expected response") + } + case <-time.After(2 * time.Second): + t.Error("unexpected timeout") + } +} + const zero = string(byte(0)) // nulls returns a string of `i` nulls.