Skip to content

Commit aa67a9e

Browse files
committed
Added unit-test for testing message congestion.
Also added a placeholder on the msgpackrouter constructor to specify the desired queue size (it will be implemented later).
1 parent 8cfea03 commit aa67a9e

File tree

3 files changed

+66
-3
lines changed

3 files changed

+66
-3
lines changed

internal/msgpackrouter/router.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,26 @@ type Router struct {
3333
routesLock sync.Mutex
3434
routes map[string]*msgpackrpc.Connection
3535
routesInternal map[string]RouterRequestHandler
36+
sendQueueSize int
3637
}
3738

38-
func New() *Router {
39+
func New(perConnSendQueueSize int) *Router {
3940
return &Router{
4041
routes: make(map[string]*msgpackrpc.Connection),
4142
routesInternal: make(map[string]RouterRequestHandler),
43+
sendQueueSize: perConnSendQueueSize,
4244
}
4345
}
4446

47+
// SetSendQueueSize sets the size of the send queue for each connection.
48+
// Only new connections will be affected by this change, existing connections
49+
// will keep their current send queue size.
50+
func (r *Router) SetSendQueueSize(size int) {
51+
r.routesLock.Lock()
52+
defer r.routesLock.Unlock()
53+
r.sendQueueSize = size
54+
}
55+
4556
func (r *Router) Accept(conn io.ReadWriteCloser) <-chan struct{} {
4657
res := make(chan struct{})
4758
go func() {

internal/msgpackrouter/router_test.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestBasicRouterFunctionality(t *testing.T) {
9292
})
9393
go cl2.Run()
9494

95-
router := msgpackrouter.New()
95+
router := msgpackrouter.New(0)
9696
router.Accept(ch1b)
9797
router.Accept(ch2b)
9898

@@ -160,3 +160,55 @@ func TestBasicRouterFunctionality(t *testing.T) {
160160
require.Contains(t, cl1Notifications.String(), "notification: ping [b 14 true true]")
161161
cl1NotificationsMux.Unlock()
162162
}
163+
164+
func TestMessageForwarderCongestionControl(t *testing.T) {
165+
// Test parameters
166+
queueSize := 5
167+
msgLatency := 100 * time.Millisecond
168+
// Run a batch of 20 requests, and expect them to take more than 400 ms
169+
// in total because the router should throttle requests in batch of 5.
170+
batchSize := queueSize * 4
171+
expectedLatency := msgLatency * time.Duration(batchSize/queueSize)
172+
173+
// Make a client that simulates a slow response
174+
ch1a, ch1b := newFullPipe()
175+
cl1 := msgpackrpc.NewConnection(ch1a, ch1a, func(ctx context.Context, logger msgpackrpc.FunctionLogger, method string, params []any) (_result any, _err any) {
176+
time.Sleep(msgLatency)
177+
return true, nil
178+
}, nil, nil)
179+
go cl1.Run()
180+
181+
// Make a second client to send requests, without any delay
182+
ch2a, ch2b := newFullPipe()
183+
cl2 := msgpackrpc.NewConnection(ch2a, ch2a, nil, nil, nil)
184+
go cl2.Run()
185+
186+
// Setup router
187+
router := msgpackrouter.New(queueSize) // max 5 pending messages per connection
188+
router.Accept(ch1b)
189+
router.Accept(ch2b)
190+
191+
{
192+
// Register a method on the first client
193+
result, reqErr, err := cl1.SendRequest(context.Background(), "$/register", []any{"test"})
194+
require.Equal(t, true, result)
195+
require.Nil(t, reqErr)
196+
require.NoError(t, err)
197+
}
198+
199+
// Run batch of requests from cl2 to cl1
200+
start := time.Now()
201+
var wg sync.WaitGroup
202+
for range batchSize {
203+
wg.Go(func() {
204+
_, _, err := cl2.SendRequest(t.Context(), "test", []any{})
205+
require.NoError(t, err)
206+
})
207+
}
208+
wg.Wait()
209+
elapsed := time.Since(start)
210+
211+
// Check that the elapsed time is greater than expectedLatency
212+
fmt.Println("Elapsed time for requests:", elapsed)
213+
require.Greater(t, elapsed, expectedLatency, "Expected elapsed time to be greater than %s", expectedLatency)
214+
}

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func startRouter(cfg Config) error {
155155
}
156156

157157
// Run router
158-
router := msgpackrouter.New()
158+
router := msgpackrouter.New(0)
159159

160160
// Register TCP network API methods
161161
networkapi.Register(router)

0 commit comments

Comments
 (0)