首先我们的单体IM架构的重点侧重于通讯部分和服务端的设计,用户端的注册页面和登录页面等我们都采用控制台的方式模拟。
命令收集器的职能就是用来收集客户端输入的指令,进而完成用户后续的操作。
首先有一个命令收集的接口
public interface BaseCommand {
//获取命令的key
String getKey();
//获取命令的提示信息
String getTip();
//从控制台提取 业务数据
void exec(Scanner scanner);
}
实现类分别有 👇
- ChatConsoleCommand: 收集用户聊天消息
- ClientCommandMenu: 菜单显示
- LoginConsoleCommand: 用户登录指令收集
- LogoutConsoleCommand: 登出指令
- 首先是客户端启动的时候,加载所有的命令
- 客户端启动之后,控制台可以打印已经加载的命令收集器,也就是说,客户端目前提供了哪些功能
- 然后用户根据控制台提示的指令就可以进行功能使用了,比如登录,聊天等
//启动聊天客户端
private static void startChatClient(ApplicationContext context) {
CommandController commandClient = context.getBean(CommandController.class);
// 加载所有命令
commandClient.initCommandMap();
try {
commandClient.commandThreadRunning();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void initCommandMap() {
commandMap = new HashMap<>();
commandMap.put(clientCommandMenu.getKey(), clientCommandMenu);
commandMap.put(chatConsoleCommand.getKey(), chatConsoleCommand);
commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand);
commandMap.put(logoutConsoleCommand.getKey(), logoutConsoleCommand);
clientCommandMenu.setAllCommand(commandMap);
}
客户端完成连接之后呢,命令收集服务启动。
一个while循环,不断监听客户端输入的数据,然后解析数据,根据数据不同的类型,做出不同的处理。具体可以看一下下面的switch分流。
public void commandThreadRunning() throws InterruptedException {
Thread.currentThread().setName("命令线程");
while (true) {
//建立连接
while (connectFlag == false) {
//开始连接
startConnectServer();
waitCommandThread();
}
//处理命令
while (null != session) {
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);
if (null == command) {
System.err.println("无法识别[" + command + "]指令,请重新输入!");
continue;
}
switch (key) {
case ChatConsoleCommand.KEY:
command.exec(scanner);
startOneChat((ChatConsoleCommand) command);
break;
case LoginConsoleCommand.KEY:
command.exec(scanner);
startLogin((LoginConsoleCommand) command);
break;
case LogoutConsoleCommand.KEY:
command.exec(scanner);
startLogout(command);
break;
}
}
}
}
点击Help -> Edit custom vm Options, 添加如下
-Deditable.java.test.console=true
然后重启IDE生效!!!
简易代码收集命令测试程序。
@Test
public void testCommandController() {
initCommandMap();
//处理命令
while (true) {
//菜单输入
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
//根据菜单输入,选择正确的命令收集器
BaseCommand command = commandMap.get(key);
if (null == command) {
System.err.println("无法识别[" + key + "]指令,请重新输入!");
continue;
}
//执行命令收集器
switch (key) {
case LoginConsoleCommand.KEY:
command.exec(scanner);
Logger.info("本次输入的 username 为:");
Logger.info(((LoginConsoleCommand) command).getUserName());
Logger.info("本次输入的 password 为:");
Logger.info(((LoginConsoleCommand) command).getPassword());
// startLogin((LoginConsoleCommand) command);
break;
case LogoutConsoleCommand.KEY:
command.exec(scanner);
// startLogout((LoginConsoleCommand) command);
break;
}
}
}
public void initCommandMap() {
commandMap = new HashMap<>();
LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
LogoutConsoleCommand logoutConsoleCommand = new LogoutConsoleCommand();
commandMap.put(clientCommandMenu.getKey(), clientCommandMenu);
commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand);
commandMap.put(logoutConsoleCommand.getKey(), logoutConsoleCommand);
clientCommandMenu.setAllCommand(commandMap);
}
请输入某个操作指令:[menu] 0->show 所有命令 | 1->登录 | 10->退出 |
1
请输入用户信息(id@password)
2
请按照格式输入(id@password):
一般情况下,Netty自带的编解码器都不能满足我们的需求,需要我们自己定义编解码器。
编码逻辑:添加内容
- 魔数
- 版本
- 数据长度
- 数据
@Slf4j
public class SimpleProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
@Override
protected void encode(ChannelHandlerContext ctx, ProtoMsg.Message msg, ByteBuf out) throws Exception {
encode0(msg, out);
}
public static void encode0(ProtoMsg.Message msg, ByteBuf out) {
out.writeShort(ProtoInstant.MAGIC_CODE);
out.writeShort(ProtoInstant.VERSION_CODE);
byte[] bytes = msg.toByteArray();// 将 ProtoMsg.Message 对象转换为byte
// 加密消息体
/*ThreeDES des = channel.channel().attr(Constants.ENCRYPT).get();
byte[] encryptByte = des.encrypt(bytes);*/
int length = bytes.length;// 读取消息的长度
Logger.cfo("encoder length=" + length);
// 先将消息长度写入,也就是消息头
out.writeInt(length);
// 消息体中包含我们要发送的数据
out.writeBytes(bytes);
}
}
@Slf4j
public class SimpleProtobufDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object outmsg = decode0(ctx, in);
if (outmsg != null) {
// 获取业务消息
out.add(outmsg);
}
}
public static Object decode0(ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
// 判断包头长度
if (in.readableBytes() < 8) {// 不够包头
return null;
}
//读取魔数
short magic = in.readShort();
if (magic != ProtoInstant.MAGIC_CODE) {
String error = "客户端口令不对:" + ctx.channel().remoteAddress();
//异常连接,直接报错,关闭连接
throw new InvalidFrameException(error);
}
//读取版本
short version = in.readShort();
if (version != ProtoInstant.VERSION_CODE) {
String error = "协议的版本不对:" + ctx.channel().remoteAddress();
//异常连接,直接报错,关闭连接
throw new InvalidFrameException(error);
}
// 读取传送过来的消息的长度。
int length = in.readInt();
// 长度如果小于0
if (length < 0) {
// 非法数据,关闭连接
ctx.close();
}
if (length > in.readableBytes()) {// 读到的消息体长度如果小于传送过来的消息长度
// 重置读取位置
in.resetReaderIndex();
return null;
}
Logger.cfo("decoder length=" + in.readableBytes());
byte[] array;
if (in.hasArray()) {
array = new byte[length];
in.readBytes(array, 0, length);
} else {
//直接缓冲
array = new byte[length];
in.readBytes(array, 0, length);
}
// 字节转成对象
return ProtoMsg.Message.parseFrom(array);
}
}
@Test
public void testProtobufDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
// 半包的处理
ch.pipeline().addLast("decoder", new SimpleProtobufDecoder());
// ch.pipeline().addLast("encoder", new SimpleProtobufEncoder());
ch.pipeline().addLast("inHandler", new MockLoginRequestHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
ByteBuf bytebuf = Unpooled.buffer(1024).order(ByteOrder.BIG_ENDIAN);
ProtoMsg.Message pkg = buildLoginMsg(new User());
SimpleProtobufEncoder.encode0(pkg, bytebuf);
channel.writeInbound(bytebuf);
channel.flush();
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
测试结果:
SimpleProtobufEncoder:encode0:encoder length=97
SimpleProtobufDecoder:decode0:decoder length=97
[main|TestProtobufMsg$MockLoginRequestHandler:channelRead] |> id:=99
[main|TestProtobufMsg$MockLoginRequestHandler:channelRead] |> content:=4ac37b58-b5f8-4e06-891e-5cb051c87bea
SimpleProtobufEncoder:encode0:encoder length=98
SimpleProtobufDecoder:decode0:decoder length=98
[main|TestProtobufMsg$MockLoginRequestHandler:channelRead] |> id:=100
[main|TestProtobufMsg$MockLoginRequestHandler:channelRead] |> content:=0631a7a4-1c8c-434e-980f-907d559ba8bc