-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
WebTransport.h
233 lines (199 loc) · 8.18 KB
/
WebTransport.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>

#include <folly/CancellationToken.h>
#include <folly/Expected.h>
#include <folly/Optional.h>
#include <folly/futures/Future.h>
#include <folly/io/IOBuf.h>
#include <proxygen/lib/http/HTTPMessage.h>
namespace proxygen {
// Generic WebTransport interface
//
// Principles:
//
// 1. It should be easy to write simple applications
// 2. The backpressure and error handling APIs should be understandable
// 3. The same generic API should work for proxygen/lib and proxygen::coro
//
// Futures are the best way to implement #3 because they can be easily
// awaited in a coroutine.
//
// Note there are no APIs to await new streams opened by the peer,
// datagrams sent by the peer, or session closure.
// => These signals are delivered via the HTTP API
class WebTransport {
public:
virtual ~WebTransport() = default;
// Error codes reported by the WebTransport API. The numeric values are
// consecutive, starting at zero.
enum class ErrorCode {
  GENERIC_ERROR = 0x00,
  INVALID_STREAM_ID = 0x01,
  STREAM_CREATION_ERROR = 0x02,
  SEND_ERROR = 0x03,
};
// Returns true only when `msg` is a request, its method is CONNECT, and it
// carries an upgrade protocol equal to "webtransport".
static bool isConnectMessage(const proxygen::HTTPMessage& msg) {
  static const std::string kWebTransport{"webtransport"};
  if (!msg.isRequest()) {
    return false;
  }
  if (!(msg.getMethod() == proxygen::HTTPMethod::CONNECT)) {
    return false;
  }
  // The upgrade protocol must be present before it can be compared.
  if (!msg.getUpgradeProtocol()) {
    return false;
  }
  return *msg.getUpgradeProtocol() == kWebTransport;
}
// Inclusive bounds of the 64-bit range that encoded application error codes
// occupy. kFirstErrorCode is the encoding of application code 0 and
// kLastErrorCode is the encoding of application code 0xffffffff.
static constexpr uint64_t kFirstErrorCode = 0x52e4a40fa8db;
static constexpr uint64_t kLastErrorCode = 0x52e5ac983162;

// Maps a 32-bit application error code `n` into the encoded range. After
// every 0x1e consecutive application codes one encoded value is skipped, so
// the result is kFirstErrorCode + n plus one for each complete run of 0x1e.
static uint64_t toHTTPErrorCode(uint32_t n) {
  const uint64_t skipped = n / 0x1e;
  return kFirstErrorCode + n + skipped;
}

// Returns true if `x` lies within [kFirstErrorCode, kLastErrorCode] and is
// not one of the skipped values, i.e. (x - 0x21) is not a multiple of 0x1f.
// NOTE(review): the skipped values are presumably the HTTP/3 reserved code
// points of the form 0x1f * N + 0x21 -- confirm against the spec.
static bool isEncodedApplicationErrorCode(uint64_t x) {
  if (x < kFirstErrorCode || x > kLastErrorCode) {
    return false;
  }
  return ((x - 0x21) % 0x1f) != 0;
}
// Decodes `h` back into a 32-bit application error code (the inverse of
// toHTTPErrorCode). Returns ErrorCode::GENERIC_ERROR when `h` is not an
// encoded application error code.
static folly::Expected<uint32_t, WebTransport::ErrorCode>
toApplicationErrorCode(uint64_t h) {
  if (isEncodedApplicationErrorCode(h)) {
    const uint64_t offset = h - kFirstErrorCode;
    // Remove the one skipped value per 0x1f encoded values.
    const uint64_t decoded = offset - (offset / 0x1f);
    DCHECK_LE(decoded, std::numeric_limits<uint32_t>::max());
    return static_cast<uint32_t>(decoded);
  }
  // This is not for us
  return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
// The largest value representable in a uint32_t.
// NOTE(review): presumably the application error code used to signal an
// internal failure -- confirm against the implementations that use it.
static constexpr uint32_t kInternalError{std::numeric_limits<uint32_t>::max()};
// Exception describing a stream that the peer reset or abandoned.
//
// `error` holds the 32-bit error code passed to the constructor; the same
// code appears in decimal at the end of the what() message.
class Exception : public std::runtime_error {
 public:
  // Builds the message with std::to_string so this header does not depend on
  // folly's string conversion helpers, which it does not include directly.
  explicit Exception(uint32_t inError)
      : std::runtime_error("Peer reset or abandoned stream with error=" +
                           std::to_string(inError)),
        error(inError) {
  }

  uint32_t error;
};
// The result of a read() operation.
//
// `data` is the buffer produced by the read. `fin` is the end-of-stream
// marker: a StreamReadHandle is invalid after StreamData with fin=true has
// been read.
//
// `fin` has a default member initializer so a default-constructed StreamData
// never carries an indeterminate flag. The struct remains an aggregate, so
// StreamData{std::move(buf), true} continues to work.
struct StreamData {
  std::unique_ptr<folly::IOBuf> data;
  bool fin{false};
};
// Base class for StreamReadHandle / StreamWriteHandle.
//
// Declares what both kinds of handle share: an accessor for the stream ID
// and an accessor for a cancellation token.
class StreamHandleBase {
public:
virtual ~StreamHandleBase() = default;
// Returns the 64-bit ID of the stream.
// NOTE(review): presumably the same ID accepted by the ID-based WebTransport
// methods (e.g. readStreamData(id), resetStream(streamId, error)) -- confirm.
virtual uint64_t getID() = 0;
// The caller may register a CancellationCallback on this token to be
// notified of asynchronous cancellation of the stream by the peer.
//
// For StreamWriteHandle in particular, the handle is still valid in a
// CancellationCallback, but not after that. If the app doesn't terminate
// the stream from the callback, the stream will be reset automatically.
virtual folly::CancellationToken getCancelToken() = 0;
};
// Handle for read streams
class StreamReadHandle : public StreamHandleBase {
public:
~StreamReadHandle() override = default;
// Wait for data to be delivered on the stream. If the stream is reset by
// the peer, a StreamReadHandle::Exception will be raised in the Future with
// the error code. The Future may observe other exceptions such as
// folly::OperationCancelled if the session was closed, etc.
//
// The StreamReadHandle is invalid after reading StreamData with fin=true,
// or an exception.
virtual folly::SemiFuture<StreamData> readStreamData() = 0;
using ReadStreamDataFn =
std::function<void(StreamReadHandle*, folly::Try<StreamData>)>;
void awaitNextRead(
folly::Executor* exec,
const ReadStreamDataFn& readCb,
folly::Optional<std::chrono::milliseconds> timeout = folly::none) {
auto fut = readStreamData();
if (timeout) {
fut = std::move(fut).within(*timeout);
}
std::move(fut).via(exec).thenTry([this, readCb](auto streamData) {
readCb(this, std::move(streamData));
});
}
// Notify the peer to stop sending data. The StreamReadHandle is invalid
// after this API call.
virtual folly::Expected<folly::Unit, ErrorCode> stopSending(
uint32_t error) = 0;
};
// Handle for the write side of a stream.
class StreamWriteHandle : public StreamHandleBase {
 public:
  ~StreamWriteHandle() override = default;

  // Write the data and optional fin to the stream. The returned Future will
  // complete when the stream is available for more writes.
  //
  // The StreamWriteHandle becomes invalid after calling writeStreamData with
  // fin=true or calling resetStream.
  //
  // If the peer sends a STOP_SENDING, the app is notified via the
  // CancellationToken for this handle, and the code can be queried via
  // stopSendingErrorCode. The app SHOULD reset the stream from a
  // CancellationCallback. Calling writeStreamData from the callback will
  // fail with a WebTransport::Exception with the stopSendingErrorCode.
  // After the cancellation callback, the StreamWriteHandle is invalid.
  virtual folly::Expected<folly::SemiFuture<folly::Unit>, ErrorCode>
  writeStreamData(std::unique_ptr<folly::IOBuf> data, bool fin) = 0;

  // Reset the stream with the given error.
  virtual folly::Expected<folly::Unit, ErrorCode> resetStream(
      uint32_t error) = 0;

  // Error code from the peer's STOP_SENDING message. Returns a copy of the
  // stored optional, which is empty unless a subclass has assigned it.
  // Const-qualified so it can also be queried through a const handle.
  folly::Optional<uint32_t> stopSendingErrorCode() const {
    return stopSendingErrorCode_;
  }

 protected:
  // Backing storage for stopSendingErrorCode().
  // NOTE(review): presumably assigned by the implementation when a
  // STOP_SENDING is received -- confirm in the subclasses.
  folly::Optional<uint32_t> stopSendingErrorCode_;
};
// Handle for bidirectional streams: a pair of non-owning pointers to the
// read half and the write half of the same stream.
//
// Both pointers default to nullptr so a default-constructed handle is never
// indeterminate. The struct remains an aggregate, so
// BidiStreamHandle{readPtr, writePtr} continues to work.
struct BidiStreamHandle {
  StreamReadHandle* readHandle{nullptr};
  StreamWriteHandle* writeHandle{nullptr};
};
// Create a new unidirectional stream.
//
// Returns a StreamWriteHandle to the new stream if successful, or an
// ErrorCode, including in cases where stream credit is exhausted.
// awaitUniStreamCredit() can be used to wait for credit.
virtual folly::Expected<StreamWriteHandle*, ErrorCode> createUniStream() = 0;
// Create a new bidirectional stream.
//
// Returns a BidiStreamHandle to the new stream if successful, or an
// ErrorCode, including in cases where stream credit is exhausted. Note the
// application needs to call readStreamData to read from the read half.
// awaitBidiStreamCredit() can be used to wait for credit.
virtual folly::Expected<BidiStreamHandle, ErrorCode> createBidiStream() = 0;
// Wait for credit to create a stream of the given type (unidirectional or
// bidirectional, per the method name). If stream credit is already
// available, an immediately ready SemiFuture is returned.
virtual folly::SemiFuture<folly::Unit> awaitUniStreamCredit() = 0;
virtual folly::SemiFuture<folly::Unit> awaitBidiStreamCredit() = 0;
// API using stream IDs
// These methods may be used if the app wants to manipulate open streams
// without holding their handles. Each takes the stream ID as its first
// argument and reports failure as an ErrorCode through folly::Expected.
// NOTE(review): presumably these mirror the same-named StreamReadHandle /
// StreamWriteHandle methods, and an unknown ID yields INVALID_STREAM_ID --
// confirm against the implementations.
virtual folly::Expected<folly::SemiFuture<StreamData>,
WebTransport::ErrorCode>
readStreamData(uint64_t id) = 0;
virtual folly::Expected<folly::SemiFuture<folly::Unit>, ErrorCode>
writeStreamData(uint64_t id,
std::unique_ptr<folly::IOBuf> data,
bool fin) = 0;
virtual folly::Expected<folly::Unit, ErrorCode> resetStream(
uint64_t streamId, uint32_t error) = 0;
virtual folly::Expected<folly::Unit, ErrorCode> stopSending(
uint64_t streamId, uint32_t error) = 0;
// Sends the buffer as a datagram. Ownership of `datagram` is transferred to
// the callee. Returns an ErrorCode if the call fails.
virtual folly::Expected<folly::Unit, ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> datagram) = 0;
// Close the WebTransport session, with an optional error. When `error` is
// omitted it defaults to folly::none.
//
// Any pending futures will complete with a folly::OperationCancelled
// exception.
virtual folly::Expected<folly::Unit, ErrorCode> closeSession(
folly::Optional<uint32_t> error = folly::none) = 0;
};
} // namespace proxygen