forked from voldemort/voldemort
/
ClientRequestExecutor.java
301 lines (240 loc) · 10.6 KB
/
ClientRequestExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/*
* Copyright 2008-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.store.socket.clientrequest;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.apache.log4j.Level;
import voldemort.common.nio.CommBufferSizeStats;
import voldemort.common.nio.SelectorManagerWorker;
import voldemort.utils.Time;
/**
* ClientRequestExecutor represents a persistent link between a client and
* server and is used by the {@link ClientRequestExecutorPool} to execute
* {@link ClientRequest requests} for the client.
*
* Instances are maintained in a pool by {@link ClientRequestExecutorPool} using
* a checkout/checkin pattern. When an instance is checked out, the calling code
* has exclusive access to that instance. Then the
* {@link #addClientRequest(ClientRequest) request can be executed}.
*
* @see SelectorManagerWorker
* @see ClientRequestExecutorPool
*/
public class ClientRequestExecutor extends SelectorManagerWorker {
// Request currently attached to this executor; reset to null by
// atomicNullOutClientRequest() once the request is being completed.
private ClientRequest<?> clientRequest;
// Deadline expressed in System.nanoTime() units. Values <= 0 are treated by
// checkTimeout() as "no timeout armed" (-1 when the caller asked for no
// timeout, 0 after the request has been detached).
private long expiration;
// Set to true by checkTimeout() when the deadline has passed; decides whether
// completeClientRequest() calls timeOut() or complete() on the request.
private boolean isExpired;
/**
 * Creates an executor bound to the given selector and socket channel.
 *
 * @param selector Selector forwarded to the {@link SelectorManagerWorker}
 *        superclass
 * @param socketChannel Channel over which requests and responses travel
 * @param socketBufferSize Buffer size forwarded to the superclass
 */
public ClientRequestExecutor(Selector selector,
                             SocketChannel socketChannel,
                             int socketBufferSize) {
    // Comm buffer statistics are neither tracked nor exposed for now, so a
    // fresh, otherwise unused stats object is handed to the superclass.
    super(selector, socketChannel, socketBufferSize, new CommBufferSizeStats());
    this.isExpired = false;
}
/**
 * Returns the socket channel this executor communicates over.
 *
 * @return the underlying {@link SocketChannel}
 */
public SocketChannel getSocketChannel() {
    return this.socketChannel;
}
/**
 * Reports whether this executor can still be used.
 *
 * @return {@code false} if the executor has been closed or if the
 *         underlying socket is closed, unbound or not connected;
 *         {@code true} otherwise
 */
public boolean isValid() {
    if(isClosed()) {
        return false;
    }

    // Check the socket state in the same order: closed, bound, connected.
    Socket socket = socketChannel.socket();
    if(socket.isClosed()) {
        return false;
    }
    if(!socket.isBound()) {
        return false;
    }
    return socket.isConnected();
}
/**
 * Checks whether the current request is still within its deadline. If the
 * deadline has passed, the executor is flagged as expired and closed.
 *
 * NOTE(review): a non-positive expiration doubles as the "no timeout"
 * marker, and deadlines are compared directly against System.nanoTime().
 * nanoTime values may be negative on some JVMs, in which case a real
 * deadline would look like "no timeout" -- confirm whether this matters.
 *
 * @return {@code true} if no timeout is armed or the deadline has not yet
 *         passed; {@code false} if the request timed out and the executor
 *         was closed
 */
public synchronized boolean checkTimeout() {
    boolean deadlineArmed = expiration > 0;
    if(!deadlineArmed || System.nanoTime() <= expiration) {
        return true;
    }

    if(logger.isEnabledFor(Level.WARN)) {
        logger.warn("Client request associated with " + socketChannel.socket() + " timed out");
    }

    // Flag expiry before closing so that the completion triggered by close()
    // reports a timeout rather than a normal completion.
    isExpired = true;
    close();
    return false;
}
/**
 * Attaches a request to this executor without any timeout.
 *
 * @param clientRequest Request to format and send over this connection
 */
public synchronized void addClientRequest(ClientRequest<?> clientRequest) {
    // -1 is the "no timeout" marker understood by the full overload.
    final long noTimeoutMs = -1;
    addClientRequest(clientRequest, noTimeoutMs);
}
/**
 * Attaches a request to this executor with the given timeout, assuming none
 * of the timeout budget has been used yet.
 *
 * @param clientRequest Request to format and send over this connection
 * @param timeoutMs Timeout in milliseconds, or -1 for no timeout
 */
public synchronized void addClientRequest(ClientRequest<?> clientRequest, long timeoutMs) {
    final long nothingElapsedNs = 0;
    addClientRequest(clientRequest, timeoutMs, nothingElapsedNs);
}
/**
 * Attaches a request to this executor, arms its deadline, serializes the
 * request into the output buffer and asks the selector to start writing.
 *
 * @param clientRequest Request to format and send over this connection
 * @param timeoutMs Timeout in milliseconds, or -1 for no timeout
 * @param elapsedNs Nanoseconds of the timeout budget already consumed
 *        before this call
 * @throws IllegalArgumentException if the computed deadline lies before the
 *         current time
 */
public synchronized void addClientRequest(ClientRequest<?> clientRequest,
                                          long timeoutMs,
                                          long elapsedNs) {
    if(logger.isTraceEnabled()) {
        logger.trace("Associating client with " + socketChannel.socket());
    }

    this.clientRequest = clientRequest;

    if(timeoutMs == -1) {
        this.expiration = -1;
    } else {
        final long startNs = System.nanoTime();
        final long timeoutNs = Time.NS_PER_MS * timeoutMs;

        // If the budget is already used up the deadline is "now"; otherwise
        // it is whatever remains of the budget counted from now.
        this.expiration = (elapsedNs > timeoutNs) ? startNs
                                                  : startNs + timeoutNs - elapsedNs;

        if(this.expiration < startNs) {
            throw new IllegalArgumentException("timeout " + timeoutMs + " not valid");
        }
    }

    // Serialize the request into a freshly cleared output buffer.
    outputStream.getBuffer().clear();
    final boolean formatted = clientRequest.formatRequest(new DataOutputStream(outputStream));

    if(logger.isTraceEnabled()) {
        traceInputBufferState("About to clear read buffer");
    }

    // Reset the input buffer, replacing it with a default-sized one if it
    // has reached the resize threshold.
    if(inputStream.getBuffer().capacity() < resizeThreshold) {
        inputStream.getBuffer().clear();
    } else {
        inputStream.setBuffer(ByteBuffer.allocate(socketBufferSize));
    }

    if(logger.isTraceEnabled()) {
        traceInputBufferState("Cleared read buffer");
    }

    // Switch the output buffer from filling to draining.
    outputStream.getBuffer().flip();

    if(!formatted) {
        if(logger.isEnabledFor(Level.WARN)) {
            logger.warn("Client associated with " + socketChannel.socket()
                        + " did not successfully buffer output for request");
        }
        completeClientRequest();
        return;
    }

    final SelectionKey key = socketChannel.keyFor(selector);
    if(key == null) {
        if(logger.isDebugEnabled()) {
            logger.debug("Client associated with " + socketChannel.socket()
                         + " was not registered with Selector " + selector
                         + ", assuming initial protocol negotiation");
        }
        return;
    }

    key.interestOps(SelectionKey.OP_WRITE);

    // The wakeup is needed because this method runs in the calling code's
    // thread rather than in the SelectorManager's thread.
    selector.wakeup();
}
/**
 * Closes this executor, completing any attached request first.
 *
 * Some code paths end up calling close recursively. Instead of handling
 * each of those cases, the closed flag is flipped atomically and the
 * shutdown logic only runs for the call that flipped it.
 */
@Override
public void close() {
    final boolean firstClose = isClosed.compareAndSet(false, true);
    if(firstClose) {
        completeClientRequest();
        closeInternal();
    }
}
/**
 * Reads available bytes from the socket into the input buffer and, once the
 * response is complete, lets the attached request parse it and marks the
 * request as complete.
 *
 * @param selectionKey Key whose interest set is cleared after a full
 *        response has been handled
 * @throws EOFException if the channel reports end-of-stream
 * @throws IOException if reading or parsing fails
 */
@Override
protected void read(SelectionKey selectionKey) throws IOException {
    if(!checkTimeout()) {
        return;
    }

    final int bytesRead = socketChannel.read(inputStream.getBuffer());
    if(bytesRead == -1) {
        throw new EOFException("EOF for " + socketChannel.socket());
    }

    if(logger.isTraceEnabled()) {
        traceInputBufferState("Read " + bytesRead + " bytes");
    }

    if(bytesRead == 0) {
        return;
    }

    // Remember where the data ends; the position is needed if the response
    // turns out to be incomplete.
    final ByteBuffer received = inputStream.getBuffer();
    final int endOfData = received.position();

    // Flip so the limit sits at the end of the data and the position at 0,
    // ready for the completeness check.
    received.flip();

    final boolean responseComplete = clientRequest.isCompleteResponse(received);
    if(!responseComplete) {
        // Part of the response is still missing; let the incomplete-request
        // handling deal with it and wait for more data.
        handleIncompleteRequest(endOfData);
        return;
    }

    // The whole response is buffered: rewind and hand it to the request for
    // parsing.
    inputStream.getBuffer().rewind();

    if(logger.isTraceEnabled()) {
        logger.trace("Starting read for " + socketChannel.socket());
    }

    clientRequest.parseResponse(new DataInputStream(inputStream));

    if(logger.isTraceEnabled()) {
        logger.trace("Finished read for " + socketChannel.socket());
    }

    // Response fully handled: stop listening for events on this key and
    // complete the request.
    selectionKey.interestOps(0);
    completeClientRequest();
}
/**
 * Writes as much of the buffered request as the socket accepts. Once the
 * output buffer is fully drained it is reset and the key is switched to
 * read interest.
 *
 * @param selectionKey Key whose interest set is switched to
 *        {@link SelectionKey#OP_READ} after the request is fully written
 * @throws IOException if writing to the channel fails
 */
@Override
protected void write(SelectionKey selectionKey) throws IOException {
    if(!checkTimeout()) {
        return;
    }

    final ByteBuffer pending = outputStream.getBuffer();

    if(pending.hasRemaining()) {
        // There is data queued, so push out whatever the socket will take.
        final int bytesWritten = socketChannel.write(pending);

        if(logger.isTraceEnabled()) {
            logger.trace("Wrote " + bytesWritten + " bytes, remaining: "
                         + pending.remaining() + " for "
                         + socketChannel.socket());
        }
    } else if(logger.isTraceEnabled()) {
        logger.trace("Wrote no bytes for " + socketChannel.socket());
    }

    // Data left over means the socket could not take everything this time.
    // Leave the buffer state untouched and try again on the next write event.
    if(pending.hasRemaining()) {
        return;
    }

    // Everything has been written. Reset the output buffer, swapping in a
    // default-sized one if it has reached the resize threshold.
    if(pending.capacity() < resizeThreshold) {
        pending.clear();
    } else {
        outputStream.setBuffer(ByteBuffer.allocate(socketBufferSize));
    }

    // The request is on the wire; tell the selector we now want to read.
    selectionKey.interestOps(SelectionKey.OP_READ);
}
/**
 * Detaches the current client request under the instance lock and resets
 * the expiration to 0.
 *
 * The request is nulled out *before* it is completed because completion can
 * cause a ClientRequestExecutor check-in (in
 * SocketStore.NonblockingStoreCallbackClientRequest). When close is then
 * called we recurse back into completion, and without the null-out the
 * instance would be checked in a second time, which causes problems for the
 * pool maintenance.
 *
 * @return the request that was attached, or {@code null} if there was none
 */
private synchronized ClientRequest<?> atomicNullOutClientRequest() {
    final ClientRequest<?> detached = this.clientRequest;
    this.clientRequest = null;
    this.expiration = 0;
    return detached;
}
/**
 * Detaches the current client request and then notifies it: timeOut() if
 * this executor has expired, complete() otherwise.
 *
 * The request is nulled out before it is notified. timeOut and complete
 * must *not* run inside a synchronized block since both eventually check in
 * the client request executor, and such a check-in can trigger additional
 * synchronized methods deeper in the stack.
 *
 * NOTE(review): checkTimeout() (via close()) and addClientRequest() are
 * synchronized and do reach this method -- confirm that is acceptable.
 */
private void completeClientRequest() {
    final ClientRequest<?> finished = atomicNullOutClientRequest();

    if(finished == null) {
        if(logger.isEnabledFor(Level.WARN)) {
            logger.warn("No client associated with " + socketChannel.socket());
        }
        return;
    }

    if(!isExpired) {
        finished.complete();
    } else {
        finished.timeOut();
    }

    if(logger.isTraceEnabled()) {
        logger.trace("Marked client associated with " + socketChannel.socket() + " as complete");
    }
}
}