-
Notifications
You must be signed in to change notification settings - Fork 4k
/
TTransport.h
357 lines (322 loc) · 11.5 KB
/
TTransport.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
#include <thrift/Thrift.h>
#include <thrift/TConfiguration.h>
#include <thrift/transport/TTransportException.h>
#include <memory>
#include <string>
namespace apache {
namespace thrift {
namespace transport {
/**
* Helper template to hoist readAll implementation out of TTransport
*/
template <class Transport_>
uint32_t readAll(Transport_& trans, uint8_t* buf, uint32_t len) {
uint32_t have = 0;
uint32_t get = 0;
while (have < len) {
get = trans.read(buf + have, len - have);
if (get <= 0) {
throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
}
have += get;
}
return have;
}
/**
* Generic interface for a method of transporting data. A TTransport may be
* capable of either reading or writing, but not necessarily both.
*
*/
class TTransport {
public:
TTransport(std::shared_ptr<TConfiguration> config = nullptr) {
if(config == nullptr) {
configuration_ = std::shared_ptr<TConfiguration> (new TConfiguration());
} else {
configuration_ = config;
}
resetConsumedMessageSize();
}
/**
* Virtual deconstructor.
*/
virtual ~TTransport() = default;
/**
* Whether this transport is open.
*/
virtual bool isOpen() const { return false; }
/**
* Tests whether there is more data to read or if the remote side is
* still open. By default this is true whenever the transport is open,
* but implementations should add logic to test for this condition where
* possible (i.e. on a socket).
* This is used by a server to check if it should listen for another
* request.
*/
virtual bool peek() { return isOpen(); }
/**
* Opens the transport for communications.
*
* @return bool Whether the transport was successfully opened
* @throws TTransportException if opening failed
*/
virtual void open() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
}
/**
* Closes the transport.
*/
virtual void close() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
}
/**
* Attempt to read up to the specified number of bytes into the string.
*
* @param buf Reference to the location to write the data
* @param len How many bytes to read
* @return How many bytes were actually read
* @throws TTransportException If an error occurs
*/
uint32_t read(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return read_virt(buf, len);
}
virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
}
/**
* Reads the given amount of data in its entirety no matter what.
*
* @param s Reference to location for read data
* @param len How many bytes to read
* @return How many bytes read, which must be equal to size
* @throws TTransportException If insufficient data was read
*/
uint32_t readAll(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return readAll_virt(buf, len);
}
virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
return apache::thrift::transport::readAll(*this, buf, len);
}
/**
* Called when read is completed.
* This can be over-ridden to perform a transport-specific action
* e.g. logging the request to a file
*
* @return number of bytes read if available, 0 otherwise.
*/
virtual uint32_t readEnd() {
// default behaviour is to do nothing
return 0;
}
/**
* Writes the string in its entirety to the buffer.
*
* Note: You must call flush() to ensure the data is actually written,
* and available to be read back in the future. Destroying a TTransport
* object does not automatically flush pending data--if you destroy a
* TTransport object with written but unflushed data, that data may be
* discarded.
*
* @param buf The data to write out
* @throws TTransportException if an error occurs
*/
void write(const uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
write_virt(buf, len);
}
virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
}
/**
* Called when write is completed.
* This can be over-ridden to perform a transport-specific action
* at the end of a request.
*
* @return number of bytes written if available, 0 otherwise
*/
virtual uint32_t writeEnd() {
// default behaviour is to do nothing
return 0;
}
/**
* Flushes any pending data to be written. Typically used with buffered
* transport mechanisms.
*
* @throws TTransportException if an error occurs
*/
virtual void flush() {
// default behaviour is to do nothing
}
/**
* Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
* Does not consume the bytes read (i.e.: a later read will return the same
* data). This method is meant to support protocols that need to read
* variable-length fields. They can attempt to borrow the maximum amount of
* data that they will need, then consume (see next method) what they
* actually use. Some transports will not support this method and others
* will fail occasionally, so protocols must be prepared to use read if
* borrow fails.
*
* @oaram buf A buffer where the data can be stored if needed.
* If borrow doesn't return buf, then the contents of
* buf after the call are undefined. This parameter may be
* nullptr to indicate that the caller is not supplying storage,
* but would like a pointer into an internal buffer, if
* available.
* @param len *len should initially contain the number of bytes to borrow.
* If borrow succeeds, *len will contain the number of bytes
* available in the returned pointer. This will be at least
* what was requested, but may be more if borrow returns
* a pointer to an internal buffer, rather than buf.
* If borrow fails, the contents of *len are undefined.
* @return If the borrow succeeds, return a pointer to the borrowed data.
* This might be equal to \c buf, or it might be a pointer into
* the transport's internal buffers.
* @throws TTransportException if an error occurs
*/
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
T_VIRTUAL_CALL();
return borrow_virt(buf, len);
}
virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { return nullptr; }
/**
* Remove len bytes from the transport. This should always follow a borrow
* of at least len bytes, and should always succeed.
* TODO(dreiss): Is there any transport that could borrow but fail to
* consume, or that would require a buffer to dump the consumed data?
*
* @param len How many bytes to consume
* @throws TTransportException If an error occurs
*/
void consume(uint32_t len) {
T_VIRTUAL_CALL();
consume_virt(len);
}
virtual void consume_virt(uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
}
/**
* Returns the origin of the transports call. The value depends on the
* transport used. An IP based transport for example will return the
* IP address of the client making the request.
* If the transport doesn't know the origin Unknown is returned.
*
* The returned value can be used in a log message for example
*/
virtual const std::string getOrigin() const { return "Unknown"; }
std::shared_ptr<TConfiguration> getConfiguration() { return configuration_; }
void setConfiguration(std::shared_ptr<TConfiguration> config) {
if (config != nullptr) configuration_ = config;
}
/**
* Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
* Will throw if we already consumed too many bytes or if the new size is larger than allowed.
*
* @param size real message size
*/
void updateKnownMessageSize(long int size)
{
long int consumed = knownMessageSize_ - remainingMessageSize_;
resetConsumedMessageSize(size);
countConsumedMessageBytes(consumed);
}
/**
* Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
*
* @param numBytes numBytes bytes of data
*/
void checkReadBytesAvailable(long int numBytes)
{
if (remainingMessageSize_ < numBytes)
throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
}
protected:
std::shared_ptr<TConfiguration> configuration_;
long int remainingMessageSize_;
long int knownMessageSize_;
inline long int getRemainingMessageSize() { return remainingMessageSize_; }
inline void setRemainingMessageSize(long int remainingMessageSize) { remainingMessageSize_ = remainingMessageSize; }
inline int getMaxMessageSize() { return configuration_->getMaxMessageSize(); }
inline long int getKnownMessageSize() { return knownMessageSize_; }
void setKnownMessageSize(long int knownMessageSize) { knownMessageSize_ = knownMessageSize; }
/**
* Resets RemainingMessageSize to the configured maximum
*
* @param newSize configured size
*/
void resetConsumedMessageSize(long newSize = -1)
{
// full reset
if (newSize < 0)
{
knownMessageSize_ = getMaxMessageSize();
remainingMessageSize_ = getMaxMessageSize();
return;
}
// update only: message size can shrink, but not grow
if (newSize > knownMessageSize_)
throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
knownMessageSize_ = newSize;
remainingMessageSize_ = newSize;
}
/**
* Consumes numBytes from the RemainingMessageSize.
*
* @param numBytes Consumes numBytes
*/
void countConsumedMessageBytes(long int numBytes)
{
if (remainingMessageSize_ >= numBytes)
{
remainingMessageSize_ -= numBytes;
}
else
{
remainingMessageSize_ = 0;
throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
}
}
};
/**
* Generic factory class to make an input and output transport out of a
* source transport. Commonly used inside servers to make input and output
* streams out of raw clients.
*
*/
class TTransportFactory {
public:
TTransportFactory() = default;
virtual ~TTransportFactory() = default;
/**
* Default implementation does nothing, just returns the transport given.
*/
virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
return trans;
}
};
}
}
} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_