@@ -7,26 +7,28 @@
import org.kiteq.protocol.KiteRemoting.BytesMessage;
import org.kiteq.protocol.KiteRemoting.StringMessage;

import java.util.List;

/**
* @author gaofeihang
* @since Feb 25, 2015
*/
public interface KiteClient {
void setPublishTopics(String[] topics);
void setBindings(Binding[] bindings);

void setPublishTopics(List<String> topics);

void setBindings(List<Binding> bindings);

SendResult sendStringMessage(StringMessage message) throws NoKiteqServerException;

SendResult sendBytesMessage(BytesMessage message) throws NoKiteqServerException;

SendResult sendTxMessage(StringMessage message, TxCallback txCallback) throws NoKiteqServerException;

SendResult sendTxMessage(BytesMessage message, TxCallback txCallback) throws NoKiteqServerException;
void start();

void init() throws Exception;

void close();

}
@@ -0,0 +1,79 @@
package org.kiteq.client;

import org.kiteq.client.message.Message;
import org.kiteq.client.message.MessageListener;
import org.kiteq.client.message.TxResponse;
import org.kiteq.client.util.AckUtils;
import org.kiteq.client.util.MessageUtils;

import org.kiteq.protocol.KiteRemoting;
import org.kiteq.protocol.Protocol;
import org.kiteq.protocol.packet.KitePacket;
import org.kiteq.remoting.listener.RemotingListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* Created by blackbeans on 12/15/15.
*/
public class QRemotingListener implements RemotingListener {

private static final Logger LOGGER = LoggerFactory.getLogger(QRemotingListener.class);


private MessageListener listener;

public QRemotingListener(MessageListener listener) {
this.listener = listener;
}

@Override
public KitePacket txAckReceived(final KitePacket packet) {
final KiteRemoting.TxACKPacket txAck = (KiteRemoting.TxACKPacket) packet.getMessage();
final TxResponse txResponse = TxResponse.parseFrom(txAck);

KiteRemoting.TxACKPacket.Builder builder = txAck.toBuilder();
try {
listener.onMessageCheck(txResponse);
builder.setStatus(txResponse.getStatus());
} catch (Exception e) {
//设置为回滚
builder.setStatus(2);
builder.setFeedback(e.getMessage());
}

KitePacket response = new KitePacket(packet.getHeader().getOpaque(), Protocol.CMD_TX_ACK, builder.build());
return response;
}


@Override
public KitePacket bytesMessageReceived(final KitePacket packet) {
final KiteRemoting.BytesMessage message = (KiteRemoting.BytesMessage) packet.getMessage();

return innerReceived(packet, MessageUtils.convertMessage(message));

}

@Override
public KitePacket stringMessageReceived(final KitePacket packet) {
final KiteRemoting.StringMessage message = (KiteRemoting.StringMessage) packet.getMessage();

return innerReceived(packet, MessageUtils.convertMessage(message));
}

private KitePacket innerReceived(KitePacket packet, Message message) {
boolean succ = false;
try {
succ = listener.onMessage(message);
} catch (Exception e) {
LOGGER.error("bytesMessageReceived|FAIL|", e);
succ = false;
}
KiteRemoting.DeliverAck ack = AckUtils.buildDeliverAck(message.getHeader(), succ);
KitePacket response = new KitePacket(packet.getHeader().getOpaque(), Protocol.CMD_DELIVER_ACK, ack);
return response;
}

}
@@ -0,0 +1,59 @@
package org.kiteq.client.binding;


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* 变化监听器监听器
*
*/
public abstract class AbstractChangeWatcher implements CuratorWatcher{

private static final Logger logger = LoggerFactory.getLogger(AbstractChangeWatcher.class);

protected CuratorFramework zkClient;

public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

@Override
public void process(WatchedEvent watchedEvent) throws Exception {
switch (watchedEvent.getType()){
case NodeChildrenChanged:
List<String> nodes =this.zkClient.getChildren().forPath(watchedEvent.getPath());
String topic = watchedEvent.getPath().substring(watchedEvent.getPath().lastIndexOf("/") + 1);
this.qServerNodeChange(topic,nodes);
logger.info("NodeChildrenChanged|"+watchedEvent.getPath(),nodes);
break;
case NodeDeleted:
//ignored
break;
case NodeCreated:
break;
//ignore
case NodeDataChanged:
break;
//ignore
}

//add watcher
this.zkClient.checkExists().usingWatcher(this);
}


/**
* qserver节点变更
* @param topic
* @param address
*/
protected abstract void qServerNodeChange(String topic,List<String> address);


}

