Skip to content

Commit

Permalink
http2: change default frame scheduler to round robin
Browse files Browse the repository at this point in the history
The priority scheduler allows stream starvation (see golang/go#58804)
and is CPU intensive. In addition, the RFC 7540 prioritization
scheme it implements was deprecated in RFC 9113 and does not appear
to have ever had significant adoption.

Add a simple round-robin scheduler and enable it by default.

For golang/go#58804

Change-Id: I5c5143aa9bc339fc0894f70d773fa7c0d7d87eef
Reviewed-on: https://go-review.googlesource.com/c/net/+/478735
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Bryan Mills <bcmills@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
  • Loading branch information
neild committed May 15, 2023
1 parent 2b0b97d commit 120fc90
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 2 deletions.
2 changes: 1 addition & 1 deletion http2/server.go
Expand Up @@ -441,7 +441,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
sc.writeSched = NewPriorityWriteScheduler(nil)
sc.writeSched = newRoundRobinWriteScheduler()
}

// These start at the RFC-specified defaults. If there is a higher
Expand Down
3 changes: 2 additions & 1 deletion http2/writesched.go
Expand Up @@ -184,7 +184,8 @@ func (wr *FrameWriteRequest) replyToWriter(err error) {

// writeQueue is used by implementations of WriteScheduler.
type writeQueue struct {
s []FrameWriteRequest
s []FrameWriteRequest
prev, next *writeQueue
}

func (q *writeQueue) empty() bool { return len(q.s) == 0 }
Expand Down
119 changes: 119 additions & 0 deletions http2/writesched_roundrobin.go
@@ -0,0 +1,119 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http2

import (
"fmt"
"math"
)

type roundRobinWriteScheduler struct {
// control contains control frames (SETTINGS, PING, etc.).
control writeQueue

// streams maps stream ID to a queue.
streams map[uint32]*writeQueue

// stream queues are stored in a circular linked list.
// head is the next stream to write, or nil if there are no streams open.
head *writeQueue

// pool of empty queues for reuse.
queuePool writeQueuePool
}

// newRoundRobinWriteScheduler constructs a new write scheduler.
// The round robin scheduler priorizes control frames
// like SETTINGS and PING over DATA frames.
// When there are no control frames to send, it performs a round-robin
// selection from the ready streams.
func newRoundRobinWriteScheduler() WriteScheduler {
ws := &roundRobinWriteScheduler{
streams: make(map[uint32]*writeQueue),
}
return ws
}

func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
if ws.streams[streamID] != nil {
panic(fmt.Errorf("stream %d already opened", streamID))
}
q := ws.queuePool.get()
ws.streams[streamID] = q
if ws.head == nil {
ws.head = q
q.next = q
q.prev = q
} else {
// Queues are stored in a ring.
// Insert the new stream before ws.head, putting it at the end of the list.
q.prev = ws.head.prev
q.next = ws.head
q.prev.next = q
q.next.prev = q
}
}

func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
q := ws.streams[streamID]
if q == nil {
return
}
if q.next == q {
// This was the only open stream.
ws.head = nil
} else {
q.prev.next = q.next
q.next.prev = q.prev
if ws.head == q {
ws.head = q.next
}
}
delete(ws.streams, streamID)
ws.queuePool.put(q)
}

func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}

func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
if wr.isControl() {
ws.control.push(wr)
return
}
q := ws.streams[wr.StreamID()]
if q == nil {
// This is a closed stream.
// wr should not be a HEADERS or DATA frame.
// We push the request onto the control queue.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
ws.control.push(wr)
return
}
q.push(wr)
}

func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
// Control and RST_STREAM frames first.
if !ws.control.empty() {
return ws.control.shift(), true
}
if ws.head == nil {
return FrameWriteRequest{}, false
}
q := ws.head
for {
if wr, ok := q.consume(math.MaxInt32); ok {
ws.head = q.next
return wr, true
}
q = q.next
if q == ws.head {
break
}
}
return FrameWriteRequest{}, false
}
65 changes: 65 additions & 0 deletions http2/writesched_roundrobin_test.go
@@ -0,0 +1,65 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http2

import (
"reflect"
"testing"
)

func TestRoundRobinScheduler(t *testing.T) {
const maxFrameSize = 16
sc := &serverConn{maxFrameSize: maxFrameSize}
ws := newRoundRobinWriteScheduler()
streams := make([]*stream, 4)
for i := range streams {
streamID := uint32(i) + 1
streams[i] = &stream{
id: streamID,
sc: sc,
}
streams[i].flow.add(1 << 20) // arbitrary large value
ws.OpenStream(streamID, OpenStreamOptions{})
wr := FrameWriteRequest{
write: &writeData{
streamID: streamID,
p: make([]byte, maxFrameSize*(i+1)),
endStream: false,
},
stream: streams[i],
}
ws.Push(wr)
}
const controlFrames = 2
for i := 0; i < controlFrames; i++ {
ws.Push(makeWriteNonStreamRequest())
}

// We should get the control frames first.
for i := 0; i < controlFrames; i++ {
wr, ok := ws.Pop()
if !ok || wr.StreamID() != 0 {
t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
}
}

// Each stream should write maxFrameSize bytes until it runs out of data.
// Stream 1 has one frame of data, 2 has two frames, etc.
want := []uint32{1, 2, 3, 4, 2, 3, 4, 3, 4, 4}
var got []uint32
for {
wr, ok := ws.Pop()
if !ok {
break
}
if wr.DataSize() != maxFrameSize {
t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
}
got = append(got, wr.StreamID())
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("popped streams %v, want %v", got, want)
}
}

0 comments on commit 120fc90

Please sign in to comment.