Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public class TimeOutServiceImpl implements TimeOutService {


@Override
public void loadOutTime() {
int timeOut = providerConfig.getTimeout();
Constants.maxOutTime = timeOut;
public void loadOutTime(int timeOut) {
int finalTimeOut = (null != providerConfig.getTimeout()) ? providerConfig.getTimeout() : timeOut;
Constants.maxOutTime = finalTimeOut;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public class TimeOutServiceImpl implements TimeOutService {
private BasicServiceConfigBean basicServiceConfigBean;


public void loadOutTime() {
int timeOut = basicServiceConfigBean.getRequestTimeout();
public void loadOutTime(int timeOut) {
int finalTimeOut = (null != basicServiceConfigBean.getRequestTimeout() ? basicServiceConfigBean.getRequestTimeout() : timeOut);
Constants.maxOutTime = timeOut;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ public class TimeOutServiceImpl implements TimeOutService {


@Override
public void loadOutTime() {
public void loadOutTime(int timeOut) {
//todo 暂时写死
int timeOut = 20*1000;
Constants.maxOutTime = timeOut;
/*int timeOut = 20*1000;
Constants.maxOutTime = timeOut;*/
//从txManager取
if(timeOut < 0){
Constants.maxOutTime = 20*1000;
} else {
Constants.maxOutTime = timeOut*1000;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
*/
public interface TimeOutService {

void loadOutTime();
void loadOutTime(int timeOut);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void start() {
nettyService.start();
logger.info("socket-start..");

timeOutService.loadOutTime();
//timeOutService.loadOutTime();


}
Expand Down
14 changes: 12 additions & 2 deletions tx-client/src/main/java/com/codingapi/tx/model/TxServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class TxServer {
private String host;
private int heart;
private int delay;
private int autoCompensateLimit;

public int getPort() {
return port;
Expand Down Expand Up @@ -45,10 +46,18 @@ public int getDelay() {
public void setDelay(int delay) {
this.delay = delay;
}

public int getAutoCompensateLimit() {
return autoCompensateLimit;
}

@Override
public void setAutoCompensateLimit(int autoCompensateLimit) {
this.autoCompensateLimit = autoCompensateLimit;
}

@Override
public String toString() {
return "host:" + host + ",port:" + port + ",heart:" + heart + ",delay:" + delay;
return "host:" + host + ",port:" + port + ",heart:" + heart + ",delay:" + delay + "autoCompensateLimit:" + autoCompensateLimit;
}

public static TxServer parser(String json) {
Expand All @@ -59,6 +68,7 @@ public static TxServer parser(String json) {
txServer.setHost(jsonObject.getString("ip"));
txServer.setHeart(jsonObject.getInteger("heart"));
txServer.setDelay(jsonObject.getInteger("delay"));
txServer.setAutoCompensateLimit(jsonObject.getInteger("autoCompensateLimit"));
return txServer;
} catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.codingapi.tx.Constants;
import com.codingapi.tx.framework.utils.SocketManager;
import com.codingapi.tx.listener.service.TimeOutService;
import com.codingapi.tx.netty.handler.TransactionHandler;
import com.codingapi.tx.netty.service.NettyControlService;
import com.codingapi.tx.netty.service.NettyDistributeService;
Expand Down Expand Up @@ -34,6 +35,9 @@ public class NettyServiceImpl implements NettyService {

@Autowired
private NettyControlService nettyControlService;

@Autowired
private TimeOutService timeOutService;

private EventLoopGroup workerGroup;

Expand All @@ -55,8 +59,11 @@ public synchronized void start() {
int port = Constants.txServer.getPort();
final int heart = Constants.txServer.getHeart();
int delay = Constants.txServer.getDelay();
int autoCompensateLimit = Constants.txServer.getAutoCompensateLimit();

final TransactionHandler transactionHandler = new TransactionHandler(nettyControlService, delay);

timeOutService.loadOutTime(autoCompensateLimit);
workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public boolean saveCompensateMsg(TransactionCompensateMsg transactionCompensateM

final String json = JSON.toJSONString(transactionCompensateMsg);

logger.info("补偿->" + json);
logger.info("Compensate->" + json);

final String compensateKey = compensateDao.saveCompensateMsg(transactionCompensateMsg);

Expand All @@ -83,9 +83,9 @@ public void run() {
requestJson.put("json", json);

String url = configReader.getCompensateNotifyUrl();
logger.error("补偿回调地址->" + url);
logger.error("Compensate Callback Address->" + url);
String res = HttpUtils.postJson(url, requestJson.toJSONString());
logger.error("补偿回调结果->" + res);
logger.error("Compensate Callback Result->" + res);
if (configReader.isCompensateAuto()) {
//自动补偿,是否自动执行补偿
if (res.contains("success")||res.contains("SUCCESS")) {
Expand All @@ -94,7 +94,7 @@ public void run() {
}
}
} catch (Exception e) {
logger.error("补偿回调失败->" + e.getMessage());
logger.error("Compensate Callback Fails->" + e.getMessage());
}
}
});
Expand All @@ -109,16 +109,16 @@ public void run() {

public void autoCompensate(final String compensateKey, TransactionCompensateMsg transactionCompensateMsg) {
final String json = JSON.toJSONString(transactionCompensateMsg);
logger.info("自动补偿->" + json);
logger.info("Auto Compensate->" + json);
//自动补偿业务执行...
final int tryTime = configReader.getCompensateTryTime();
boolean autoExecuteRes = false;
try {
int executeCount = 0;
autoExecuteRes = _executeCompensate(json);
logger.info("自动补偿结果->" + autoExecuteRes + ",json->" + json);
logger.info("Automatic Compensate Result->" + autoExecuteRes + ",json->" + json);
while (!autoExecuteRes) {
logger.info("try补偿(补偿失败,进入补偿队列)->" + autoExecuteRes + ",json->" + json);
logger.info("Compensate Failure, Entering Compensate Queue->" + autoExecuteRes + ",json->" + json);
executeCount++;
if(executeCount==3){
autoExecuteRes = false;
Expand All @@ -138,7 +138,7 @@ public void autoCompensate(final String compensateKey, TransactionCompensateMsg
}

}catch (Exception e){
logger.error("自动补偿失败,msg:"+e.getLocalizedMessage());
logger.error("Auto Compensate Fails,msg:"+e.getLocalizedMessage());
//推送数据给第三方通知
autoExecuteRes = false;
}
Expand All @@ -151,9 +151,9 @@ public void autoCompensate(final String compensateKey, TransactionCompensateMsg
requestJson.put("resState",autoExecuteRes);

String url = configReader.getCompensateNotifyUrl();
logger.error("补偿结果回调地址->" + url);
logger.error("Compensate Result Callback Address->" + url);
String res = HttpUtils.postJson(url, requestJson.toJSONString());
logger.error("补偿结果回调结果->" + res);
logger.error("Compensate Result Callback Result->" + res);

}

Expand Down Expand Up @@ -264,7 +264,7 @@ public void reloadCompensate(TxGroup txGroup) {
}
}

logger.info("加载补偿以后->"+JSON.toJSONString(txGroup));
logger.info("Compensate Loaded->"+JSON.toJSONString(txGroup));
}

private TxGroup getCompensateByGroupId(String groupId) {
Expand All @@ -283,7 +283,7 @@ public boolean executeCompensate(String path) throws ServiceException {

String json = compensateDao.getCompensate(path);
if (json == null) {
throw new ServiceException("不存在该数据");
throw new ServiceException("no data existing");
}

boolean hasOk = _executeCompensate(json);
Expand All @@ -304,7 +304,7 @@ private boolean _executeCompensate(String json) throws ServiceException {

ModelInfo modelInfo = ModelInfoManager.getInstance().getModelByModel(model);
if (modelInfo == null) {
throw new ServiceException("当前模块不在线.");
throw new ServiceException("current model offline.");
}

String data = jsonObject.getString("data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ConfigReader {

@Value("${tm.compensate.tryTime}")
private int compensateTryTime;

@Value("${tm.auto.compensate.limit}")
private int autoCompensateLimit;



Expand Down Expand Up @@ -97,4 +100,10 @@ public boolean isCompensateAuto() {
public int getCompensateTryTime() {
return compensateTryTime;
}

public int getAutoCompensateLimit() {
return autoCompensateLimit;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public TxState getState() {
state.setNotifyUrl(configReader.getCompensateNotifyUrl());
state.setCompensate(configReader.isCompensateAuto());
state.setCompensateTryTime(configReader.getCompensateTryTime());
state.setAutoCompensateLimit(configReader.getAutoCompensateLimit());
state.setSlbList(getServices());
return state;
}
Expand Down
13 changes: 13 additions & 0 deletions tx-manager/src/main/java/com/codingapi/tm/model/TxServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class TxServer {
private int port;
private int heart;
private int delay;
private int autoCompensateLimit;

public static TxServer format(TxState state) {
TxServer txServer = new TxServer();
Expand Down Expand Up @@ -52,4 +53,16 @@ public int getDelay() {
public void setDelay(int delay) {
this.delay = delay;
}


public int getAutoCompensateLimit() {
return autoCompensateLimit;
}


public void setAutoCompensateLimit(int autoCompensateLimit) {
this.autoCompensateLimit = autoCompensateLimit;
}


}
15 changes: 15 additions & 0 deletions tx-manager/src/main/java/com/codingapi/tm/model/TxState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public class TxState {
* slb list
*/
private List<String> slbList;

/**
* 自动补偿间隔时间
*/
private int autoCompensateLimit;


public String getIp() {
Expand Down Expand Up @@ -152,4 +157,14 @@ public String getNotifyUrl() {
public void setNotifyUrl(String notifyUrl) {
this.notifyUrl = notifyUrl;
}

public int getAutoCompensateLimit() {
return autoCompensateLimit;
}

public void setAutoCompensateLimit(int autoCompensateLimit) {
this.autoCompensateLimit = autoCompensateLimit;
}


}
3 changes: 3 additions & 0 deletions tx-manager/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ tm.compensate.notifyUrl=http://ip:port/path
#补偿失败,再次尝试间隔(秒),最大尝试次数3次,当超过3次即为补偿失败。
tm.compensate.tryTime=30

#各组件自动补偿的时间上限
tm.auto.compensate.limit=20




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void transaction() throws SQLException {

//start 结束就是全部事务的结束表示,考虑start挂掉的情况
Timer timer = new Timer();
logger.info("maxOutTime:" + getMaxOutTime());
timer.schedule(new TimerTask() {
@Override
public void run() {
Expand Down