Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,34 @@ Java 后端项目,大规模集群设备的管理平台,使用 Spring 作为

- 采用 Redis 作为数据库的缓存,极大提升数据库的使用效率,并将服务器对帧的处理效率提升2个数量级以上。

### Netty 服务器设计方案

本节内容总结为以下文章,点击查看:

[基于 Netty 的自定义帧高可靠性读取方案](http://www.jianshu.com/p/7b1010e7e293)

### 帧调度算法概述

>本节介绍 Java 服务器中,Netty 模块使用的帧调度算法,由于众多硬件设备的**数据帧处理能力较差**,**可靠性较差**,服务器大规模下发数据帧时,需进行有效的**拥塞控制、超时重发**,可有效提升集群设备的可靠性,降低集群设备的研发难度。

内容较多,这部分内容被放在了独立博文中,请使用如下链接查看:

|来源|网址|
|---|---|
|简书|http://www.jianshu.com/p/c5da14855515|
|主页|http://bitky.cc/2017/07/19/java/|
[基于 Netty 的帧调度策略,自行实现流量控制及可靠性通信](http://www.jianshu.com/p/c5da14855515)

**「注」**本部分为源码「Netty服务器」部分的解释说明,需结合源码进行阅读。
「注」本部分为源码「Netty服务器」部分的解释说明,需结合源码进行阅读。

## JavaFX 设备模拟客户端

基于 JavaFX 开发 GUI 客户端,模拟集群设备的行为,并可对服务器进行压力测试。

本程序开发时,相关技巧和填坑总结为以下文章:

[JavaFX 8 下简化自定义控件的外部调用以及流式布局示例](http://www.jianshu.com/p/9b5300b44f39)

[Maven 集成 JavaFX 8 以及 fx:root 问题探讨](http://www.jianshu.com/p/fce816babefc)



### 模拟客户端主界面

![模拟客户端主界面](./mdphoto/main21.jpg)
Expand Down
3 changes: 3 additions & 0 deletions clustermanage-server/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
运行/


target/
.mvn/

Expand Down
2 changes: 1 addition & 1 deletion clustermanage-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>cc.bitky.clustermanage</groupId>
<artifactId>clustermanage-server</artifactId>
<version>0.9.2-release</version>
<version>0.9.3-release</version>
<packaging>jar</packaging>

<name>clustermanage-server</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private static void initSetting() {
ServerSetting.DEFAULT_EMPLOYEE_CARD_NUMBER = exSetting.员工默认卡号;
ServerSetting.DEFAULT_EMPLOYEE_NAME = exSetting.员工默认姓名;
ServerSetting.DEFAULT_EMPLOYEE_DEPARTMENT = exSetting.员工默认部门;
ServerSetting.AUTO_REPEAT_REQUEST_TIMES = exSetting.检错重发最大重复次数;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import cc.bitky.clustermanage.db.bean.Device;
import cc.bitky.clustermanage.db.repository.DeviceRepository;
import cc.bitky.clustermanage.server.message.ChargeStatus;
import cc.bitky.clustermanage.server.message.tcp.TcpMsgResponseStatus;

@Service
Expand Down Expand Up @@ -48,7 +49,7 @@ Device handleMsgDeviceStatus(TcpMsgResponseStatus msgStatus) {
if (device == null) return null;
int rawStatus = device.getStatus();
int newStatus = msgStatus.getStatus();
if (newStatus > 6 || newStatus < 0) newStatus = 50;
if (newStatus > 6 || newStatus < 0) newStatus = ChargeStatus.CRASH;

// if (rawStatus >= 5) {
// logger.info("设备「" + msgStatus.getGroupId() + ", " + msgStatus.getDeviceId() + "」『"
Expand All @@ -64,7 +65,7 @@ Device handleMsgDeviceStatus(TcpMsgResponseStatus msgStatus) {
return device;
}

if (rawStatus == 2 && newStatus == 3) {
if (rawStatus == ChargeStatus.CHARGING && newStatus == ChargeStatus.FULL && device.getRemainChargeTime() > 0) {
device.setRemainChargeTime(device.getRemainChargeTime() - 1);
}
device.setStatus(newStatus);
Expand All @@ -78,8 +79,8 @@ Device handleMsgDeviceStatus(TcpMsgResponseStatus msgStatus) {
/**
* 获取设备的集合
*
* @param groupId 组 Id
* @param deviceId 设备 Id
* @param groupId 组 Id
* @param deviceId 设备 Id
* @return 设备的集合
*/
List<Device> getDevices(int groupId, int deviceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Device handleMsgDeviceStatus(TcpMsgResponseStatus tcpMsgResponseStatus, b
logger.info("无指定设备对应的员工和考勤表,且无法自动创建");
}
long l5 = System.currentTimeMillis();
logger.info("时间耗费:" + (l2 - l1) + "ms; " + (l3 - l2) + "ms; " + (l4 - l3) + "ms; " + (l5 - l4) + "ms");
logger.info("时间耗费:" + (l2 - l1) + "ms; " + (l3 - l2) + "ms; " + (l4 - l3) + "ms; " + (l5 - l4) + "ms");
return device;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public class ExSetting {
public String 员工默认部门;
public int 部署剩余充电次数阈值;
public int 帧发送间隔;
public int 检错重发最大重复次数;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ public class ServerSetting {
*/
public static final int COMMAND_DELAY_WAITING_TIME = 30;

/**
* 检错重发最大次数,服务器向 TCP 通道发送 CAN 帧,最大重复发送次数
*/
public static int AUTO_REPEAT_REQUEST_TIMES = 5;

//-----------------------接收到充电状态帧时的处理策略----------------------------
/**
* 项目版本号
*/
public static final String VERSION = "0.9.2";
public static final String VERSION = "0.9.3";
/**
* 主机名
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ boolean marchConfirmCard(String cardNumber) {
return kyDbPresenter.marchConfirmCard(cardNumber);
}

/**
* 获取 CAN 帧发送队列的信息
* @return CAN帧发送队列信息集合
*/
QueueInfo obtainQueueFrame() {
int size = getSendingMsgRepo().getLinkedBlockingDeque().size();
int capacity = ServerSetting.LINKED_DEQUE_LIMIT_CAPACITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@
import cc.bitky.clustermanage.server.message.web.WebMsgInitMarchConfirmCardResponse;
import cc.bitky.clustermanage.server.schedule.MsgKey;
import cc.bitky.clustermanage.server.schedule.SendingMsgRepo;
import cc.bitky.clustermanage.tcp.server.netty.SendWebMessagesListener;
import cc.bitky.clustermanage.tcp.TcpMediator;
import io.netty.util.internal.StringUtil;

@Service
public class ServerTcpMessageHandler {
private final KyDbPresenter kyDbPresenter;
private final TcpMediator tcpMediator;
private Logger logger = LoggerFactory.getLogger(getClass());
private SendWebMessagesListener sendWebMessagesListener;
private KyServerCenterHandler kyServerCenterHandler;

@Autowired
public ServerTcpMessageHandler(KyDbPresenter kyDbPresenter) {
public ServerTcpMessageHandler(KyDbPresenter kyDbPresenter, TcpMediator tcpMediator) {
this.kyDbPresenter = kyDbPresenter;
this.tcpMediator = tcpMediator;
tcpMediator.setServerTcpMessageHandler(this);
}

public SendingMsgRepo getSendingMsgRepo() {
Expand All @@ -54,6 +56,7 @@ public void handleResDeviceStatus(TcpMsgResponseStatus message) {
logger.info("收到:设备状态请求的回复");
long l1 = System.currentTimeMillis();
Device device = kyDbPresenter.handleMsgDeviceStatus(message, ServerSetting.AUTO_CREATE_DEVICE_EMPLOYEE);
//部署剩余充电次数
if (device != null) {
deployRemainChargeTimes(device);
}
Expand All @@ -63,7 +66,6 @@ public void handleResDeviceStatus(TcpMsgResponseStatus message) {

/**
* 其他功能消息处理方法
*
*/
public void handleTcpMsg() {

Expand All @@ -74,11 +76,13 @@ public void handleTcpMsg() {
*
* @param device 处理后的 Device
*/

private void deployRemainChargeTimes(Device device) {
if (device.getRemainChargeTime() <= ServerSetting.DEPLOY_REMAIN_CHARGE_TIMES) {

//当当前充电状态为「充满」,并且剩余充电次数小于或等于阈值时,部署剩余充电次数
if (device.getStatus() == 3 && device.getRemainChargeTime() <= ServerSetting.DEPLOY_REMAIN_CHARGE_TIMES) {
int remainTimes = device.getRemainChargeTime();
remainTimes = remainTimes > 0 ? remainTimes : 0;
remainTimes = remainTimes <= 100 ? remainTimes : 100;
sendMsgToTcpSpecial(new WebMsgDeployRemainChargeTimes(device.getGroupId(), device.getDeviceId(), remainTimes), true, true);
}
}
Expand Down Expand Up @@ -198,10 +202,6 @@ private void handleReceivedCard(IMessage message) {
}
}

public void setSendWebMessagesListener(SendWebMessagesListener sendWebMessagesListener) {
this.sendWebMessagesListener = sendWebMessagesListener;
}

/**
* 「特殊的」将特殊的 Message 发送至 Netty 的处理通道
*
Expand All @@ -228,21 +228,7 @@ boolean sendMsgTrafficControl(IMessage message) {
.newTimeout(timeout -> sendMsgTrafficControl(message), ServerSetting.COMMAND_DELAY_WAITING_TIME, TimeUnit.SECONDS);
return true;
}
return sendMsgToTcp(message);
}

/**
* 直接将 Message 发送至 Netty 的处理通道
*
* @param message 普通消息 Message
* @return 是否发送成功
*/
private boolean sendMsgToTcp(IMessage message) {
if (sendWebMessagesListener == null) {
logger.warn("Server 模块未能与 Netty 模块建立连接,故不能发送消息集合");
return false;
}
return sendWebMessagesListener.sendMessagesToTcp(message);
return tcpMediator.sendMsgToNetty(message);
}

void setKyServerCenterHandler(KyServerCenterHandler kyServerCenterHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import cc.bitky.clustermanage.server.message.web.WebMsgDeployEmployeeCardNumber;
import cc.bitky.clustermanage.server.message.web.WebMsgDeployEmployeeDepartment;
import cc.bitky.clustermanage.server.message.web.WebMsgDeployEmployeeName;
import cc.bitky.clustermanage.server.message.web.WebMsgDeployRemainChargeTimes;
import cc.bitky.clustermanage.web.bean.QueueDevice;
import cc.bitky.clustermanage.web.bean.QueueInfo;

@Service
Expand Down Expand Up @@ -94,62 +96,72 @@ public boolean saveCardNumber(String[] freeCards, CardType card) {
/**
* 从数据库中获取并更新设备的信息
*
* @param groupId 设备组 ID
* @param deviceId 设备 ID
* @param name 是否更新姓名
* @param department 是否更新部门
* @param cardNumber 是否更新卡号
* @param maxGroupId 若更新多个设备组,可指定更新设备组的 ID 范围为: 1 - maxgroupId
* @param groupId 设备组 ID
* @param deviceId 设备 ID
* @param queueDevice 需要部署的设备信息
* @param maxGroupId 若更新多个设备组,可指定更新设备组的 ID 范围为: 1 - maxGroupId
* @return 更新是否成功
*/
public boolean obtainDeployDeviceMsg(int groupId, int deviceId, boolean name, boolean department, boolean cardNumber, int maxGroupId) {
public boolean obtainDeployDeviceMsg(int groupId, int deviceId, QueueDevice queueDevice, int maxGroupId) {
if (groupId == 255 || groupId == 0) {
if (maxGroupId == 0)
maxGroupId = kyDbPresenter.obtainDeviceGroupCount();
if (maxGroupId == 0) return false;
for (int i = 1; i <= maxGroupId; i++) {
getDeviceInfo(i, deviceId).forEach(device -> deployEmployeeMsg(name, department, cardNumber, device));
getDeviceInfo(i, deviceId).forEach(device -> deployEmployeeMsg(queueDevice, device));
}

} else getDeviceInfo(groupId, deviceId)
.forEach(device -> deployEmployeeMsg(name, department, cardNumber, device));
.forEach(device -> deployEmployeeMsg(queueDevice, device));
return true;
}

/**
* 部署员工的姓名,单位,卡号
*
* @param name 员工的姓名
* @param department 员工的部门
* @param cardNumber 员工的卡号
* @param device 员工对应的设备
* @param device 员工对应的设备
* @param queueDevice 需要部署的设备信息
*/
private void deployEmployeeMsg(boolean name, boolean department, boolean cardNumber, Device device) {
private void deployEmployeeMsg(QueueDevice queueDevice, Device device) {
boolean name = queueDevice.isPostName();
boolean department = queueDevice.isPostDepartment();
boolean cardNumber = queueDevice.isPostCardNumber();
boolean remainChargeTime = queueDevice.isPostRemainChargeTime();

boolean autoInit = ServerSetting.DEPLOY_DEVICES_INIT;
boolean AUTO_INIT = ServerSetting.DEPLOY_DEVICES_INIT;

if (device == null) return;

//部署卡号
if (cardNumber && device.getCardNumber() != null)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeCardNumber(device.getGroupId(), device.getDeviceId(), device.getCardNumber()));
else if (cardNumber && autoInit)
else if (cardNumber && AUTO_INIT)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeCardNumber(device.getGroupId(), device.getDeviceId(), ServerSetting.DEFAULT_EMPLOYEE_CARD_NUMBER));

//部署剩余充电次数
if (remainChargeTime) {
int remainTimes = device.getRemainChargeTime();
remainTimes = remainTimes > 0 ? remainTimes : 0;
remainTimes = remainTimes <= 100 ? remainTimes : 100;
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployRemainChargeTimes(device.getGroupId(), device.getDeviceId(), remainTimes));
}

//部署姓名和单位
if (!(name || department)) return;
Employee employee = kyDbPresenter.obtainEmployeeByEmployeeObjectId(device.getEmployeeObjectId());

if (employee != null) {
if (name && employee.getName() != null)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeName(device.getGroupId(), device.getDeviceId(), employee.getName()));
else if (name && autoInit)
else if (name && AUTO_INIT)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeName(device.getGroupId(), device.getDeviceId(), ServerSetting.DEFAULT_EMPLOYEE_NAME));

if (department && employee.getDepartment() != null)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeDepartment(device.getGroupId(), device.getDeviceId(), employee.getDepartment()));
else if (department && autoInit)
else if (department && AUTO_INIT)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeDepartment(device.getGroupId(), device.getDeviceId(), ServerSetting.DEFAULT_EMPLOYEE_DEPARTMENT));

} else if (autoInit) {
} else if (AUTO_INIT) {
if (name)
kyServerCenterHandler.sendMsgTrafficControl(new WebMsgDeployEmployeeName(device.getGroupId(), device.getDeviceId(), ServerSetting.DEFAULT_EMPLOYEE_NAME));
if (department)
Expand All @@ -159,6 +171,5 @@ else if (department && autoInit)

public QueueInfo obtainQueueFrame() {
return kyServerCenterHandler.obtainQueueFrame();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package cc.bitky.clustermanage.server.message;

public interface ChargeStatus {

/**
* 未初始化
*/
int UNINIT = 0;

/**
* 使用中
*/
int USING = 1;

/**
* 充电中
*/
int CHARGING = 2;

/**
* 已充满
*/
int FULL = 3;

/**
* 通信故障
*/
int TRAFFIC_ERROR = 4;

/**
* 充电故障
*/
int CHARGE_ERROR = 5;

/**
* 矿灯未挂好
*/
int HUNG_ERROR = 6;

/**
* 多种故障
*/
int CRASH = 50;
}
Loading