From e22b728a9a7a54420d12c3eb96e9fcd9bd1558fb Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 17 Oct 2025 12:12:30 +0200 Subject: [PATCH 1/6] Added UDP Network API --- network-api/network-api.go | 148 ++++++++++++++++++++++++++++++++ network-api/network-api_test.go | 104 +++++++++++++++++++++- 2 files changed, 251 insertions(+), 1 deletion(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 3411ce6..11e5886 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -46,11 +46,16 @@ func Register(router *msgpackrouter.Router) { _ = router.RegisterMethod("tcp/connectSSL", tcpConnectSSL) + _ = router.RegisterMethod("udp/connect", udpConnect) + _ = router.RegisterMethod("udp/write", udpWrite) + _ = router.RegisterMethod("udp/read", udpRead) + _ = router.RegisterMethod("udp/close", udpClose) } var lock sync.RWMutex var liveConnections = make(map[uint]net.Conn) var liveListeners = make(map[uint]net.Listener) +var liveUdpConnections = make(map[uint]net.PacketConn) var nextConnectionID atomic.Uint32 // takeLockAndGenerateNextID generates a new unique ID for a connection or listener. @@ -328,3 +333,146 @@ func tcpConnectSSL(ctx context.Context, rpc *msgpackrpc.Connection, params []any unlock() return id, nil } + +func udpConnect(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 2 { + return nil, []any{1, "Invalid number of parameters, expected server address and port"} + } + serverAddr, ok := params[0].(string) + if !ok { + return nil, []any{1, "Invalid parameter type, expected string for server address"} + } + serverPort, ok := msgpackrpc.ToUint(params[1]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint16 for server port"} + } + + serverAddr = net.JoinHostPort(serverAddr, strconv.Itoa(int(serverPort))) + udpAddr, err := net.ResolveUDPAddr("udp", serverAddr) + if err != nil { + return nil, []any{2, "Failed to resolve UDP address: " + err.Error()} + } + udpConn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, []any{2, "Failed to connect to server: " + err.Error()} + } + + // Successfully opened UDP channel + + id, unlock := takeLockAndGenerateNextID() + liveUdpConnections[id] = udpConn + unlock() + return id, nil +} + +func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 4 { + return nil, []any{1, "Invalid number of parameters, expected udpConnId, dest address, dest port, payload"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected int for UDP connection ID"} + } + targetIP, ok := params[1].(string) + if !ok { + return nil, []any{1, "Invalid parameter type, expected string for server address"} + } + targetPort, ok := msgpackrpc.ToUint(params[2]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint16 for server port"} + } + data, ok := params[3].([]byte) + if !ok { + if dataStr, ok := params[3].(string); ok { + data = []byte(dataStr) + } else { + // If data is not []byte or string, return an error + return nil, []any{1, "Invalid parameter type, expected []byte or string for data to write"} + } + } + + lock.RLock() + udpConn, ok := liveUdpConnections[id] + lock.RUnlock() + if !ok { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + + targetAddr := net.JoinHostPort(targetIP, strconv.Itoa(int(targetPort))) + addr, err := net.ResolveUDPAddr("udp", targetAddr) // TODO: This is inefficient, implement some caching + if err != nil { + return nil, []any{3, "Failed to resolve target address: " + err.Error()} + } + if n, err := udpConn.WriteTo(data, addr); err != nil { + return nil, []any{4, "Failed to write to UDP connection: " + err.Error()} + } else { + return n, nil + } +} + +func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 2 { + return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read)"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint for UDP connection ID"} + } + lock.RLock() + udpConn, ok := liveUdpConnections[id] + lock.RUnlock() + if !ok { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + maxBytes, ok := msgpackrpc.ToUint(params[1]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint for max bytes to read"} + } + + buffer := make([]byte, maxBytes) + + n, addr, err := udpConn.ReadFrom(buffer) + if err != nil { + return nil, []any{3, "Failed to read from UDP connection: " + err.Error()} + } + host, portStr, err := net.SplitHostPort(addr.String()) + if err != nil { + // Should never fail, but... + return nil, []any{4, "Failed to parse source address: " + err.Error()} + } + port, err := strconv.Atoi(portStr) + if err != nil { + // Should never fail, but... + return nil, []any{4, "Failed to parse source address: " + err.Error()} + } + return []any{buffer[:n], host, port}, nil +} + +func udpClose(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 1 { + return nil, []any{1, "Invalid number of parameters, expected UDP connection ID"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected int for UDP connection ID"} + } + + lock.Lock() + udpConn, existsConn := liveUdpConnections[id] + if existsConn { + delete(liveUdpConnections, id) + } + lock.Unlock() + + if !existsConn { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + + // Close the connection if it exists + // We do not return an error to the caller if the close operation fails, as it is not critical, + // but we only log the error for debugging purposes. + if err := udpConn.Close(); err != nil { + return err.Error(), nil + } + return "", nil +} diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index 41166ff..990997d 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -156,7 +156,7 @@ const testCert = "-----BEGIN CERTIFICATE-----\n" + "HAIgNadMPgxv01dy59kCgzehgKzmKdTF0rG1SniYqnkLqPA=\n" + "-----END CERTIFICATE-----\n" -func TestNetworkAPI(t *testing.T) { +func TestTCPNetworkAPI(t *testing.T) { ctx := t.Context() var rpc *msgpackrpc.Connection listID, err := tcpListen(ctx, rpc, []any{"localhost", 9999}) @@ -236,3 +236,105 @@ func TestNetworkAPI(t *testing.T) { wg.Wait() } + +func TestUDPNetworkAPI(t *testing.T) { + ctx := t.Context() + conn1, err := udpConnect(ctx, nil, []any{"0.0.0.0", 9800}) + require.Nil(t, err) + + conn2, err := udpConnect(ctx, nil, []any{"0.0.0.0", 9900}) + require.Nil(t, err) + require.NotEqual(t, conn1, conn2) + + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Hello")}) + require.Nil(t, err) + require.Equal(t, 5, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []any{[]uint8("Hello"), "127.0.0.1", 9800}, res) + } + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) + require.Nil(t, err) + require.Equal(t, 3, res) + } + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Two")}) + require.Nil(t, err) + require.Equal(t, 3, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []any{[]uint8("One"), "127.0.0.1", 9800}, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []any{[]uint8("Two"), "127.0.0.1", 9800}, res) + } + { + res, err := udpClose(ctx, nil, []any{conn1}) + require.Nil(t, err) + require.Equal(t, "", res) + } + { + res, err := udpClose(ctx, nil, []any{conn2}) + require.Nil(t, err) + require.Equal(t, "", res) + } +} + +func TestUDPNetworkUnboundClientAPI(t *testing.T) { + ctx := t.Context() + conn1, err := udpConnect(ctx, nil, []any{"", 0}) + require.Nil(t, err) + + conn2, err := udpConnect(ctx, nil, []any{"0.0.0.0", 9900}) + require.Nil(t, err) + require.NotEqual(t, conn1, conn2) + + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Hello")}) + require.Nil(t, err) + require.Equal(t, 5, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("Hello"), res.([]any)[0]) + } + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) + require.Nil(t, err) + require.Equal(t, 3, res) + } + { + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Two")}) + require.Nil(t, err) + require.Equal(t, 3, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("One"), res.([]any)[0]) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("Two"), res.([]any)[0]) + } + { + res, err := udpClose(ctx, nil, []any{conn1}) + require.Nil(t, err) + require.Equal(t, "", res) + } + { + res, err := udpClose(ctx, nil, []any{conn2}) + require.Nil(t, err) + require.Equal(t, "", res) + } +} From 26a05f277dc884a1d19d08d22fbcddd3ea41ea1c Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 21 Oct 2025 12:55:19 +0200 Subject: [PATCH 2/6] Fix linter warnings --- network-api/network-api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 11e5886..8ed24dc 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -347,7 +347,7 @@ func udpConnect(ctx context.Context, rpc *msgpackrpc.Connection, params []any) ( return nil, []any{1, "Invalid parameter type, expected uint16 for server port"} } - serverAddr = net.JoinHostPort(serverAddr, strconv.Itoa(int(serverPort))) + serverAddr = net.JoinHostPort(serverAddr, fmt.Sprintf("%d", serverPort)) udpAddr, err := net.ResolveUDPAddr("udp", serverAddr) if err != nil { return nil, []any{2, "Failed to resolve UDP address: " + err.Error()} @@ -398,7 +398,7 @@ func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} } - targetAddr := net.JoinHostPort(targetIP, strconv.Itoa(int(targetPort))) + targetAddr := net.JoinHostPort(targetIP, fmt.Sprintf("%d", targetPort)) addr, err := net.ResolveUDPAddr("udp", targetAddr) // TODO: This is inefficient, implement some caching if err != nil { return nil, []any{3, "Failed to resolve target address: " + err.Error()} From 869c0d896cb85d695335b9e539f8ac41d2bf827d Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 24 Oct 2025 17:45:10 +0200 Subject: [PATCH 3/6] Added timeouts support --- network-api/network-api.go | 44 ++++++++++++++++++++++++++------- network-api/network-api_test.go | 22 +++++++++++++++++ 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 8ed24dc..e55aaed 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -218,8 +218,8 @@ func tcpCloseListener(ctx context.Context, rpc *msgpackrpc.Connection, params [] } func tcpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { - if len(params) != 2 { - return nil, []any{1, "Invalid number of parameters, expected (connection ID, max bytes to read)"} + if len(params) != 2 && len(params) != 3 { + return nil, []any{1, "Invalid number of parameters, expected (connection ID, max bytes to read[, optional timeout in ms])"} } id, ok := msgpackrpc.ToUint(params[0]) if !ok { @@ -235,12 +235,22 @@ func tcpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re if !ok { return nil, []any{1, "Invalid parameter type, expected int for max bytes to read"} } + var deadline time.Time // default value == no timeout + if len(params) == 2 { + // It seems that there is no way to set a 0 ms timeout (immediate return) on a TCP connection. + // Setting the read deadline to time.Now() will always returns an empty (zero bytes) + // read, so we set it by default to a very short duration in the future (1 ms). + deadline = time.Now().Add(time.Millisecond) + } else if ms, ok := msgpackrpc.ToInt(params[2]); !ok { + return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} + } else if ms > 0 { + deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) + } else if ms == 0 { + // No timeout + } buffer := make([]byte, maxBytes) - // It seems that the only way to make a non-blocking read is to set a read deadline. - // BTW setting the read deadline to time.Now() will always returns an empty (zero bytes) - // read, so we set it to a very short duration in the future. - if err := conn.SetReadDeadline(time.Now().Add(time.Millisecond)); err != nil { + if err := conn.SetReadDeadline(deadline); err != nil { return nil, []any{3, "Failed to set read timeout: " + err.Error()} } n, err := conn.Read(buffer) @@ -411,8 +421,8 @@ func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r } func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { - if len(params) != 2 { - return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read)"} + if len(params) != 2 && len(params) != 3 { + return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read[, optional timeout in ms])"} } id, ok := msgpackrpc.ToUint(params[0]) if !ok { @@ -428,10 +438,26 @@ func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re if !ok { return nil, []any{1, "Invalid parameter type, expected uint for max bytes to read"} } + var deadline time.Time // default value == no timeout + if len(params) == 2 { + // No timeout + } else if ms, ok := msgpackrpc.ToInt(params[2]); !ok { + return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} + } else if ms > 0 { + deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) + } else if ms == 0 { + // No timeout + } + if err := udpConn.SetReadDeadline(deadline); err != nil { + return nil, []any{3, "Failed to set read deadline: " + err.Error()} + } buffer := make([]byte, maxBytes) - n, addr, err := udpConn.ReadFrom(buffer) + if errors.Is(err, os.ErrDeadlineExceeded) { + // timeout + return nil, []any{5, "Timeout"} + } if err != nil { return nil, []any{3, "Failed to read from UDP connection: " + err.Error()} } diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index 990997d..78f3aea 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -19,6 +19,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/arduino/arduino-router/msgpackrpc" @@ -327,6 +328,27 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Nil(t, err) require.Equal(t, []uint8("Two"), res.([]any)[0]) } + + // Check timeouts + go func() { + time.Sleep(200 * time.Millisecond) + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Three")}) + require.Nil(t, err) + require.Equal(t, 5, res) + }() + { + start := time.Now() + res, err := udpRead(ctx, nil, []any{conn2, 100, 10}) + require.Less(t, time.Since(start), 20*time.Millisecond) + require.Equal(t, []any{5, "Timeout"}, err) + require.Nil(t, res) + } + { + res, err := udpRead(ctx, nil, []any{conn2, 100, 0}) + require.Nil(t, err) + require.Equal(t, []uint8("Three"), res.([]any)[0]) + } + { res, err := udpClose(ctx, nil, []any{conn1}) require.Nil(t, err) From adce95afe0357821264de45a782265d16f21f4a4 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 31 Oct 2025 15:48:19 +0100 Subject: [PATCH 4/6] Added read-buffers to udp packet reader --- network-api/network-api.go | 71 +++++++++++++++++++++++---------- network-api/network-api_test.go | 62 +++++++++++++++++++++------- 2 files changed, 98 insertions(+), 35 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index e55aaed..e76c14e 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -48,6 +48,7 @@ func Register(router *msgpackrouter.Router) { _ = router.RegisterMethod("udp/connect", udpConnect) _ = router.RegisterMethod("udp/write", udpWrite) + _ = router.RegisterMethod("udp/awaitRead", udpAwaitRead) _ = router.RegisterMethod("udp/read", udpRead) _ = router.RegisterMethod("udp/close", udpClose) } @@ -56,6 +57,7 @@ var lock sync.RWMutex var liveConnections = make(map[uint]net.Conn) var liveListeners = make(map[uint]net.Listener) var liveUdpConnections = make(map[uint]net.PacketConn) +var udpReadBuffers = make(map[uint][]byte) var nextConnectionID atomic.Uint32 // takeLockAndGenerateNextID generates a new unique ID for a connection or listener. @@ -420,28 +422,18 @@ func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r } } -func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { - if len(params) != 2 && len(params) != 3 { - return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read[, optional timeout in ms])"} +func udpAwaitRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 1 && len(params) != 2 { + return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID[, optional timeout in ms])"} } id, ok := msgpackrpc.ToUint(params[0]) if !ok { return nil, []any{1, "Invalid parameter type, expected uint for UDP connection ID"} } - lock.RLock() - udpConn, ok := liveUdpConnections[id] - lock.RUnlock() - if !ok { - return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} - } - maxBytes, ok := msgpackrpc.ToUint(params[1]) - if !ok { - return nil, []any{1, "Invalid parameter type, expected uint for max bytes to read"} - } var deadline time.Time // default value == no timeout - if len(params) == 2 { + if len(params) == 1 { // No timeout - } else if ms, ok := msgpackrpc.ToInt(params[2]); !ok { + } else if ms, ok := msgpackrpc.ToInt(params[1]); !ok { return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} } else if ms > 0 { deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) @@ -449,10 +441,16 @@ func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re // No timeout } + lock.RLock() + udpConn, ok := liveUdpConnections[id] + lock.RUnlock() + if !ok { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } if err := udpConn.SetReadDeadline(deadline); err != nil { return nil, []any{3, "Failed to set read deadline: " + err.Error()} } - buffer := make([]byte, maxBytes) + buffer := make([]byte, 64*1024) // 64 KB buffer n, addr, err := udpConn.ReadFrom(buffer) if errors.Is(err, os.ErrDeadlineExceeded) { // timeout @@ -471,7 +469,41 @@ func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re // Should never fail, but... return nil, []any{4, "Failed to parse source address: " + err.Error()} } - return []any{buffer[:n], host, port}, nil + + lock.Lock() + udpReadBuffers[id] = buffer[:n] + lock.Unlock() + return []any{n, host, port}, nil +} + +func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 2 && len(params) != 3 { + return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read)"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint for UDP connection ID"} + } + maxBytes, ok := msgpackrpc.ToUint(params[1]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint for max bytes to read"} + } + + lock.Lock() + buffer, exists := udpReadBuffers[id] + n := uint(len(buffer)) + if exists { + // keep the remainder of the buffer for the next read + if n > maxBytes { + udpReadBuffers[id] = buffer[maxBytes:] + n = maxBytes + } else { + udpReadBuffers[id] = nil + } + } + lock.Unlock() + + return buffer[:n], nil } func udpClose(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { @@ -485,9 +517,8 @@ func udpClose(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r lock.Lock() udpConn, existsConn := liveUdpConnections[id] - if existsConn { - delete(liveUdpConnections, id) - } + delete(liveUdpConnections, id) + delete(udpReadBuffers, id) lock.Unlock() if !existsConn { diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index 78f3aea..e853145 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -253,9 +253,13 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, 5, res) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) require.Nil(t, err) - require.Equal(t, []any{[]uint8("Hello"), "127.0.0.1", 9800}, res) + require.Equal(t, []any{5, "127.0.0.1", 9800}, res) + + res2, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("Hello"), res2) } { res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) @@ -268,14 +272,22 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, 3, res) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) + require.Nil(t, err) + require.Equal(t, []any{3, "127.0.0.1", 9800}, res) + + res2, err := udpRead(ctx, nil, []any{conn2, 100}) require.Nil(t, err) - require.Equal(t, []any{[]uint8("One"), "127.0.0.1", 9800}, res) + require.Equal(t, []uint8("One"), res2) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) require.Nil(t, err) - require.Equal(t, []any{[]uint8("Two"), "127.0.0.1", 9800}, res) + require.Equal(t, []any{3, "127.0.0.1", 9800}, res) + + res2, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("Two"), res2) } { res, err := udpClose(ctx, nil, []any{conn1}) @@ -304,9 +316,17 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, 5, res) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) + require.Nil(t, err) + require.Equal(t, 5, res.([]any)[0]) + + res2, err := udpRead(ctx, nil, []any{conn2, 2}) require.Nil(t, err) - require.Equal(t, []uint8("Hello"), res.([]any)[0]) + require.Equal(t, []uint8("He"), res2) + + res2, err = udpRead(ctx, nil, []any{conn2, 20}) + require.Nil(t, err) + require.Equal(t, []uint8("llo"), res2) } { res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) @@ -319,14 +339,22 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, 3, res) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) + require.Nil(t, err) + require.Equal(t, 3, res.([]any)[0]) + + res2, err := udpRead(ctx, nil, []any{conn2, 100}) require.Nil(t, err) - require.Equal(t, []uint8("One"), res.([]any)[0]) + require.Equal(t, []uint8("One"), res2) } { - res, err := udpRead(ctx, nil, []any{conn2, 100}) + res, err := udpAwaitRead(ctx, nil, []any{conn2}) require.Nil(t, err) - require.Equal(t, []uint8("Two"), res.([]any)[0]) + require.Equal(t, 3, res.([]any)[0]) + + res2, err := udpRead(ctx, nil, []any{conn2, 100}) + require.Nil(t, err) + require.Equal(t, []uint8("Two"), res2) } // Check timeouts @@ -338,15 +366,19 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { }() { start := time.Now() - res, err := udpRead(ctx, nil, []any{conn2, 100, 10}) + res, err := udpAwaitRead(ctx, nil, []any{conn2, 10}) require.Less(t, time.Since(start), 20*time.Millisecond) require.Equal(t, []any{5, "Timeout"}, err) require.Nil(t, res) } { - res, err := udpRead(ctx, nil, []any{conn2, 100, 0}) + res, err := udpAwaitRead(ctx, nil, []any{conn2, 0}) + require.Nil(t, err) + require.Equal(t, 5, res.([]any)[0]) + + res2, err := udpRead(ctx, nil, []any{conn2, 100, 0}) require.Nil(t, err) - require.Equal(t, []uint8("Three"), res.([]any)[0]) + require.Equal(t, []uint8("Three"), res2) } { From 6ea8906736c098163781de70df24ee0b74c2d8fc Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 31 Oct 2025 16:00:22 +0100 Subject: [PATCH 5/6] Let's make linter happy --- network-api/network-api.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index e76c14e..d4ec116 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -247,8 +247,6 @@ func tcpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} } else if ms > 0 { deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) - } else if ms == 0 { - // No timeout } buffer := make([]byte, maxBytes) @@ -431,14 +429,12 @@ func udpAwaitRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) return nil, []any{1, "Invalid parameter type, expected uint for UDP connection ID"} } var deadline time.Time // default value == no timeout - if len(params) == 1 { - // No timeout - } else if ms, ok := msgpackrpc.ToInt(params[1]); !ok { - return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} - } else if ms > 0 { - deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) - } else if ms == 0 { - // No timeout + if len(params) == 2 { + if ms, ok := msgpackrpc.ToInt(params[1]); !ok { + return nil, []any{1, "Invalid parameter type, expected int for timeout in ms"} + } else if ms > 0 { + deadline = time.Now().Add(time.Duration(ms) * time.Millisecond) + } } lock.RLock() From e9961b304b06f25c15d2c495cb45f4e7d3cd7499 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 31 Oct 2025 16:00:31 +0100 Subject: [PATCH 6/6] Allow tests to run in parallel --- network-api/network-api_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index e853145..079e71b 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -306,12 +306,12 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { conn1, err := udpConnect(ctx, nil, []any{"", 0}) require.Nil(t, err) - conn2, err := udpConnect(ctx, nil, []any{"0.0.0.0", 9900}) + conn2, err := udpConnect(ctx, nil, []any{"0.0.0.0", 9901}) require.Nil(t, err) require.NotEqual(t, conn1, conn2) { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Hello")}) + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Hello")}) require.Nil(t, err) require.Equal(t, 5, res) } @@ -329,12 +329,12 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, []uint8("llo"), res2) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("One")}) require.Nil(t, err) require.Equal(t, 3, res) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Two")}) + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Two")}) require.Nil(t, err) require.Equal(t, 3, res) } @@ -360,7 +360,7 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { // Check timeouts go func() { time.Sleep(200 * time.Millisecond) - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Three")}) + res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Three")}) require.Nil(t, err) require.Equal(t, 5, res) }()