This file was deleted.

@@ -0,0 +1,122 @@
package org.kiteq.client.binding;


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.kiteq.commons.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;


/**
*
* Qmanager的管理
* Created by blackbeans on 12/15/15.
*/
public class QServerManager {

private static final Logger logger = LoggerFactory.getLogger(QServerManager.class);

public static final String PATH_SERVER = "/kiteq/server/";

private static final String PATH_PRODUCER = "/kiteq/pub";

private static final String PATH_CONSUMER = "/kiteq/sub";

private CuratorFramework zkClient;
private String zkAddr;


public void init() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
zkClient = CuratorFrameworkFactory.newClient(zkAddr, retryPolicy);
zkClient.start();
}

public void setZkAddr(String zkAddr) {
this.zkAddr = zkAddr;
}

public CuratorFramework getZkClient() {
return zkClient;
}

/**
* 通过topic获取QServer
*KITEQ_SERVER = KITEQ + "/server" // 临时节点 # /kiteq/server/${topic}/ip:port
** @param topic
* @return
*/
public List<String> pullAndWatchQServer(String topic, final AbstractChangeWatcher watcher) {
String path = PATH_SERVER + "/" + topic;
List<String> children = Collections.EMPTY_LIST;
try {
children = this.zkClient.getChildren().usingWatcher(watcher).forPath(path);
} catch (Exception e) {
logger.error("pullAndWatchQServer|FAIL|" + path, e);
}
return children;
}


/**
* // KITEQ_PUB = KITEQ + "/pub" // 临时节点 # /kiteq/pub/${topic}/${groupId}/ip:port
* 发布本地的发送者发送的消息类型
*
* @param group
* @param topics
* @param publisherTag
* @throws Exception
*/
public void publishTopics(String group, String publisherTag, List<String> topics) throws Exception {
for (String topic : topics) {
String path = PATH_PRODUCER + "/" + topic + "/" + publisherTag;
String eppath = this.zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
logger.info("publishTopics|SUCC|" + eppath);
}
}


/**
* KITEQ_SUB = KITEQ + "/sub" // 持久订阅/或者临时订阅 # /kiteq/sub/${topic}/${groupId}-bind/#$data(bind)
* 发布本地的发送者发送的消息类型
*
* @param group
* @param binds
* @throws Exception
*/
public void subscribeTopics(String group, List<Binding> binds) throws Exception {
Map<String, List<Binding>> topics2Binds = new HashMap<String, List<Binding>>();
for (Binding bind : binds) {
bind.setGroupId(group);
if (topics2Binds.containsKey(bind.getTopic())) {
topics2Binds.get(bind.getTopic()).add(bind);
} else {
List<Binding> tmp = new ArrayList<Binding>();
tmp.add(bind);
topics2Binds.put(bind.getTopic(), tmp);
}
}

//按照topic推送订阅关系
for (Map.Entry<String, List<Binding>> entry : topics2Binds.entrySet()) {
//开始推送订阅关系
String path = PATH_CONSUMER + "/" + entry.getKey() + "/" + group + "-bind";
this.zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);

Stat stat = this.zkClient.setData().forPath(path, JsonUtils.toJSON(entry.getValue()).getBytes("UTF-8"));
logger.info("subscribeTopics|Subscribe|SUCC|" + path + "|" + JsonUtils.toJSON(entry.getValue()));
}
}


public void destroy() {
zkClient.close();
}

}
@@ -6,7 +6,7 @@
import org.kiteq.client.ClientManager;
import org.kiteq.client.KiteClient;
import org.kiteq.client.binding.Binding;
import org.kiteq.client.binding.BindingManager;
import org.kiteq.client.binding.QServerManager;
import org.kiteq.client.message.*;
import org.kiteq.commons.exception.NoKiteqServerException;
import org.kiteq.commons.stats.KiteStats;
@@ -24,6 +24,7 @@
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;

