Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

1. App-> EchoApp, ExecutorApp으로 세분화

 1) EchoApp: 기존의 EchoApp기능과 동일함
 2) ExecutorApp: Biz Logic을 태울경우에 사용하는 ExecutionHandler를 추가한 App

2. 기능 변경
 1) PingPongHandler: Client -> Server로 주기적인 Ping 처리
 2) UptimeClientHandler: 서버가 끊어진 경우 재접속 처리

3. Stress Test 보강
 - StressTestServerApp, StressTestServerHandler

4. BsonEncoder/Decoder refactoring

5. 기본 환경 변경
  1) netty 버젼 변경: 3.2.7.Final -> 3.4.4.Final 변경
  2) JUnit 변경: 3.8.1 -> 4.8
  3) SLF4J의 Logger변경 : JDK14 -> LOG4J 변경
  • Loading branch information...
commit ce5e466c71140ca670059e6534908da457d3642e 1 parent e9a7b32
@hypermin authored
View
27 java/pom.xml
@@ -8,7 +8,8 @@
<packaging>jar</packaging>
<name>echobot</name>
- <url>http://maven.apache.org</url>
+ <description>kakao echobot</description>
+ <url>http://kakao.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -18,7 +19,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
+ <version>4.8</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -32,29 +33,41 @@
<version>2.7.2</version>
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
+ <groupId>io.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.2.7.Final</version>
+ <version>3.4.4.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
+ <artifactId>slf4j-log4j12</artifactId>
<version>1.6.4</version>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.16</version>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
- <mainClass>com.kakao.bot.echobot.App</mainClass>
+ <mainClass>com.kakao.bot.echobot.EchoApp</mainClass>
</manifest>
</archive>
<descriptorRefs>
View
119 java/src/main/java/com/kakao/bot/echobot/App.java
@@ -1,119 +0,0 @@
-package com.kakao.bot.echobot;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Executors;
-
-import org.bson.BSON;
-import org.bson.BSONObject;
-import org.bson.BasicBSONObject;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import com.mongodb.util.JSON;
-
-/**
- * EchoBot
- */
-public class App {
-
- public static String host = "localhost";
- public static int port = 8080;
-
- private static NioClientSocketChannelFactory clientSocketChannel = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-
- public static void main(String[] args) {
-
- if (args.length > 1) {
- host = args[0];
- System.out.println(host);
- port = Integer.parseInt(args[1]);
- System.out.println(port);
- }
- createClient();
-
- }
-
- public static void createClient() {
- final ClientBootstrap bootstrap = new ClientBootstrap(
- clientSocketChannel);
- final BotProtocol botImpl = new EchoBotProtocolImpl();
-
- bootstrap.setOption("connectTimeoutMillis", 1000);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = pipeline();
- pipeline.addLast("decoder", new BsonDecoder());
- pipeline.addLast("encoder", new BsonEncoder());
- pipeline.addLast("handler", new BotHandler(bootstrap, botImpl,
- "testtest", "testtest"));
- return pipeline;
- }
- });
-
- bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
- bootstrap.connect();
- }
-
- public static class EchoBotProtocolImpl extends BotProtocol {
- @Override
- public void handle_request(Channel channel, BSONObject bsonIn) {
- try {
- long user_key = (Long) bsonIn.get("user_key");
- long room_key = (Long) bsonIn.get("room_key");
- int msgId = (Integer) bsonIn.get("msg_id");
-
- String message = (String) bsonIn.get("message");
-
- BSONObject bsonOut = new BasicBSONObject();
- bsonOut.put("type", "response");
- bsonOut.put("room_key", room_key);
- bsonOut.put("user_key", user_key);
- bsonOut.put("msg_id", msgId);
- bsonOut.put("message", "1:" + message);
- List msgs = new ArrayList();
- msgs.add("2:" + message);
- msgs.add("3:"
- + (new StringBuffer(message)).reverse().toString());
-
- logger.debug("OUT => " + bsonOut);
-
- channel.write(BSON.encode(bsonOut));
- } catch (Exception e1) {
- e1.printStackTrace();
- }
- }
-
- @Override
- public void handle_add(Channel channel, BSONObject e) {
- long userKey = (Long) e.get("user_key");
- long roomKey = (Long) e.get("room_key");
- int msgId = (Integer) e.get("msg_id");
-
- BSONObject bOut = new BasicBSONObject();
- bOut.put("type", "response");
- bOut.put("room_key", roomKey);
- bOut.put("user_key", userKey);
- bOut.put("msg_id", msgId);
- bOut.put("message", "thank you for add me.");
-
- channel.write(BSON.encode(bOut));
-
- logger.debug("LOGIN user(%d) ", userKey);
- }
- }
-}
View
95 java/src/main/java/com/kakao/bot/echobot/BotHandler.java
@@ -1,95 +0,0 @@
-package com.kakao.bot.echobot;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import org.bson.BSON;
-import org.bson.BSONObject;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.util.JSON;
-
-public class BotHandler extends SimpleChannelUpstreamHandler {
- static BSONObject loginPkt;
-
- BotProtocol protocol;
- final ClientBootstrap bootstrap;
-
- Logger logger = LoggerFactory.getLogger(BotHandler.class);
-
- Class[] paramTypes = new Class[] { Channel.class, BSONObject.class };
-
-
- BotHandler(final ClientBootstrap bootstrap, BotProtocol protocol, String id, String pass) {
- this.bootstrap = bootstrap;
- this.protocol = protocol;
- loginPkt = (BSONObject)JSON.parse("{ \"type\":\"login\", \"id\":\"" + id + "\", \"pass\":\"" + pass + "\" } ");
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
- // logic insert
- Object obj = e.getMessage();
- if (obj instanceof byte[]) {
- BSONObject bsonIn = BSON.decode((byte[]) obj);
-
- logger.debug("BsonIn = " + bsonIn);
-
- String type = (String) bsonIn.get("type");
- if (type == null) {
- // error
- logger.debug("null");
- } else {
-
- Method method;
- try {
- method = BotProtocol.class.getMethod("handle_"+type, paramTypes);
- method.invoke(protocol, ctx.getChannel(), bsonIn);
- } catch (SecurityException ex) {
- ex.printStackTrace();
- } catch (NoSuchMethodException e1) {
- e1.printStackTrace();
- logger.info("method type not exist" + type);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- } else {
- logger.debug("unknown " + obj.toString());
- }
- }
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
- logger.debug("channel Closed. reconnect.");
- try { Thread.sleep(5000); } catch (Exception ex) { logger.debug(ex.getMessage()); }
- bootstrap.connect();
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- logger.debug("connected.");
- ctx.getChannel().write(loginPkt);
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- logger.debug("channelDisconnected");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- e.getChannel().close();
- }
-}
View
40 java/src/main/java/com/kakao/bot/echobot/BotProtocol.java
@@ -1,40 +0,0 @@
-package com.kakao.bot.echobot;
-
-import org.bson.BSON;
-import org.bson.BSONObject;
-import org.bson.BasicBSONObject;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BotProtocol {
- Logger logger = LoggerFactory.getLogger(BotProtocol.class);
-
- // must be implemented
- public void handle_ping(Channel channel, BSONObject e) {
- BSONObject out = new BasicBSONObject();
- out.put("type", "pong");
- out.put("time", (Long) e.get("time"));
- channel.write(BSON.encode(out));
- }
-
- // must be implemented
- public void handle_request(Channel channel, BSONObject e) {
- }
-
- public void handle_add(Channel channel, BSONObject e) {
- logger.debug("LOGIN user(%d) ", (Long) e.get("user_key"));
- }
-
- public void handle_block(Channel channel, BSONObject e) {
- logger.debug("BLOCKED user(%d) ", (Long) e.get("user_key"));
- }
-
- public void handle_leave(Channel channel, BSONObject e) {
- logger.debug("LEAVE user(%d) ", (Long) e.get("user_key"));
- }
-
- public void handle_result(Channel channel, BSONObject e) {
- logger.debug("Result : " + e);
- }
-}
View
63 java/src/main/java/com/kakao/bot/echobot/BsonDecoder.java
@@ -1,36 +1,73 @@
package com.kakao.bot.echobot;
+import org.bson.BSON;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BsonDecoder extends FrameDecoder {
+ private static final Logger LOG = LoggerFactory.getLogger(BsonDecoder.class);
+ private static final int MAXDATASIZE = 10*1024; // 10KB가 넘으면 illegal data로 인식함
+
+
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
- throws Exception {
- int readable = buffer.readableBytes();
+ protected Object decode(ChannelHandlerContext channelhandlercontext,
+ Channel channel, ChannelBuffer channelbuffer) throws Exception {
+ try {
+ byte[] bytes = decodeToByteArray (channelhandlercontext, channel, channelbuffer);
+
+ // bytes가 null 이면 이것은 illegal 접근으로 판단함, UpStream으로 data를 전달하지 않음
+ if ( bytes == null ) {
+ return null;
+ }
+ return BSON.decode(bytes);
+ } catch (Exception e) {
+ LOG.warn("exception occurred during decoding process", e);
+ return null;
+ }
+ }
+
+ protected byte[] decodeToByteArray (ChannelHandlerContext channelhandlercontext,
+ Channel channel, ChannelBuffer channelbuffer) {
+
+ int readable = channelbuffer.readableBytes();
+ // channelbuffer에 readable size에 대한 validation check
if (readable < 5) {
+ LOG.warn("readable byte is too small!!, must be over 5 bytes, readable:"+readable);
return null;
}
-
- buffer.markReaderIndex();
- int size = Integer.reverseBytes(buffer.getInt(buffer.readerIndex()));
- if (size < 0 || size > 100000) {
- channel.close();
+
+ /**
+ * BSON은 ByteOrder가 little-endian임, java는 big-endian이기 때문에 변환할 필요가 있음
+ * http://bsonspec.org/#/specification
+ */
+ channelbuffer.markReaderIndex();
+ int size = Integer.reverseBytes(channelbuffer.getInt(channelbuffer.readerIndex()));
+ // size가 0보다 작음 => overflow 된 경우임
+ // size가 maxHeapSize보다 큰 경우 => flooding 상태
+ if (size < 0 || size > MAXDATASIZE) {
+ LOG.warn("size invalid!! size:"+size+", max:"+MAXDATASIZE + ", readable:"+readable);
return null;
}
-
+
+ // size가 readable 보다 큰 경우!!
if (size > readable) {
- buffer.resetReaderIndex();
+ // The whole bytes were not received yet - return null
+ // This method will be invoked again when more packets are
+ // received and appended to the buffer.
+
+ // Reset to the marked position to read the length field again
+ // next time
+ channelbuffer.resetReaderIndex();
return null;
}
byte[] bt = new byte[size];
- buffer.readBytes(bt);
-
+ channelbuffer.readBytes(bt);
return bt;
}
-
}
View
39 java/src/main/java/com/kakao/bot/echobot/BsonEncoder.java
@@ -2,31 +2,36 @@
import org.bson.BSON;
import org.bson.BSONObject;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BsonEncoder extends OneToOneEncoder {
+ private static final Logger LOG = LoggerFactory.getLogger(BsonEncoder.class);
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (msg instanceof byte[]) {
- ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
- buffer.writeBytes((byte[]) msg);
-
- return buffer;
- } else if (msg instanceof BSONObject) {
- System.out.println("encode:BSONObject");
- ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
- buffer.writeBytes(BSON.encode((BSONObject) msg));
-
- return buffer;
- } else {
- System.out.println(this.getClass().getName() + " : unknown type." + msg);
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+ ChannelBuffer buffer = null;
+
+ // null 인경우에는 DownStream으로 전달이 안됨
+ if ( !( obj instanceof BSONObject ) ) {
+ LOG.warn ("obj isn't BSONObject, "+obj);
+ return null;
+ }
+ BSONObject bsonOut = (BSONObject) obj;
+ byte[] baOut = null;
+ try {
+ buffer = ChannelBuffers.dynamicBuffer();
+ baOut = BSON.encode(bsonOut);
+ buffer.writeBytes( baOut );
+ }catch (Exception e) {
+ LOG.warn("exception occurred by"+obj, e);
}
- return null;
+ return buffer;
}
}
View
94 java/src/main/java/com/kakao/bot/echobot/EchoApp.java
@@ -0,0 +1,94 @@
+package com.kakao.bot.echobot;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+/**
+ * EchoBot
+ */
+public class EchoApp {
+
+ private static String host = "localhost";
+ private static int port = 10010;
+
+ /**
+ * OioClientSocketChannelFactory는 request가 많이 몰릴 경우 Live-Lock 현상이 발생했음
+ * NioClientSocketChannelFactory를 이용하면 Live-Lock 문제를 해결할 수 있음
+ */
+ private static final ClientSocketChannelFactory clientSocketChannel = newClientSocketChannelFactory (true);
+
+ private static ClientSocketChannelFactory newClientSocketChannelFactory (boolean nio) {
+ if ( nio ) {
+ return new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
+ }
+ return new OioClientSocketChannelFactory ( Executors.newCachedThreadPool() );
+ }
+
+ public static void main(String[] args) {
+
+ if (args.length > 1) {
+ host = args[0];
+ System.out.println(host);
+ port = Integer.parseInt(args[1]);
+ System.out.println(port);
+ }
+ createClient();
+ }
+
+
+ private static void createClient() {
+ final ClientBootstrap bootstrap = new ClientBootstrap(clientSocketChannel);
+ final Timer timer = new HashedWheelTimer();
+
+ bootstrap.setOption("connectTimeoutMillis", 1000);
+ /**
+ * tcpNoDelay 옵션, true: Nagle Algorithm을 사용하지 않음, false: Nagle Algorithm을 사용함
+ * 작은 패킷에 대해서 응답을 빠르게 줘야 할 경우(채팅 서버, 게임서버 등은 true로 사용하는게 낳음)
+ * Nagle Algorithm은 작은 패킷을 모아서 한번에 보내주는 것으로 latency이 조금 있더라도 overal performance측면에서 좋음
+ */
+ bootstrap.setOption("tcpNoDelay", true);
+ /**
+ * KeepAlive는 종단 시스템중 하나가 다움될 때 발생할 수 있는 한쪽만 열린 상태를 정리하는 것
+ * - Checking for dead peers
+ * - Preventing disconnection due to network inactivity
+ */
+ bootstrap.setOption("keepAlive", true);
+ /**
+ * reuseAddress는 클라이언트 접속시 할당되는 Port를 기다림없이 재활용하고 싶을때
+ */
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("sendBufferSize", 20*1024*1024); // 20MB, 상황에 따라 조절
+ bootstrap.setOption("receiveBufferSize", 20*1024*1024); // 20MB, 상황에 따라 조절
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
+
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("uptimeClient", new UptimeClientHandler(bootstrap, timer, 3000)); // connection 이 끊어졌는지 체크!!
+
+ pipeline.addLast("decoder", new BsonDecoder());
+ pipeline.addLast("encoder", new BsonEncoder());
+ // 1분에 한번씩 ping check
+ pipeline.addLast("pingpong", new PingPongHandler(bootstrap, timer, 10*1000));
+ // 100ms뒤에 응답을 주는 Echo
+ pipeline.addLast("handler", new EchoBotHandler(bootstrap, timer, "tayo", "pass", 0));
+ return pipeline;
+ }
+ });
+
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
+ bootstrap.connect();
+ }
+}
View
182 java/src/main/java/com/kakao/bot/echobot/EchoBotHandler.java
@@ -0,0 +1,182 @@
+package com.kakao.bot.echobot;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONObject;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EchoBotHandler extends SimpleChannelUpstreamHandler{
+ Logger LOG = LoggerFactory.getLogger(EchoBotHandler.class);
+ private final ClientBootstrap bootstrap;
+ private final Timer timer;
+ private final String id;
+ private final String pass;
+ private final int delay;
+
+ private static final Map<String, TYPE> map = new HashMap<String, TYPE> ();
+
+ public enum TYPE{
+ ADD ("add"),
+ BLOCK ("block"),
+ LEAVE ("leave"),
+ REQUEST ("request"),
+ RESPONSE ("response"),
+ LOGIN("login"),
+ PING("ping"),
+ PONG("pong"),
+ RESULT ("result");
+
+ private final String type;
+ private TYPE(String type) {
+ this.type = type;
+ }
+ private final String getType () {
+ return this.type;
+ }
+
+ public static TYPE parse (String type) {
+ if ( map.isEmpty() ) {
+ for ( TYPE item : values() ) {
+ map.put(item.getType(), item);
+ }
+ }
+ return map.get(type);
+ }
+ }
+
+ public EchoBotHandler(final ClientBootstrap bootstrap, Timer timer, String id, String pass, int delay) {
+ this.bootstrap = bootstrap;
+ this.timer = timer;
+ this.id = id;
+ this.pass = pass;
+ this.delay = delay;
+ }
+
+ private InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) bootstrap.getOption("remoteAddress");
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
+ Object obj = e.getMessage();
+ if ( !( obj instanceof BSONObject ) ) {
+ LOG.info("obj isn't type of BSONObject, obj:{}", obj);
+ return;
+ }
+
+ BSONObject bsonIn = (BSONObject)obj;
+ TYPE type = TYPE.parse((String) bsonIn.get("type"));
+ switch (type) {
+ case ADD: handleAdd (type, e.getChannel(), bsonIn); break;
+ case BLOCK: handleBlock (type, e.getChannel(), bsonIn); break;
+ case LEAVE: handleLeave (type, e.getChannel(), bsonIn); break;
+ case REQUEST: handleRequest(type, e.getChannel(), bsonIn); break;
+ case RESULT: handleResult (type, e.getChannel(), bsonIn); break;
+ case PING: handlePing (type, e.getChannel(), bsonIn); break;
+ case PONG: handlePong (type, e.getChannel(), bsonIn); break;
+ default: LOG.warn("invalid type:{}, bsonIn:{}", type, bsonIn);
+ }
+ }
+
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) {
+ LOG.debug("connected to {}", getRemoteAddress());
+ BSONObject login = new BasicBSONObject ();
+ login.put("type", "login");
+ login.put("id", this.id);
+ login.put("pass", this.pass);
+
+ e.getChannel().write(login);
+ }
+
+
+ protected void handlePing(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("PING ts({}) ", (Long) bsonIn.get("time"));
+ BSONObject out = new BasicBSONObject();
+ out.put("type", "pong");
+ out.put("time", (Long) bsonIn.get("time"));
+ channel.write(out);
+ }
+
+ protected void handlePong(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("PONG ts({})", (Long) bsonIn.get("time"));
+ }
+
+ protected void handleRequest(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("REQUEST user(%d) ", (Long) bsonIn.get("user_key"));
+
+ long userKey = (Long) bsonIn.get("user_key");
+ long roomKey = (Long) bsonIn.get("room_key");
+ int msgId = (Integer) bsonIn.get("msg_id");
+
+ String message = (String) bsonIn.get("message");
+
+ final BSONObject bsonOut = new BasicBSONObject();
+ bsonOut.put("type", "response");
+ bsonOut.put("room_key", roomKey);
+ bsonOut.put("user_key", userKey);
+ bsonOut.put("msg_id", msgId);
+ bsonOut.put("message", "1:" + message);
+ List<String> msgs = new ArrayList<String>();
+ msgs.add("2:" + message);
+ msgs.add("3:" +(new StringBuffer(message)).reverse().toString());
+
+ if ( delay > 0 ) {
+ timer.newTimeout( new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ LOG.debug("OUT => " + bsonOut);
+ channel.write(bsonOut);
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ } else {
+ LOG.debug("OUT => " + bsonOut);
+ channel.write(bsonOut);
+ }
+ }
+
+ protected void handleAdd(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("ADD user(%d) ", (Long) bsonIn.get("user_key"));
+
+ long userKey = (Long) bsonIn.get("user_key");
+ long roomKey = (Long) bsonIn.get("room_key");
+ int msgId = (Integer) bsonIn.get("msg_id");
+ BSONObject bOut = new BasicBSONObject();
+ bOut.put("type", "response");
+ bOut.put("room_key", roomKey);
+ bOut.put("user_key", userKey);
+ bOut.put("msg_id", msgId);
+ bOut.put("message", "thank you for add me.");
+
+ channel.write(bOut);
+ LOG.debug("LOGIN user(%d) ", userKey);
+ }
+
+ protected void handleBlock(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("BLOCKED user(%d) ", (Long) bsonIn.get("user_key"));
+ }
+
+ protected void handleLeave(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("LEAVE user(%d) ", (Long) bsonIn.get("user_key"));
+ }
+
+ protected void handleResult(TYPE type, final Channel channel, BSONObject bsonIn) {
+ LOG.debug("Result : " + bsonIn);
+ }
+}
View
115 java/src/main/java/com/kakao/bot/echobot/ExecutorApp.java
@@ -0,0 +1,115 @@
+package com.kakao.bot.echobot;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+/**
+ * Biz Logic을 태울 경우에 사용함!!
+ * MINA의 2중 Thread Pool Filter 개념을 차용(executionHandler를 2중으로 사용함)
+ * -> 실제 Nio에서는 Boss TP 1개, Worker TP 1개, OMA TP * 2개임
+ * 성능 테스트시에 상황에 맞게 튜닝 필요
+ * 추가정보: JCO 2009년 발표자료, 자바 네트워킹 기초에서 응용까지, http://gleamynode.net/file_download/11/JCO2009.pdf
+ */
+public class ExecutorApp {
+
+ private static String host = "localhost";
+ private static int port = 11010;
+
+ /**
+ * OioClientSocketChannelFactory는 request가 많이 몰릴 경우 Live-Lock 현상이 발생했음
+ * NioClientSocketChannelFactory를 이용하면 Live-Lock 문제를 해결할 수 있음
+ */
+ private static final ClientSocketChannelFactory clientSocketChannel = newClientSocketChannelFactory (true);
+
+ private static ClientSocketChannelFactory newClientSocketChannelFactory (boolean nio) {
+ if ( nio ) {
+ return new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
+ }
+ return new OioClientSocketChannelFactory ( Executors.newCachedThreadPool() );
+ }
+
+ public static void main(String[] args) {
+
+ if (args.length > 1) {
+ host = args[0];
+ System.out.println(host);
+ port = Integer.parseInt(args[1]);
+ System.out.println(port);
+ }
+ createClient();
+ }
+
+
+ private static void createClient() {
+ final ClientBootstrap bootstrap = new ClientBootstrap(clientSocketChannel);
+ final Timer timer = new HashedWheelTimer();
+
+ //상황에 따라 조절 필요~
+ final int maxMemory = 10*1024*1024;
+ final int coreThreadCnt = 8;
+
+ final OrderedMemoryAwareThreadPoolExecutor executor =
+ new OrderedMemoryAwareThreadPoolExecutor( coreThreadCnt, maxMemory, maxMemory);
+ final ExecutionHandler executionHandler = new ExecutionHandler(executor);
+
+ final OrderedMemoryAwareThreadPoolExecutor executor2 =
+ new OrderedMemoryAwareThreadPoolExecutor( coreThreadCnt, maxMemory, maxMemory);
+ final ExecutionHandler executionHandler2 = new ExecutionHandler(executor2);
+
+ bootstrap.setOption("connectTimeoutMillis", 1000);
+ /**
+ * tcpNoDelay 옵션, true: Nagle Algorithm을 사용하지 않음, false: Nagle Algorithm을 사용함
+ * 작은 패킷에 대해서 응답을 빠르게 줘야 할 경우(채팅 서버, 게임서버 등은 true로 사용하는게 낳음)
+ * Nagle Algorithm은 작은 패킷을 모아서 한번에 보내주는 것으로 latency이 조금 있더라도 overal performance측면에서 좋음
+ */
+ bootstrap.setOption("tcpNoDelay", true);
+ /**
+ * KeepAlive는 종단 시스템중 하나가 다움될 때 발생할 수 있는 한쪽만 열린 상태를 정리하는 것
+ * - Checking for dead peers
+ * - Preventing disconnection due to network inactivity
+ */
+ bootstrap.setOption("keepAlive", true);
+ /**
+ * reuseAddress는 클라이언트 접속시 할당되는 Port를 기다림없이 재활용하고 싶을때
+ */
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("sendBufferSize", 20*1024*1024); // 20MB, 상황에 따라 조절
+ bootstrap.setOption("receiveBufferSize", 20*1024*1024); // 20MB, 상황에 따라 조절
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
+
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("executor", executionHandler);
+ pipeline.addLast("uptimeClient", new UptimeClientHandler(bootstrap, timer, 3000)); // connection 이 끊어졌는지 체크!!
+
+ pipeline.addLast("decoder", new BsonDecoder());
+ pipeline.addLast("encoder", new BsonEncoder());
+
+ pipeline.addLast("executor2", executionHandler2);
+ // 1분에 한번씩 ping check
+ pipeline.addLast("pingpong", new PingPongHandler(bootstrap, timer, 60*1000));
+ // 100ms뒤에 응답을 주는 Echo
+ pipeline.addLast("handler", new EchoBotHandler(bootstrap, timer, "tayo", "pass", 100));
+ return pipeline;
+ }
+ });
+
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));
+ bootstrap.connect();
+ }
+}
View
122 java/src/main/java/com/kakao/bot/echobot/PingPongHandler.java
@@ -0,0 +1,122 @@
+package com.kakao.bot.echobot;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONObject;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PingPongHandler extends SimpleChannelUpstreamHandler
+{
+ private static final Logger LOG = LoggerFactory.getLogger(PingPongHandler.class);
+ private static final long INIT_TIME = -1;
+
+ final ClientBootstrap bootstrap;
+ private final Timer timer;
+ private final int pingInterval;
+ private long pingTime = INIT_TIME;
+ private long pongTime = INIT_TIME;
+
+ public PingPongHandler(ClientBootstrap bootstrap, Timer timer, int interval) {
+ this.bootstrap = bootstrap;
+ this.timer = timer;
+ this.pingInterval = interval;
+ }
+
+ private InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) bootstrap.getOption("remoteAddress");
+ }
+
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception
+ {
+ Object obj = e.getMessage();
+
+ BSONObject pong = null;
+ if ( !(obj instanceof BSONObject) ) {
+ LOG.trace("obj isn't type of DomainObject, obj:{}",obj);
+ ctx.sendUpstream(e);
+ return;
+ }
+ pong = (BSONObject)obj;
+
+ // Pong Type 인지 체크함
+ if ( !pong.get("type").equals("pong") ) {
+ LOG.trace("obj isn't type of Pong, obj:{}",obj);
+ ctx.sendUpstream(e);
+ return;
+ }
+
+ // Pong에 대해서는 response를 줄 필요가 없음
+ pongTime = (Long) pong.get("time");
+
+ LOG.trace("Pong replied, pong:{}, pingTime:", pong, pingTime);
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ LOG.info("Connected to: {}",getRemoteAddress());
+ ctx.sendUpstream(e);
+ if ( pingInterval > 0 ) {
+ timer.newTimeout( new PingPongSchedule (e), pingInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void resetTime () {
+ this.pingTime = INIT_TIME;
+ this.pongTime = INIT_TIME;
+ }
+
+
+ class PingPongSchedule implements TimerTask {
+ final ChannelStateEvent e;
+ PingPongSchedule (ChannelStateEvent e) {
+ this.e = e;
+ }
+ public void run(Timeout timeout) {
+
+ /**
+ * 이전 ping시간과 pong시간을 체크
+ * - ping과 pong이 다르면 응답을 안준 케이스임 => 재접속
+ */
+ if ( pingTime != pongTime ) {
+ LOG.warn("ping/pong failed, ping:"+pingTime+", pong:"+pongTime+" ===> doing closed()");
+ e.getChannel().close();
+
+ resetTime ();
+ return;
+ }
+
+ pingTime = System.currentTimeMillis();
+ BSONObject ping = toPing(pingTime);
+ e.getChannel().write(ping);
+ LOG.debug("[send] ping:{}"+ping);
+
+ if ( pingInterval > 0 ) {
+ timer.newTimeout( new PingPongSchedule (e), pingInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private final BSONObject toPing (long pingTime) {
+ BSONObject bson = new BasicBSONObject ();
+ bson.put("type", "ping");
+ bson.put("time", pingTime);
+ return bson;
+ }
+ }
+
+
+
+}
View
102 java/src/main/java/com/kakao/bot/echobot/UptimeClientHandler.java
@@ -0,0 +1,102 @@
+package com.kakao.bot.echobot;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.WriteTimeoutException;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UptimeClientHandler extends SimpleChannelUpstreamHandler
+{
+ private static final Logger LOG = LoggerFactory.getLogger(UptimeClientHandler.class);
+ final ClientBootstrap bootstrap;
+ private final Timer timer;
+ private final int reconnectionDelay;
+ private long startTime = -1;
+
+ public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer, int reconnectionDelay) {
+ this.bootstrap = bootstrap;
+ this.timer = timer;
+ this.reconnectionDelay = reconnectionDelay;
+ }
+
+ InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) bootstrap.getOption("remoteAddress");
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ if ( LOG.isInfoEnabled() ) {
+ LOG.info(getMsgHead()+"Disconnected from: {}", getRemoteAddress());
+ }
+ ctx.sendUpstream(e);
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(getMsgHead()+"Sleeping for: {} ms", reconnectionDelay);
+ }
+
+ timer.newTimeout( new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ LOG.warn(getMsgHead()+"Reconnecting to: {}, reconnection delay:{}",getRemoteAddress(), reconnectionDelay);
+ bootstrap.connect();
+ }
+ }, reconnectionDelay, TimeUnit.MILLISECONDS);
+ ctx.sendUpstream(e);
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ if (startTime < 0) {
+ startTime = System.currentTimeMillis();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(getMsgHead()+"Connected to: {}",getRemoteAddress());
+ }
+ ctx.sendUpstream(e);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ Throwable cause = e.getCause();
+ LOG.warn("ctx:"+ctx+", Unexpected exception from downstream.", cause);
+
+ if (cause instanceof ConnectException) {
+ startTime = -1;
+ LOG.warn(getMsgHead()+"Failed to connect: {}", cause.getMessage());
+ }
+ if (cause instanceof ReadTimeoutException) {
+ // The connection was OK but there was no traffic for last period.
+ LOG.warn(getMsgHead()+"Disconnecting due to no inbound traffic, {}",cause.getMessage());
+ } else if ( cause instanceof WriteTimeoutException) {
+ LOG.warn(getMsgHead()+"Disconnecting due to no outbound traffic, {}", cause.getMessage());
+ } else {
+ LOG.warn(getMsgHead()+"Exception occurred!!", cause);
+
+ }
+ e.getChannel().close();
+ }
+
+ private String getMsgHead () {
+ if ( startTime < 0 ) {
+ return "[SERVER IS DOWN]";
+ } else {
+ return String.format("[UPTIME: %5ds]", (System.currentTimeMillis() - startTime) / 1000);
+ }
+ }
+}
View
47 java/src/main/resources/log4j.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <!-- Appenders -->
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out, System.err" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%t] %-5p: %c.%M - %m%n" />
+ </layout>
+ </appender>
+
+ <appender name="debugFileAppender" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="file" value="log/debug-file.log" />
+ <param name="datePattern" value="'.'yyyy-MM-dd" />
+ <param name="append" value="true" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%t] %-5p: %c.%M - %m%n" />
+ <!-- <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" /> -->
+ </layout>
+ </appender>
+
+ <appender name="async-file" class="org.apache.log4j.AsyncAppender">
+ <param name="locationInfo" value="true" />
+ <param name="bufferSize" value="1024" />
+ <appender-ref ref="debugFileAppender" />
+ </appender>
+
+ <!-- Application loggers -->
+
+ <logger name="com.kakao.bot.echobot.EchoApp">
+ <level value="info" />
+ </logger>
+
+ <logger name="com.kakao.bot.echobot.StressTestServerHandler">
+ <level value="info" />
+ </logger>
+
+ <!-- Root Logger -->
+ <root>
+ <priority value="warn" />
+ <appender-ref ref="async-file" />
+ <appender-ref ref="console" />
+ </root>
+
+</log4j:configuration>
View
69 java/src/test/java/com/kakao/bot/echobot/StressTestServerApp.java
@@ -0,0 +1,69 @@
+/**
+ *
+ */
+package com.kakao.bot.echobot;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author hypermin
+ *
+ */
+public class StressTestServerApp
+{
+ private static final Logger LOG = LoggerFactory.getLogger(StressTestServerApp.class);
+ private static int port = 10010;
+ private static int requestPerSeconds = 10000;
+ private static int testTime = 20;
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("StressTestServer start");
+ if (args.length > 1) {
+ port = Integer.parseInt(args[0]);
+ LOG.info("port:{}",port);
+ }
+ if ( args.length == 3) {
+ requestPerSeconds = Integer.parseInt(args[1]);
+ testTime = Integer.parseInt(args[2]);
+ LOG.info("rps:{}, testTime:{}",requestPerSeconds, testTime);
+ }
+ startServer (port, requestPerSeconds, testTime);
+ }
+
+ private static void startServer (final int port, final int rps, final int tt) {
+ LOG.info("port:{}, rps:{}, testTime:{}",new Object[]{port, rps, tt});
+
+ // Configure the server.
+ ServerBootstrap bootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("bsonDecoder", new BsonDecoder()); //
+ pipeline.addLast("bsonEncoder", new BsonEncoder());
+ pipeline.addLast("handler", new StressTestServerHandler( rps, tt ));
+ return pipeline;
+ }
+ });
+
+ // Bind and start to accept incoming connections.
+ InetSocketAddress address = new InetSocketAddress( port );
+ bootstrap.bind(address);
+ System.out.println("bind. address:"+address);
+
+ }
+}
View
181 java/src/test/java/com/kakao/bot/echobot/StressTestServerHandler.java
@@ -0,0 +1,181 @@
+package com.kakao.bot.echobot;
+
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONObject;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StressTestServerHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StressTestServerHandler.class);
+
+ private final AtomicLong sendCnt = new AtomicLong ();
+ private final AtomicLong recvCnt = new AtomicLong ();
+ private final AtomicLong sendCntPS = new AtomicLong ();
+ private final AtomicLong recvCntPS = new AtomicLong ();
+
+ private final AtomicInteger executeTime = new AtomicInteger ();
+
+ private final int requestPerSeconds;
+ private final int testTime;
+ private long start = 0;
+
+ private boolean first = true;
+ private static boolean useDetailLog = false;
+
+ ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
+ ScheduledFuture<?> timerFuture = null;
+
+
+ public StressTestServerHandler (int requestPerSeconds, int testTime) {
+ LOG.info("BotStressRandomTestHandler start, rps:"+requestPerSeconds+", time:"+testTime);
+ this.requestPerSeconds = requestPerSeconds;
+ this.testTime = testTime;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ final MessageEvent e) {
+ Object obj = e.getMessage();
+ BSONObject bsonIn = null;
+ BSONObject bsonOut = null;
+ do {
+ if ( !( obj instanceof BSONObject) ) {
+ LOG.info("obj isn't type of BSONObject, obj:"+obj);
+ break;
+ }
+ bsonIn = (BSONObject)obj;
+ if ( useDetailLog ) {
+ LOG.info("BsonIn = " + bsonIn);
+ }
+
+ String type = (String) bsonIn.get("type");
+ if (type == null) {
+ LOG.info("null");
+ } else if (type.equals("request")) {
+ LOG.debug("request");
+ } else if (type.equals("response")) {
+ recvCnt.incrementAndGet();
+ recvCntPS.incrementAndGet();
+ sendResult (e);
+ } else if (type.equals("login")) {
+ LOG.debug("login ");
+ sendResult (e);
+ startRandomTest (e);
+ } else if (type.equals("result")) {
+ LOG.debug("result ");
+ } else if (type.equals("add")) {
+ LOG.debug("add");
+ sendResult ( e );
+ } else if ( type.equals("ping")) {
+ LOG.debug("ping:{}", bsonIn);
+ bsonOut = new BasicBSONObject();
+ bsonOut.put("type", "pong");
+ bsonOut.put("time", bsonIn.get("time"));
+ e.getChannel().write(bsonOut);
+ }
+ } while(false);
+ }
+
+ private void sendResult (final MessageEvent e) {
+ BSONObject bOut = new BasicBSONObject();
+ bOut.put("type", "result");
+ bOut.put("code", 200);
+ bOut.put("msg_id", 0 );
+ bOut.put("msg", "테스트" );
+ if ( useDetailLog ) {
+ LOG.info("BsonOut = " + bOut );
+ }
+ e.getChannel().write(bOut);
+ }
+
+ private void startRandomTest (final MessageEvent e) {
+ if ( !first ) return;
+ start = System.currentTimeMillis();
+ first = false;
+
+ final Random rand = new Random (System.currentTimeMillis());
+
+ class MyWorker extends TimerTask {
+ public void run() {
+ try {
+ if ( executeTime.getAndIncrement() >= testTime ) {
+ LOG.info("[stop]"+getStatistics() );
+ return;
+ }
+
+ for ( int i = 0; i < requestPerSeconds; i++ ) {
+ BSONObject bOut = new BasicBSONObject();
+ int msgId = Math.abs( rand.nextInt() % 10000);
+ bOut.put("type", "request");
+ bOut.put("user_key", Math.abs( rand.nextLong() % 10000) );
+ bOut.put("room_key", Math.abs( rand.nextLong() % 10000) );
+ bOut.put("msg_id", msgId );
+ bOut.put("country_iso", "KR");
+ bOut.put("message", getMessage(msgId) );
+ if ( useDetailLog ) {
+ LOG.info("BsonOut = " + bOut );
+ }
+ e.getChannel().write( bOut );
+ sendCnt.incrementAndGet();
+ sendCntPS.incrementAndGet();
+ }
+ LOG.info("[send]"+getStatistics() );
+ } finally {
+ sendCntPS.set(0);
+ recvCntPS.set(0);
+ }
+ }
+ };
+
+ if (timerFuture != null) {
+ //cancel execution of the future task (TimerPopTask())
+ //If task is already running, do not interrupt it.
+ timerFuture.cancel(false);
+ }
+ timerFuture = timer.scheduleAtFixedRate( new MyWorker(), 0, 1, TimeUnit.SECONDS);
+ }
+
+ private String getStatistics () {
+ return "send:("+ sendCnt.intValue()+", "+sendCntPS.intValue()+"), receive:("+recvCnt.intValue()+", "+recvCntPS.intValue()+"), execute:("+executeTime.intValue()+"/"+ testTime +"), ts:"+getDuration();
+ }
+
+ private String getMessage (int msgId) {
+ return "테스트, msgId:"+msgId;
+ }
+
+ private long getDuration () {
+ return System.currentTimeMillis() - start;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) {
+ LOG.info("connected.");
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ LOG.info("connection closed.");
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ e.getCause().printStackTrace();
+ e.getChannel().close();
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.