/
VertxHttp2NetSocket.java
271 lines (228 loc) · 6.35 KB
/
VertxHttp2NetSocket.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.core.http.impl;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.CharsetUtil;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
/**
 * A {@link NetSocket} implementation backed by a single HTTP/2 stream.
 *
 * <p>Stream callbacks coming from the parent {@code VertxHttp2Stream} ({@code callEnd},
 * {@code callHandler}, {@code callReset}, ...) are forwarded to the handlers registered through the
 * {@link NetSocket} API, and socket writes are delegated to the inherited {@code writeData} method.
 *
 * <p>Several {@link NetSocket} operations are not implemented yet: {@link #pause()},
 * {@link #resume()} and {@link #setWriteQueueMaxSize(int)} do nothing, while
 * {@link #upgradeToSsl(Handler)} and {@link #isSsl()} throw {@link UnsupportedOperationException}.
 *
 * @param <C> the type of the HTTP/2 connection owning the stream
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Stream<C> implements NetSocket {

  // User supplied handlers; each one may be null when nothing has been registered.
  private Handler<Throwable> exceptionHandler;
  private Handler<Void> closeHandler;
  private Handler<Void> endHandler;
  private Handler<Buffer> dataHandler;
  private Handler<Void> drainHandler;

  /**
   * Creates a socket on top of the given stream.
   *
   * @param connection the connection the stream belongs to
   * @param stream the underlying HTTP/2 stream
   */
  public VertxHttp2NetSocket(C connection, Http2Stream stream) {
    super(connection, stream);
  }

  // Stream impl

  /**
   * Notifies the end handler, if any, and then always ends this side of the socket, even when the
   * handler throws.
   */
  @Override
  void callEnd() {
    try {
      if (endHandler != null) {
        // Give opportunity to send a last chunk
        endHandler.handle(null);
      }
    } finally {
      end();
    }
  }

  /**
   * Passes a received buffer to the data handler; the buffer is dropped when no handler is set.
   *
   * @param buf the received data
   */
  @Override
  void callHandler(Buffer buf) {
    if (dataHandler != null) {
      dataHandler.handle(buf);
    }
  }

  /**
   * Reports a stream reset to the exception handler as a {@link StreamResetException}.
   *
   * @param errorCode the error code carried by the reset
   */
  @Override
  void callReset(long errorCode) {
    handleException(new StreamResetException(errorCode));
  }

  /**
   * Passes a failure to the exception handler; the failure is dropped when no handler is set.
   *
   * @param cause the failure to report
   */
  @Override
  void handleException(Throwable cause) {
    if (exceptionHandler != null) {
      exceptionHandler.handle(cause);
    }
  }

  /**
   * Notifies the close handler, if any.
   */
  @Override
  void handleClose() {
    if (closeHandler != null) {
      closeHandler.handle(null);
    }
  }

  /**
   * Schedules the drain handler on the Vert.x context when one is registered and the write queue
   * is not full.
   */
  @Override
  void handleInterestedOpsChanged() {
    // Capture the handler so the scheduled task invokes the one that was set at this point in time.
    Handler<Void> handler = this.drainHandler;
    if (handler != null && !writeQueueFull()) {
      vertx.runOnContext(v -> {
        handler.handle(null);
      });
    }
  }

  // NetSocket impl

  /**
   * Sets the handler notified of failures, including stream resets.
   *
   * @param handler the handler, may be {@code null} to unset it
   * @return this socket
   */
  @Override
  public NetSocket exceptionHandler(Handler<Throwable> handler) {
    exceptionHandler = handler;
    return this;
  }

  /**
   * Sets the handler receiving incoming data.
   *
   * @param handler the handler, may be {@code null} to unset it
   * @return this socket
   */
  @Override
  public NetSocket handler(Handler<Buffer> handler) {
    dataHandler = handler;
    return this;
  }

  /**
   * Currently a no-op: this implementation does not act on pause requests.
   *
   * @return this socket
   */
  @Override
  public NetSocket pause() {
    return this;
  }

  /**
   * Currently a no-op: this implementation does not act on resume requests.
   *
   * @return this socket
   */
  @Override
  public NetSocket resume() {
    return this;
  }

  /**
   * Sets the handler notified from {@code callEnd()} right before this side of the socket is ended.
   *
   * @param handler the handler, may be {@code null} to unset it
   * @return this socket
   */
  @Override
  public NetSocket endHandler(Handler<Void> handler) {
    endHandler = handler;
    return this;
  }

  /**
   * Writes a buffer to the stream without ending it.
   *
   * @param data the data to write
   * @return this socket
   */
  @Override
  public NetSocket write(Buffer data) {
    writeData(data.getByteBuf(), false);
    return this;
  }

  /**
   * Currently a no-op: the supplied size is ignored.
   *
   * @param maxSize ignored
   * @return this socket
   */
  @Override
  public NetSocket setWriteQueueMaxSize(int maxSize) {
    return this;
  }

  /**
   * Sets the handler scheduled when the stream can accept writes again.
   *
   * <p>Note that {@link #sendFile(String, long, long, Handler)} replaces this handler with the one
   * of its file channel.
   *
   * @param handler the handler, may be {@code null} to unset it
   * @return this socket
   */
  @Override
  public NetSocket drainHandler(Handler<Void> handler) {
    drainHandler = handler;
    return this;
  }

  /**
   * @return the value of the inherited {@code isNotWritable()} check
   */
  @Override
  public boolean writeQueueFull() {
    return isNotWritable();
  }

  /**
   * @return always {@code null}: this socket has no write handler id
   */
  @Override
  public String writeHandlerID() {
    return null;
  }

  /**
   * Writes a string encoded as UTF-8 without ending the stream.
   *
   * @param str the string to write
   * @return this socket
   */
  @Override
  public NetSocket write(String str) {
    writeData(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8), false);
    return this;
  }

  /**
   * Writes a string using the named charset without ending the stream.
   *
   * @param str the string to write
   * @param enc the charset name; {@code null} falls back to UTF-8 via {@link #write(String)}
   * @return this socket
   */
  @Override
  public NetSocket write(String str, String enc) {
    if (enc == null) {
      write(str);
    } else {
      writeData(Unpooled.copiedBuffer(str, Charset.forName(enc)), false);
    }
    return this;
  }

  /**
   * Sends part of a file without completion notification.
   *
   * @param filename the file to send, resolved through {@code vertx.resolveFile}
   * @param offset the position in the file to start from
   * @param length the maximum number of bytes to send
   * @return this socket
   */
  @Override
  public NetSocket sendFile(String filename, long offset, long length) {
    return sendFile(filename, offset, length, null);
  }

  /**
   * Sends part of a file over the stream through a {@code FileStreamChannel}.
   *
   * <p>When {@code resultHandler} is not {@code null} it is invoked on a Vert.x context with the
   * outcome: a {@link FileNotFoundException} naming the file when it does not exist, the
   * {@link IOException} raised while opening it, or the result reported by the file channel. When
   * it is {@code null}, failures to locate or open the file are silently dropped.
   *
   * <p>This call replaces the current drain handler with the one of the file channel.
   *
   * @param filename the file to send, resolved through {@code vertx.resolveFile}
   * @param offset the position in the file to start from
   * @param length the maximum number of bytes to send; capped to what the file holds after
   *     {@code offset}
   * @param resultHandler the completion handler, may be {@code null}
   * @return this socket
   */
  @Override
  public NetSocket sendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler) {
    // A context is only needed when there is somebody to notify.
    Context resultCtx = resultHandler != null ? vertx.getOrCreateContext() : null;

    File file = vertx.resolveFile(filename);
    if (!file.exists()) {
      if (resultHandler != null) {
        // Name the missing file so the caller can tell which send failed.
        resultCtx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(new FileNotFoundException(filename))));
      }
      // Without a result handler the failure is dropped: this class declares no logger.
      return this;
    }

    RandomAccessFile raf;
    try {
      raf = new RandomAccessFile(file, "r");
    } catch (IOException e) {
      if (resultHandler != null) {
        resultCtx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(e)));
      }
      // Without a result handler the failure is dropped: this class declares no logger.
      return this;
    }

    // Never try to send more than what the file holds after the offset.
    long contentLength = Math.min(length, file.length() - offset);
    FileStreamChannel fileChannel = new FileStreamChannel(ar -> {
      if (resultHandler != null) {
        resultCtx.runOnContext(v -> {
          // Propagate the outcome reported by the file channel instead of always claiming success.
          if (ar.succeeded()) {
            resultHandler.handle(Future.succeededFuture());
          } else {
            resultHandler.handle(Future.failedFuture(ar.cause()));
          }
        });
      }
    }, this, offset, contentLength);
    drainHandler(fileChannel.drainHandler);
    channel.eventLoop().register(fileChannel);
    // The opened file is handed to the file channel pipeline as a user event.
    fileChannel.pipeline().fireUserEventTriggered(raf);
    return this;
  }

  /**
   * @return the remote address of the underlying connection
   */
  @Override
  public SocketAddress remoteAddress() {
    return conn.remoteAddress();
  }

  /**
   * @return the local address of the underlying connection
   */
  @Override
  public SocketAddress localAddress() {
    return conn.localAddress();
  }

  /**
   * Ends this side of the socket by writing an empty buffer with the final flag set.
   */
  @Override
  public void end() {
    writeData(Unpooled.EMPTY_BUFFER, true);
  }

  /**
   * Writes a last buffer with the final flag set.
   *
   * @param buffer the last data to write
   */
  @Override
  public void end(Buffer buffer) {
    writeData(buffer.getByteBuf(), true);
  }

  /**
   * Closes the socket; equivalent to {@link #end()}.
   */
  @Override
  public void close() {
    end();
  }

  /**
   * Sets the handler notified from {@code handleClose()}.
   *
   * @param handler the handler, may be {@code null} to unset it
   * @return this socket
   */
  @Override
  public NetSocket closeHandler(@Nullable Handler<Void> handler) {
    closeHandler = handler;
    return this;
  }

  /**
   * Not implemented yet.
   *
   * @param handler unused
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public NetSocket upgradeToSsl(Handler<Void> handler) {
    throw new UnsupportedOperationException("todo");
  }

  /**
   * Not implemented yet.
   *
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean isSsl() {
    throw new UnsupportedOperationException("todo");
  }

  /**
   * @return the peer certificate chain reported by the underlying connection
   * @throws SSLPeerUnverifiedException declared by the delegate {@code conn.getPeerCertificateChain()}
   */
  @Override
  public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException {
    return conn.getPeerCertificateChain();
  }
}