/**
* @author gaofeihang
@@ -33,68 +34,89 @@ public class DefaultKiteClient implements KiteClient {

private static final Logger logger = LoggerFactory.getLogger(DefaultKiteClient.class);

private String[] publishTopics;
private Binding[] bindings;
private List<String> publishTopics;
private List<Binding> bindings;
private String groupId;

private BindingManager bindingManager;
private QServerManager qserverManager;

private final ClientManager clientManager;
private ClientManager clientManager;

private final ClientConfigs clientConfigs;
private ClientConfigs clientConfigs;

public DefaultKiteClient(String zkAddr, ClientConfigs clientConfigs) {
this(zkAddr, clientConfigs, new ListenerAdapter() {
private MessageListener listener;

});
private String zkHosts;

public void setListener(MessageListener listener) {
this.listener = listener;
}

public DefaultKiteClient(String zkAddr, ClientConfigs clientConfigs, MessageListener listener) {
this.bindingManager = BindingManager.getInstance(zkAddr);
public void setClientConfigs(ClientConfigs clientConfigs) {
this.clientConfigs = clientConfigs;
}

clientManager = new ClientManager(bindingManager, clientConfigs, listener);
public void setZkHosts(String zkHosts) {
this.zkHosts = zkHosts;
}


public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

@Override
public void setPublishTopics(String[] topics) {
public void setPublishTopics(List<String> topics) {
this.publishTopics = topics;
}

@Override
public void setBindings(Binding[] bindings) {
public void setBindings( List<Binding> bindings) {
this.bindings = bindings;
}

@Override
public void start() {
if (publishTopics != null) {
for (String topic : publishTopics) {
clientManager.refreshServers(topic, bindingManager.getServerList(topic));
}

String producerName = getProducerName();
for (String topic : publishTopics) {
bindingManager.registerProducer(topic, clientConfigs.groupId, producerName);

bindingManager.registerClientManager(topic, clientManager);
}
public void init() throws Exception {

//启动Qserver
this.qserverManager = new QServerManager();
this.qserverManager.setZkAddr(this.zkHosts);
this.qserverManager.init();

//创建client的管理者
this.clientManager = new ClientManager(qserverManager, clientConfigs, this.listener);

//收集所有的topic
Set<String> topics = new HashSet<String>();
if(null == publishTopics && !publishTopics.isEmpty()) {
//发送方这个分组信息
this.qserverManager.publishTopics(this.groupId, getProducerName(), this.publishTopics);
topics.addAll(this.publishTopics);
}

if (bindings != null) {
for (Binding binding : bindings) {
String topic = binding.getTopic();
clientManager.refreshServers(topic, bindingManager.getServerList(topic));
topics.add(topic);
}
}

for (Binding binding : bindings) {
bindingManager.registerClientManager(binding.getTopic(), clientManager);
}
//初始化kiteq的客户端管理
this.clientManager.setTopics(topics);
this.clientManager.init();


//推送本地的订阅关系
qserverManager.subscribeTopics(this.groupId,bindings);

logger.info("DefaultKiteClient|Init|SUCC|...");

bindingManager.registerConsumer(bindings);
}
}

private String getProducerName() {
private static String getProducerName() {
String producerName;
String jvmName = ManagementFactory.getRuntimeMXBean().getName();
if (StringUtils.isEmpty(jvmName)) {
@@ -115,7 +137,7 @@ private String getProducerName() {
public void close() {
ThreadPoolManager.shutdown();
KiteStats.close();
bindingManager.close();
qserverManager.destroy();
clientManager.close();
}

@@ -178,14 +200,14 @@ private void handleTxCallback(TxCallback txCallback, Header header) throws NoKit
}

private void sendMessage(byte cmdType, Message message, Header header) throws NoKiteqServerException {
KiteIOClient client = clientManager.get(header.getTopic());
KiteIOClient client = clientManager.findClient(header.getTopic());
client.send(cmdType, message);
}

private SendResult innerSendMessage(byte cmdType, Message message, Header header) throws NoKiteqServerException {
SendResult result = new SendResult();
try {
KiteIOClient kiteIOClient = clientManager.get(header.getTopic());
KiteIOClient kiteIOClient = clientManager.findClient(header.getTopic());
MessageStoreAck ack = kiteIOClient.sendAndGet(cmdType, message);

if (ack == null) {
@@ -11,6 +11,8 @@
import org.kiteq.client.message.Message;
import org.kiteq.commons.util.ParamUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@@ -48,11 +50,20 @@ public boolean onMessage(Message message) {
return true;
}
};
KiteClient[] clients = new KiteClient[clientNum];
DefaultKiteClient[] clients = new DefaultKiteClient[clientNum];
for (int i = 0; i < clientNum; i++) {
clients[i] = new DefaultKiteClient(zkAddr, clientConfigs, listener);
clients[i].setBindings(new Binding[]{Binding.bindDirect(groupId, topic, messageType, 1000, true)});
clients[i].start();
clients[i] = new DefaultKiteClient();
List<Binding> binds = new ArrayList<Binding>();
binds.add(Binding.bindDirect(groupId, topic, messageType, 1000, true));
clients[i].setBindings(binds);
clients[i].setZkHosts(zkAddr);
clients[i].setListener(listener);
clients[i].setClientConfigs(clientConfigs);
try {
clients[i].init();
} catch (Exception e) {
e.printStackTrace();
}
}

TimeUnit.HOURS.sleep(1);
@@ -5,11 +5,17 @@
import org.apache.log4j.Logger;
import org.kiteq.client.ClientConfigs;
import org.kiteq.client.KiteClient;
import org.kiteq.client.binding.Binding;
import org.kiteq.client.impl.DefaultKiteClient;
import org.kiteq.client.message.Message;
import org.kiteq.client.message.MessageListener;
import org.kiteq.client.message.TxResponse;
import org.kiteq.commons.exception.NoKiteqServerException;
import org.kiteq.commons.util.ParamUtils;
import org.kiteq.commons.util.ThreadUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -51,11 +57,30 @@ public static void main(String[] args) throws InterruptedException {
LOGGER.info("workerNum=" + workerNum);

ClientConfigs clientConfigs = new ClientConfigs(groupId, secretKey);
KiteClient[] clients = new KiteClient[clientNum];
DefaultKiteClient[] clients = new DefaultKiteClient[clientNum];
for (int i = 0; i < clientNum; i++) {
clients[i] = new DefaultKiteClient(zkAddr, clientConfigs);
clients[i].setPublishTopics(new String[]{topic});
clients[i].start();
clients[i] = new DefaultKiteClient();
List<String> binds = new ArrayList<String>();
binds.add(topic);
clients[i].setPublishTopics(binds);
clients[i].setZkHosts(zkAddr);
clients[i].setListener(new MessageListener() {
@Override
public boolean onMessage(Message message) {
return false;
}

@Override
public void onMessageCheck(TxResponse tx) {

}
});
clients[i].setClientConfigs(clientConfigs);
try {
clients[i].init();
} catch (Exception e) {
e.printStackTrace();
}
}

ExecutorService executor = Executors.newCachedThreadPool();
@@ -1,5 +1,7 @@
package org.kiteq.example;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -13,9 +15,7 @@
import org.kiteq.client.ClientConfigs;
import org.kiteq.client.KiteClient;
import org.kiteq.client.impl.DefaultKiteClient;
import org.kiteq.client.message.ListenerAdapter;
import org.kiteq.client.message.TxCallback;
import org.kiteq.client.message.TxResponse;
import org.kiteq.client.message.*;
import org.kiteq.commons.exception.NoKiteqServerException;
import org.kiteq.commons.util.ParamUtils;
import org.kiteq.commons.util.ThreadUtils;
@@ -68,11 +68,30 @@ public void onMessageCheck(TxResponse response) {
}
}
};
KiteClient[] clients = new KiteClient[clientNum];
DefaultKiteClient[] clients = new DefaultKiteClient[clientNum];
for (int i = 0; i < clientNum; i++) {
clients[i] = new DefaultKiteClient(zkAddr, clientConfigs, listener);
clients[i].setPublishTopics(new String[]{topic});
clients[i].start();
clients[i] = new DefaultKiteClient();
List<String> binds = new ArrayList<String>();
binds.add(topic);
clients[i].setPublishTopics(binds);
clients[i].setZkHosts(zkAddr);
clients[i].setListener(new MessageListener() {
@Override
public boolean onMessage(Message message) {
return false;
}

@Override
public void onMessageCheck(TxResponse tx) {

}
});
clients[i].setClientConfigs(clientConfigs);
try {
clients[i].init();
} catch (Exception e) {
e.printStackTrace();
}
}

ExecutorService executor = Executors.newCachedThreadPool();
@@ -2,7 +2,6 @@

import com.google.protobuf.Message;
import org.kiteq.protocol.packet.KitePacket;
import org.kiteq.remoting.listener.KiteListener;

import java.util.Set;

@@ -17,8 +16,6 @@ public interface KiteIOClient {
<T> T sendAndGet(byte cmdType, Message message);

public void sendResponse(KitePacket packet);

void registerListener(KiteListener listener);

void start() throws Exception;

@@ -28,7 +25,7 @@ public interface KiteIOClient {

void close();

String getServerUrl();
String getHostPort();

Set<String> getAcceptedTopics();

@@ -3,14 +3,12 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.kiteq.commons.stats.KiteStats;
import org.kiteq.protocol.KiteRemoting;
import org.kiteq.commons.threadpool.ThreadPoolManager;
import org.kiteq.protocol.Protocol;
import org.kiteq.protocol.packet.KitePacket;
import org.kiteq.remoting.listener.KiteListener;
import org.kiteq.remoting.listener.ListenerManager;
import org.kiteq.remoting.listener.RemotingListener;
import org.kiteq.remoting.response.KiteResponse;
import org.kiteq.remoting.response.ResponseFuture;
import org.kiteq.remoting.utils.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -19,35 +17,51 @@
* @since Feb 11, 2015
*/
public class KiteClientHandler extends ChannelInboundHandlerAdapter {

private static final Logger logger = LoggerFactory.getLogger(KiteClientHandler.class);


private RemotingListener remotingListener;

public KiteClientHandler(RemotingListener remotingListener) {
this.remotingListener = remotingListener;
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

if (msg instanceof KitePacket) {
KiteStats.recordRead();

KitePacket packet = (KitePacket) msg;
byte cmdType = packet.getHeader().getCmdType();
if (cmdType == Protocol.CMD_CONN_AUTH ||
cmdType == Protocol.CMD_MESSAGE_STORE_ACK ||
cmdType == Protocol.CMD_HEARTBEAT) {
ResponseFuture.receiveResponse(new KiteResponse(packet.getHeader().getOpaque(), packet.getMessage()));
} else {
KiteListener listener = ListenerManager.getListener(ChannelUtils.getChannelId(ctx.channel()));
if (cmdType == Protocol.CMD_TX_ACK) {
listener.txAckReceived(packet);
} else if (cmdType == Protocol.CMD_BYTES_MESSAGE) {
listener.bytesMessageReceived( packet);
} else if (cmdType == Protocol.CMD_STRING_MESSAGE) {
listener.stringMessageReceived(packet);
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {

ThreadPoolManager.getWorkerExecutor().execute(new Runnable() {
@Override
public void run() {
if (msg instanceof KitePacket) {
KiteStats.recordRead();

KitePacket packet = (KitePacket) msg;
byte cmdType = packet.getHeader().getCmdType();
if (cmdType == Protocol.CMD_CONN_AUTH ||
cmdType == Protocol.CMD_MESSAGE_STORE_ACK ||
cmdType == Protocol.CMD_HEARTBEAT) {
ResponseFuture.receiveResponse(new KiteResponse(packet.getHeader().getOpaque(), packet.getMessage()));
} else {
KitePacket response = null;
if (cmdType == Protocol.CMD_TX_ACK) {
response = KiteClientHandler.this.remotingListener.txAckReceived(packet);
} else if (cmdType == Protocol.CMD_BYTES_MESSAGE) {
response = KiteClientHandler.this.remotingListener.bytesMessageReceived(packet);
} else if (cmdType == Protocol.CMD_STRING_MESSAGE) {
response = KiteClientHandler.this.remotingListener.stringMessageReceived(packet);
} else {
logger.error("Received unknown package: " + packet);
}

if (null != response) {
//发送回执
ctx.channel().writeAndFlush(response);
}
}
} else {
logger.error("Received unknown package: " + packet);
logger.warn("Illegal message {}", msg);
}
}
} else {
logger.warn("Illegal message {}", msg);
}
});
}
}
@@ -1,14 +1,12 @@
package org.kiteq.remoting.client.impl;

import com.google.common.collect.MapMaker;
import com.google.protobuf.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.kiteq.commons.stats.KiteStats;
import org.kiteq.commons.util.HostPort;
import org.kiteq.commons.util.NamedThreadFactory;
@@ -19,11 +17,9 @@
import org.kiteq.remoting.client.handler.KiteClientHandler;
import org.kiteq.remoting.codec.KiteDecoder;
import org.kiteq.remoting.codec.KiteEncoder;
import org.kiteq.remoting.listener.KiteListener;
import org.kiteq.remoting.listener.ListenerManager;
import org.kiteq.remoting.listener.RemotingListener;
import org.kiteq.remoting.response.KiteResponse;
import org.kiteq.remoting.response.ResponseFuture;
import org.kiteq.remoting.utils.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -71,9 +67,9 @@ public enum STATE {

private final AtomicReference<STATE> state = new AtomicReference<STATE>(STATE.NONE);

private final ConcurrentMap<KiteListener, Boolean> listeners = new MapMaker().weakKeys().makeMap();
private RemotingListener listener;

public NettyKiteIOClient(String groupId, String secretKey, String serverUrl) {
public NettyKiteIOClient(String groupId, String secretKey, String serverUrl,final RemotingListener listener) {
this.groupId = groupId;
this.secretKey = secretKey;
this.serverUrl = serverUrl;
@@ -92,7 +88,7 @@ public NettyKiteIOClient(String groupId, String secretKey, String serverUrl) {
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("encoder", new KiteEncoder());
ch.pipeline().addLast("decoder", new KiteDecoder());
ch.pipeline().addLast("kiteq-handler", new KiteClientHandler());
ch.pipeline().addLast("kiteq-handler", new KiteClientHandler(listener));
}
});
}
@@ -116,7 +112,6 @@ public boolean reconnect() {
return false;
}

String oldChannel = ChannelUtils.getChannelId(channelFuture.channel());

int retryCount = 1;
while (!Thread.currentThread().isInterrupted()) {
@@ -127,12 +122,6 @@ public boolean reconnect() {

heartbeat.reset();

String newChannel = ChannelUtils.getChannelId(channelFuture.channel());
for (KiteListener listener : listeners.keySet()) {
ListenerManager.register(newChannel, listener);
}
ListenerManager.unregister(oldChannel);

if (handshake()) {
state.set(STATE.RUNNING);
} else {
@@ -268,16 +257,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
channel.flush();
}

@Override
public void registerListener(KiteListener listener) {
listeners.putIfAbsent(listener, true);

Channel channel = channelFuture.channel();
ListenerManager.register(ChannelUtils.getChannelId(channel), listener);
}

@Override
public String getServerUrl() {
public String getHostPort() {
return serverUrl;
}

This file was deleted.

This file was deleted.

@@ -0,0 +1,19 @@
package org.kiteq.remoting.listener;



import org.kiteq.protocol.packet.KitePacket;

/**
* @author gaofeihang
* @since Feb 13, 2015
*/
public interface RemotingListener {

KitePacket txAckReceived(KitePacket packet);

KitePacket bytesMessageReceived(KitePacket packet);

KitePacket stringMessageReceived(KitePacket packet);

}