Skip to content
This repository has been archived by the owner on Apr 24, 2018. It is now read-only.

Commit

Permalink
1.修复bug
Browse files Browse the repository at this point in the history
2.更新版本
  • Loading branch information
cwdtom committed Mar 22, 2018
1 parent dbc7eb6 commit ef8b4d4
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 61 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Poseidon


![Version](https://img.shields.io/badge/version-1.3.0-green.svg)
![Version](https://img.shields.io/badge/version-2.0.0-green.svg)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](http://opensource.org/licenses/MIT)

## Overview
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.cwdtom</groupId>
<artifactId>poseidon</artifactId>
<version>1.3.0</version>
<version>2.0.0</version>
<description>gather logs in different places</description>
<packaging>jar</packaging>
<name>Poseidon</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class Message {
public Message (Integer level, String msg) {
byte[] data = msg.getBytes();
this.length = data.length;
this.length = level;
this.level = level;
this.data = data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
import lombok.EqualsAndHashCode;
import org.slf4j.Marker;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* slf4j日志拦截器
Expand All @@ -22,19 +21,16 @@
@EqualsAndHashCode(callSuper = true)
@Data
public class PoseidonFilter extends TurboFilter {
public static TransferQueue<Message> queue = new LinkedTransferQueue<>();
public static Queue<Message> queue = new ConcurrentLinkedQueue<>();

@Override
public FilterReply decide(Marker marker, Logger logger, Level level, String format, Object[] params, Throwable t) {
if (level.toInt() < Level.INFO_INT || format == null) {
return FilterReply.NEUTRAL;
}
try {
byte[] data = String.format("[%s] %s", logger.getName(), format).getBytes("utf-8");
// 推送至队列中
PoseidonFilter.queue.offer(new Message(data.length, level.levelInt, data));
} catch (UnsupportedEncodingException ignored) {
}
String msg = String.format("[%s] %s", logger.getName(), format);
// 推送至队列中
PoseidonFilter.queue.offer(new Message(Level.INFO_INT, msg));
return FilterReply.ACCEPT;
}
}
62 changes: 25 additions & 37 deletions src/main/java/com/github/cwdtom/poseidon/socket/PoseidonSend.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.cwdtom.poseidon.socket;

import ch.qos.logback.classic.Level;
import com.github.cwdtom.poseidon.entity.Message;
import com.github.cwdtom.poseidon.filter.PoseidonFilter;
import io.netty.bootstrap.Bootstrap;
Expand All @@ -12,6 +11,8 @@
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Level;


/**
* 发送日志消息
Expand Down Expand Up @@ -52,15 +53,13 @@ private void start() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 1, 0));
p.addLast(new Encoder());
p.addLast(new SendHandler());
p.addLast(new IdleStateHandler(0, 15, 0));
p.addLast(new HeartbeatHandler());
}
});
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.connect(this.ip, this.port).sync();
ChannelFuture channelFuture = bootstrap.connect(this.ip, this.port).sync();
if (channelFuture.isSuccess()) {
// 重连时间间隔初始化
this.reconnectInterval = this.initReconnectInterval;
Expand All @@ -70,16 +69,15 @@ protected void initChannel(SocketChannel socketChannel) {
} catch (InterruptedException e) {
log.warn("poseidon start fail", e);
} finally {
reconnect(channelFuture);
eventLoopGroup.shutdownGracefully();
reconnect();
}
}

/**
* 重连
*
* @param channelFuture 连接
*/
private void reconnect(ChannelFuture channelFuture) {
private void reconnect() {
try {
// 防止频繁重连,消耗资源
Thread.sleep(this.reconnectInterval);
Expand All @@ -90,13 +88,8 @@ private void reconnect(ChannelFuture channelFuture) {
log.error("poseidon reconnect fail, exit");
return;
}
if (null != channelFuture) {
if (channelFuture.channel() != null && channelFuture.channel().isOpen()) {
channelFuture.channel().close();
}
}
// 重连
start();
this.start();
}

/**
Expand All @@ -116,31 +109,26 @@ protected void encode(ChannelHandlerContext channelHandlerContext, Message messa
* 处理发送
*/
private class SendHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
Message tmp = null;
try {
while (true) {
tmp = PoseidonFilter.queue.take();
// 阻塞
ctx.writeAndFlush(tmp);
}
} catch (InterruptedException e) {
// 发送失败时将失败message放回队列
if (tmp != null) {
PoseidonFilter.queue.offer(tmp);
}
}
}
}
private Integer idleCount = 0;
private Integer heartbeatInterval = 15;

/**
* 处理心跳
*/
private class HeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 处理心跳
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ctx.writeAndFlush(new Message(Level.INFO_INT, "heartbeat"));
if (PoseidonFilter.queue.isEmpty()) {
this.idleCount++;
if (this.idleCount > this.heartbeatInterval) {
ctx.writeAndFlush(new Message(Level.INFO_INT, "heartbeat"));
this.idleCount = 0;
}
return;
}
this.idleCount = 0;
while (!PoseidonFilter.queue.isEmpty()) {
ctx.writeAndFlush(PoseidonFilter.queue.poll());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ protected void initChannel(SocketChannel socketChannel) {
p.addLast(new Decode());
p.addLast(new HandlerMessage());
p.addLast(new IdleStateHandler(60, 0, 0));
p.addLast(new HeartbeatHandler());
}
});
try {
Expand Down Expand Up @@ -89,6 +88,8 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB
}
byte[] body = new byte[length];
byteBuf.readBytes(body);
// 释放已读buffer
byteBuf.discardReadBytes();
list.add(new Message(length, level, body));
}
}
Expand All @@ -97,7 +98,6 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB
* 处理消息
*/
private class HandlerMessage extends SimpleChannelInboundHandler<Message> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
InetSocketAddress isa = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
Expand All @@ -118,15 +118,14 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message
log.info(logStr);
}
}
}

/**
* 处理心跳
*/
private class HeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 处理心跳
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.channel().close().sync();
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
log.warn(ctx.channel().remoteAddress().toString() + " is offline.");
ctx.channel().close();
}
}
}
4 changes: 2 additions & 2 deletions src/test/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ spring.application.name=poseidon

# 以下两个配置只需存在一个,存在port表示本项目为master项目,存在master.host表示本项目为salve项目
# socket端口地址
poseidon.port=10001
# poseidon.port=10001
# 记录主机地址
# poseidon.master.host=127.0.0.1:10002
poseidon.master.host=127.0.0.1:10000
# 尝试重连时间间隔,单位秒
poseidon.reconnect-interval=10

0 comments on commit ef8b4d4

Please sign in to comment.