forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
http_stream.go
123 lines (105 loc) · 2.85 KB
/
http_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package http3
import (
"errors"
"fmt"
"github.com/danielpfeifer02/quic-go-prio-packs"
)
// A Stream is a HTTP/3 stream.
// When writing to and reading from the stream, data is framed in HTTP/3 DATA frames.
type Stream quic.Stream
// The stream conforms to the quic.Stream interface, but instead of writing to and reading directly
// from the QUIC stream, it writes to and reads from the HTTP stream.
type stream struct {
quic.Stream
buf []byte
onFrameError func()
bytesRemainingInFrame uint64
}
var _ Stream = &stream{}
func newStream(str quic.Stream, onFrameError func()) *stream {
return &stream{
Stream: str,
onFrameError: onFrameError,
buf: make([]byte, 0, 16),
}
}
func (s *stream) Read(b []byte) (int, error) {
if s.bytesRemainingInFrame == 0 {
parseLoop:
for {
frame, err := parseNextFrame(s.Stream, nil)
if err != nil {
return 0, err
}
switch f := frame.(type) {
case *headersFrame:
// skip HEADERS frames
continue
case *dataFrame:
s.bytesRemainingInFrame = f.Length
break parseLoop
default:
s.onFrameError()
// parseNextFrame skips over unknown frame types
// Therefore, this condition is only entered when we parsed another known frame type.
return 0, fmt.Errorf("peer sent an unexpected frame: %T", f)
}
}
}
var n int
var err error
if s.bytesRemainingInFrame < uint64(len(b)) {
n, err = s.Stream.Read(b[:s.bytesRemainingInFrame])
} else {
n, err = s.Stream.Read(b)
}
s.bytesRemainingInFrame -= uint64(n)
return n, err
}
func (s *stream) hasMoreData() bool {
return s.bytesRemainingInFrame > 0
}
func (s *stream) Write(b []byte) (int, error) {
s.buf = s.buf[:0]
s.buf = (&dataFrame{Length: uint64(len(b))}).Append(s.buf)
if _, err := s.Stream.Write(s.buf); err != nil {
return 0, err
}
return s.Stream.Write(b)
}
var errTooMuchData = errors.New("peer sent too much data")
type lengthLimitedStream struct {
*stream
contentLength int64
read int64
resetStream bool
}
var _ Stream = &lengthLimitedStream{}
func newLengthLimitedStream(str *stream, contentLength int64) *lengthLimitedStream {
return &lengthLimitedStream{
stream: str,
contentLength: contentLength,
}
}
func (s *lengthLimitedStream) checkContentLengthViolation() error {
if s.read > s.contentLength || s.read == s.contentLength && s.hasMoreData() {
if !s.resetStream {
s.CancelRead(quic.StreamErrorCode(ErrCodeMessageError))
s.CancelWrite(quic.StreamErrorCode(ErrCodeMessageError))
s.resetStream = true
}
return errTooMuchData
}
return nil
}
func (s *lengthLimitedStream) Read(b []byte) (int, error) {
if err := s.checkContentLengthViolation(); err != nil {
return 0, err
}
n, err := s.stream.Read(b[:min(int64(len(b)), s.contentLength-s.read)])
s.read += int64(n)
if err := s.checkContentLengthViolation(); err != nil {
return n, err
}
return n, err
}