Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions 02nio/nio02/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>

<!--
<!--引入httpClient-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
Expand All @@ -64,7 +69,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
-->

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.kimmking.gateway;

import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

public static ThreadPoolExecutor getThreadPoolExecutor(){
int cores = Runtime.getRuntime().availableProcessors() * 2;
System.out.println(Runtime.getRuntime().availableProcessors());
long keepAliveTime = 1000;
int queueSize = 2048;
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy();
ThreadPoolExecutor proxyService = new ThreadPoolExecutor(cores,
cores,
keepAliveTime,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("proxyService"),
handler);
return proxyService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
public interface HttpRequestFilter {

void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx);

}
Original file line number Diff line number Diff line change
@@ -1,24 +1,38 @@
package io.github.kimmking.gateway.inbound;

import io.github.kimmking.gateway.filter.HttpRequestFilter;
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
import io.github.kimmking.gateway.outbound.myselfhttpclient.MyHttpOutboundHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class HttpInboundHandler extends ChannelInboundHandlerAdapter implements HttpRequestFilter {

private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
private final String proxyServer;
private HttpOutboundHandler handler;

// 自己写的Httpclient组件
private MyHttpOutboundHandler myHandler;

public HttpInboundHandler(String proxyServer) {
this.proxyServer = proxyServer;
// 老师
handler = new HttpOutboundHandler(this.proxyServer);
// 自己写的Httpclient组件
myHandler = new MyHttpOutboundHandler(this.proxyServer);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
Expand All @@ -27,50 +41,55 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//logger.info("channelRead流量接口请求开始,时间为{}", startTime);
long startTime = System.currentTimeMillis();
logger.info("channelRead流量接口请求开始,时间为{}", startTime);
FullHttpRequest fullRequest = (FullHttpRequest) msg;
// String uri = fullRequest.uri();
// //logger.info("接收到的请求url为{}", uri);
// if (uri.contains("/test")) {
// handlerTest(fullRequest, ctx);
// }

handler.handle(fullRequest, ctx);

} catch(Exception e) {
// 自定义过滤器
filter(fullRequest,ctx);
// 自己写的HttpClient
myHandler.handler(fullRequest,ctx);
// 老师写的
// handler.handle(fullRequest, ctx);

} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}

// private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
// FullHttpResponse response = null;
// try {
// String value = "hello,kimmking";
// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
// response.headers().set("Content-Type", "application/json");
// response.headers().setInt("Content-Length", response.content().readableBytes());
//
// } catch (Exception e) {
// logger.error("处理测试接口出错", e);
// response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
// } finally {
// if (fullRequest != null) {
// if (!HttpUtil.isKeepAlive(fullRequest)) {
// ctx.write(response).addListener(ChannelFutureListener.CLOSE);
// } else {
// response.headers().set(CONNECTION, KEEP_ALIVE);
// ctx.write(response);
// }
// }
// }
// }
//
// @Override
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// cause.printStackTrace();
// ctx.close();
// }
@Override
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
HttpHeaders headers = fullRequest.headers();
headers.set("nio","BAIFUKUAN");
}
private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
FullHttpResponse response = null;
try {
String value = "hello,kimmking";
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
response.headers().set("Content-Type", "application/json");
response.headers().setInt("Content-Length", response.content().readableBytes());

} catch (Exception e) {
logger.error("处理测试接口出错", e);
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
} finally {
if (fullRequest != null) {
if (!HttpUtil.isKeepAlive(fullRequest)) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set("CONNECTION", KEEP_ALIVE);
ctx.write(response);
}
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ public class HttpInboundServer {
private static Logger logger = LoggerFactory.getLogger(HttpInboundServer.class);

private int port;

private String proxyServer;

public HttpInboundServer(int port, String proxyServer) {
this.port=port;
this.port = port;
this.proxyServer = proxyServer;
}

public void run() throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap b = new ServerBootstrap();
Expand All @@ -43,8 +43,10 @@ public void run() throws Exception {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpInboundInitializer(this.proxyServer));
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpInboundInitializer(this.proxyServer));

Channel ch = b.bind(port).sync().channel();
logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
//package io.github.kimmking.gateway.outbound;
//
//import io.netty.bootstrap.Bootstrap;
//import io.netty.channel.ChannelFuture;
//import io.netty.channel.ChannelInitializer;
//import io.netty.channel.ChannelOption;
//import io.netty.channel.EventLoopGroup;
//import io.netty.channel.nio.NioEventLoopGroup;
//import io.netty.channel.socket.SocketChannel;
//import io.netty.channel.socket.nio.NioSocketChannel;
//import io.netty.handler.codec.http.HttpRequestEncoder;
//import io.netty.handler.codec.http.HttpResponseDecoder;
//
//public class NettyHttpClient {
// package io.github.kimmking.gateway.outbound;
//
// import io.netty.bootstrap.Bootstrap;
// import io.netty.channel.ChannelFuture;
// import io.netty.channel.ChannelInitializer;
// import io.netty.channel.ChannelOption;
// import io.netty.channel.EventLoopGroup;
// import io.netty.channel.nio.NioEventLoopGroup;
// import io.netty.channel.socket.SocketChannel;
// import io.netty.channel.socket.nio.NioSocketChannel;
// import io.netty.handler.codec.http.HttpRequestEncoder;
// import io.netty.handler.codec.http.HttpResponseDecoder;
//
// public class NettyHttpClient {
// public void connect(String host, int port) throws Exception {
// EventLoopGroup workerGroup = new NioEventLoopGroup();
//
Expand All @@ -25,7 +25,7 @@
// public void initChannel(SocketChannel ch) throws Exception {
// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
// ch.pipeline().addLast(new HttpResponseDecoder());
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
// // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
// ch.pipeline().addLast(new HttpRequestEncoder());
// ch.pipeline().addLast(new HttpClientOutboundHandler());
// }
Expand All @@ -34,7 +34,7 @@
// // Start the client.
// ChannelFuture f = b.connect(host, port).sync();
//
//
//
// f.channel().write(request);
// f.channel().flush();
// f.channel().closeFuture().sync();
Expand All @@ -48,4 +48,4 @@
// NettyHttpClient client = new NettyHttpClient();
// client.connect("127.0.0.1", 8844);
// }
//}
// }
Loading