Skip to content

Commit

Permalink
Merge 9fa356e into 3287e18
Browse files Browse the repository at this point in the history
  • Loading branch information
gpayer committed May 2, 2021
2 parents 3287e18 + 9fa356e commit c9b0fe0
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 22 deletions.
3 changes: 2 additions & 1 deletion examples/basic_server/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func main() {
fmt.Println("Couldn't listen: ", err)
}
defer conn.Close()
server.SetConnection(conn)

fmt.Println("### Welcome to go-osc receiver demo")
fmt.Println("Press \"q\" to exit")
Expand All @@ -25,7 +26,7 @@ func main() {
fmt.Println("Start listening on", addr)

for {
packet, err := server.ReceivePacket(conn)
packet, err := server.ReceivePacket()
if err != nil {
fmt.Println("Server error: " + err.Error())
os.Exit(1)
Expand Down
91 changes: 74 additions & 17 deletions osc/osc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ const (
// Packet is the interface for Message and Bundle.
type Packet interface {
encoding.BinaryMarshaler
SenderAddr() net.Addr
}

// Message represents a single OSC message. An OSC message consists of an OSC
// address pattern and zero or more arguments.
type Message struct {
Address string
Arguments []interface{}
Address string
Arguments []interface{}
senderAddr net.Addr
}

// SenderAddr returns the sender address of this message. Returns nil if no address is available
func (m *Message) SenderAddr() net.Addr {
return m.senderAddr
}

// Verify that Messages implements the Packet interface.
Expand All @@ -41,9 +48,25 @@ var _ Packet = (*Message)(nil)
// elements. The OSC-timetag is a 64-bit fixed point time tag. See
// http://opensoundcontrol.org/spec-1_0 for more information.
type Bundle struct {
Timetag Timetag
Messages []*Message
Bundles []*Bundle
Timetag Timetag
Messages []*Message
Bundles []*Bundle
senderAddr net.Addr
}

// SenderAddr returns the sender address of this message. Returns nil if no address is available
func (b *Bundle) SenderAddr() net.Addr {
return b.senderAddr
}

func (b *Bundle) setSenderAddr(src net.Addr) {
b.senderAddr = src
for _, m := range b.Messages {
m.senderAddr = src
}
for _, b := range b.Bundles {
b.setSenderAddr(src)
}
}

// Verify that Bundle implements the Packet interface.
Expand All @@ -63,6 +86,7 @@ type Server struct {
Addr string
Dispatcher Dispatcher
ReadTimeout time.Duration
conn net.PacketConn
close func() error
}

Expand Down Expand Up @@ -536,22 +560,37 @@ func (s *Server) ListenAndServe() error {
s.Dispatcher = NewStandardDispatcher()
}

if s.conn == nil {
s.Listen()
}

return s.Serve()
}

// Listen creates the listening port and sets up the connection.
func (s *Server) Listen() error {
ln, err := net.ListenPacket("udp", s.Addr)
if err != nil {
return err
}
s.conn = ln

s.close = ln.Close
return nil
}

return s.Serve(ln)
// SetConnection sets the connection to use for the server. This is for a case
// where you created the connection manually instead of using ListenAndServer.
func (s *Server) SetConnection(c net.PacketConn) {
s.conn = c
}

// Serve retrieves incoming OSC packets from the given connection and dispatches
// retrieved OSC packets. If something goes wrong an error is returned.
func (s *Server) Serve(c net.PacketConn) error {
func (s *Server) Serve() error {
var tempDelay time.Duration
for {
msg, err := s.readFromConnection(c)
msg, err := s.readFromConnection()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
Expand All @@ -568,6 +607,7 @@ func (s *Server) Serve(c net.PacketConn) error {
return err
}
tempDelay = 0

go s.Dispatcher.Dispatch(msg)
}
}
Expand All @@ -585,44 +625,58 @@ func (s *Server) CloseConnection() error {
}

// ReceivePacket listens for incoming OSC packets and returns the packet if one is received.
func (s *Server) ReceivePacket(c net.PacketConn) (Packet, error) {
return s.readFromConnection(c)
func (s *Server) ReceivePacket() (Packet, error) {
return s.readFromConnection()
}

// readFromConnection retrieves OSC packets.
func (s *Server) readFromConnection(c net.PacketConn) (Packet, error) {
func (s *Server) readFromConnection() (Packet, error) {
if s.ReadTimeout != 0 {
if err := c.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil {
if err := s.conn.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil {
return nil, err
}
}

data := make([]byte, 65535)
n, _, err := c.ReadFrom(data)
n, src, err := s.conn.ReadFrom(data)
if err != nil {
return nil, err
}

var start int
p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n)
p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n, src)
if err != nil {
return nil, err
}
return p, nil
}

// SendTo sends a message to the given address. The sender address will be the address and
// port the server is listening on.
func (s *Server) SendTo(packet Packet, addr net.Addr) error {
data, err := packet.MarshalBinary()
if err != nil {
return err
}

if _, err = s.conn.WriteTo(data, addr); err != nil {
return err
}
return nil
}

// ParsePacket parses the given msg string and returns a Packet
func ParsePacket(msg string) (Packet, error) {
var start int
p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg))
p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg), nil)
if err != nil {
return nil, err
}
return p, nil
}

// receivePacket receives an OSC packet from the given reader.
func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) {
func readPacket(reader *bufio.Reader, start *int, end int, src net.Addr) (Packet, error) {
//var buf []byte
buf, err := reader.Peek(1)
if err != nil {
Expand All @@ -635,13 +689,16 @@ func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) {
if err != nil {
return nil, err
}
packet.senderAddr = src
return packet, nil
}
if buf[0] == '#' { // An OSC bundle starts with a '#'
packet, err := readBundle(reader, start, end)
if err != nil {
return nil, err
}
packet.setSenderAddr(src)

return packet, nil
}

Expand Down Expand Up @@ -681,7 +738,7 @@ func readBundle(reader *bufio.Reader, start *int, end int) (*Bundle, error) {
}
*start += 4

p, err := readPacket(reader, start, end)
p, err := readPacket(reader, start, end, nil)
if err != nil {
return nil, err
}
Expand Down
64 changes: 60 additions & 4 deletions osc/osc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,12 @@ func TestServerMessageReceiving(t *testing.T) {
return
}
defer c.Close()
server.SetConnection(c)

// Start the client
start <- true

packet, err := server.ReceivePacket(c)
packet, err := server.ReceivePacket()
if err != nil {
t.Errorf("server error: %v", err)
return
Expand Down Expand Up @@ -320,10 +321,11 @@ func TestReadTimeout(t *testing.T) {
t.Error(err)
}
defer c.Close()
server.SetConnection(c)

// Start the client
start <- true
p, err := server.ReceivePacket(c)
p, err := server.ReceivePacket()
if err != nil {
t.Errorf("server error: %v", err)
return
Expand All @@ -334,13 +336,13 @@ func TestReadTimeout(t *testing.T) {
}

// Second receive should time out since client is delayed 150 milliseconds
if _, err = server.ReceivePacket(c); err == nil {
if _, err = server.ReceivePacket(); err == nil {
t.Errorf("expected error")
return
}

// Next receive should get it
p, err = server.ReceivePacket(c)
p, err = server.ReceivePacket()
if err != nil {
t.Errorf("server error: %v", err)
return
Expand Down Expand Up @@ -614,6 +616,60 @@ func TestOscMessageMatch(t *testing.T) {
}
}

func TestServerSend(t *testing.T) {
targetServer := Server{
Addr: "127.0.0.1:6677",
}

go func() {
d := NewStandardDispatcher()
d.AddMsgHandler("/message/test", func(msg *Message) {
reply := NewMessage("/reply/test")
err := targetServer.SendTo(reply, msg.SenderAddr())
if err != nil {
t.Errorf("SendTo failed: %v", err)
}
})
targetServer.Dispatcher = d
targetServer.ListenAndServe()
}()

time.Sleep(2 * time.Second)

result := make(chan bool, 1)

d := NewStandardDispatcher()
d.AddMsgHandler("/reply/test", func(msg *Message) {
result <- true
})
clientServer := Server{
Addr: "127.0.0.1:18536",
Dispatcher: d,
}

err := clientServer.Listen()
if err != nil {
t.Errorf("Listen failed: %v", err)
}
go clientServer.Serve()

msg := NewMessage("/message/test")
addr, _ := net.ResolveUDPAddr("udp", targetServer.Addr)
err = clientServer.SendTo(msg, addr)
if err != nil {
t.Errorf("SendTo failed: %v", err)
}

select {
case r := <-result:
if !r {
t.Error("did not get expected response")
}
case <-time.After(2 * time.Second):
t.Error("unexpected timeout")
}
}

const zero = string(byte(0))

// nulls returns a string of `i` nulls.
Expand Down

0 comments on commit c9b0fe0

Please sign in to comment.