Skip to content
This repository has been archived by the owner on Apr 24, 2018. It is now read-only.

Commit

Permalink
多连接服务端
Browse files Browse the repository at this point in the history
  • Loading branch information
cwdtom committed Apr 16, 2018
1 parent 66bbd3a commit 1267624
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# Poseidon


![Version](https://img.shields.io/badge/version-2.2.1-green.svg)
![Version](https://img.shields.io/badge/version-2.3.0-green.svg)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](http://opensource.org/licenses/MIT)

## Overview
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.cwdtom</groupId>
<artifactId>poseidon</artifactId>
<version>2.2.1</version>
<version>2.3.0</version>
<description>gather logs in different places</description>
<packaging>jar</packaging>
<name>Poseidon</name>
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/com/github/cwdtom/poseidon/PoseidonRegister.java
Expand Up @@ -26,12 +26,16 @@
*/
@Slf4j
public class PoseidonRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {
/**
* 连接数等于cpu核心数
*/
private static final Integer THREAD_SUM = Runtime.getRuntime().availableProcessors();
/**
* 线程池
*/
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3,
5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
new DefaultThreadFactory(3));
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(THREAD_SUM, THREAD_SUM,
5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(THREAD_SUM),
new DefaultThreadFactory(THREAD_SUM));
/**
* spring环境变量
*/
Expand Down Expand Up @@ -61,7 +65,7 @@ public Thread newThread(Runnable r) {
this.count.decrementAndGet();
return null;
}
return new Thread(r);
return new Thread(r, "poseidon-" + this.count.toString());
}
}

Expand All @@ -84,8 +88,10 @@ public void registerBeanDefinitions(
String[] tmp = host.split(":");
// 间隔时间秒->毫秒,为空时初始化默认10秒
Long reconnectInterval = ri == null ? 10 * 1000 : Long.parseLong(ri) * 1000;
PoseidonRegister.threadPoolExecutor.execute(
new PoseidonSend(tmp[0], Integer.parseInt(tmp[1]), reconnectInterval));
for (int i = 0; i < PoseidonRegister.THREAD_SUM; i++) {
PoseidonRegister.threadPoolExecutor.execute(
new PoseidonSend(tmp[0], Integer.parseInt(tmp[1]), reconnectInterval));
}
} else if (port != null && host == null) {
// server端
// 开放指定端口接收日志
Expand Down
Expand Up @@ -48,6 +48,7 @@ public void run() {
* 启动socket连接
*/
private void start() {
// cpu核数
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
Expand Down
Expand Up @@ -13,6 +13,7 @@
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 监听注册端口
Expand All @@ -27,6 +28,10 @@ public class PoseidonSocket implements Runnable {
* 端口
*/
private Integer port;
/**
* 连接计数
*/
private static AtomicInteger count = new AtomicInteger(0);

@Override
public void run() {
Expand Down Expand Up @@ -91,5 +96,15 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
log.warn(ctx.channel().remoteAddress().toString() + " is offline.");
ctx.channel().close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("ON: connection sum is " + PoseidonSocket.count.incrementAndGet());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("OFF: connection sum is " + PoseidonSocket.count.decrementAndGet());
}
}
}

0 comments on commit 1267624

Please sign in to comment.