Skip to content

Commit

Permalink
#1055 Refactor the cluster related part to support more clustering ty…
Browse files Browse the repository at this point in the history
…pe (#1056)

* #1055 Refactor the cluster related part to support more clustering type

* #1055  enableAlert miss

* #1055 checkClusterConfig before start cluster helper

* #1055 add changeable rootPath

* #1055 change alert logic

* #1055 delete line
  • Loading branch information
sunsun314 authored and yanhuqing666 committed Mar 20, 2019
1 parent 5323816 commit 58c3aca
Show file tree
Hide file tree
Showing 58 changed files with 13,857 additions and 1,383 deletions.
2 changes: 1 addition & 1 deletion dble_checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@

<module name="UncommentedMain">
<property name="excludedClasses"
value="(com\.actiontech\.dble\.DbleStartup)|(com\.actiontech\.dble\.config\.loader\.zkprocess\.xmltozk\.XmltoZkMain)|(com\.actiontech\.dble\.util\.DecryptUtil)|(com\.actiontech\.dble\.util\.dataMigrator\.DataMigrator)|(com\.actiontech\.dble\.config\.loader\.ucoreprocess\.XmltoUcore)"/>
value="(com\.actiontech\.dble\.DbleStartup)|(com\.actiontech\.dble\.config\.loader\.zkprocess\.xmltozk\.XmltoZkMain)|(com\.actiontech\.dble\.util\.DecryptUtil)|(com\.actiontech\.dble\.util\.dataMigrator\.DataMigrator)|(com\.actiontech\.dble\.cluster\.xmltoKv\.XmltoCluster)"/>
</module>
<module name="Indentation">
<property name="arrayInitIndent" value="8"/>
Expand Down
2 changes: 2 additions & 0 deletions dble_checkstyle_suppression.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
<suppress checks=".*" files="StructureMeta.java"/>
<suppress checks=".*" files="UcoreInterface.java"/>
<suppress checks=".*" files="UcoreGrpc.java"/>
<suppress checks=".*" files="UshardInterface.java"/>
<suppress checks=".*" files="DbleClusterGrpc.java"/>
<suppress checks="Indentation" files="MyTime.java"/>
<suppress checks="CyclomaticComplexity" files="MyTime.java"/>
<suppress checks="CyclomaticComplexity" files="TimSort.java"/>
Expand Down
3 changes: 3 additions & 0 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$TableMeta$Builder"/>
</Match>
<Match>
<Package name="com.actiontech.dble.cluster.impl.ushard"/>
</Match>
<Match>
<Package name="com.actiontech.dble.alarm"/>
</Match>
Expand Down
18 changes: 11 additions & 7 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.buffer.DirectByteBufferPool;
import com.actiontech.dble.cache.CacheService;
import com.actiontech.dble.cluster.ClusterGeneralConfig;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.ServerConfig;
import com.actiontech.dble.config.loader.ucoreprocess.UcoreConfig;
import com.actiontech.dble.config.loader.zkprocess.comm.ZkConfig;
import com.actiontech.dble.config.model.SchemaConfig;
import com.actiontech.dble.config.model.SystemConfig;
Expand Down Expand Up @@ -75,6 +75,7 @@ public final class DbleServer {
private static final long DEFAULT_OLD_CONNECTION_CLEAR_PERIOD = 5 * 1000L;

private static final DbleServer INSTANCE = new DbleServer();

private static final Logger LOGGER = LoggerFactory.getLogger("Server");
private AtomicBoolean backupLocked;

Expand Down Expand Up @@ -171,8 +172,8 @@ public String genXaTxId() {
id.append("'" + NAME + "Server.");
if (isUseZK()) {
id.append(ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
} else if (isUseUcore()) {
id.append(UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
} else if (isUseGeneralCluster()) {
id.append(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
} else {
id.append(this.getConfig().getSystem().getServerNodeId());

Expand Down Expand Up @@ -363,6 +364,7 @@ public void startup() throws IOException {
if (system.getEnableSlowLog() == 1) {
SlowQueryLog.getInstance().setEnableSlowLog(true);
}
AlertUtil.initAlert();
if (system.getEnableAlert() == 1) {
AlertUtil.switchAlert(true);
}
Expand Down Expand Up @@ -426,7 +428,7 @@ public Thread newThread(Runnable r) {

userManager.initForLatest(config.getUsers(), system.getMaxCon());

if (isUseUcore()) {
if (isUseGeneralCluster()) {
try {
OnlineLockStatus.getInstance().metaUcoreInit(true);
} catch (Exception e) {
Expand Down Expand Up @@ -694,11 +696,12 @@ private boolean isUseZkSwitch() {
}

public boolean isUseZK() {
return ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) != null;
return ClusterGeneralConfig.getInstance().isUseCluster() && ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) != null;
}

public boolean isUseUcore() {
return UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) != null;
public boolean isUseGeneralCluster() {
return ClusterGeneralConfig.getInstance().isUseCluster() &&
ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) == null;
}

public TxnLogProcessor getTxnLogProcessor() {
Expand Down Expand Up @@ -1081,4 +1084,5 @@ public FrontendUserManager getUserManager() {
return userManager;
}


}
24 changes: 17 additions & 7 deletions src/main/java/com/actiontech/dble/alarm/AlertUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

package com.actiontech.dble.alarm;

import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.ucoreprocess.UcoreConfig;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.cluster.ClusterController;
import com.actiontech.dble.cluster.ClusterGeneralConfig;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -26,7 +27,12 @@ private AlertUtil() {

public static void switchAlert(boolean enableAlert) {
isEnable = enableAlert;
if (enableAlert && UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) != null) {
}

public static void initAlert() {
if (DbleServer.getInstance().isUseGeneralCluster() &&
(ClusterController.CONFIG_MODE_UCORE.equals(ClusterGeneralConfig.getInstance().getClusterType()) ||
ClusterController.CONFIG_MODE_USHARD.equals(ClusterGeneralConfig.getInstance().getClusterType()))) {
alert = UcoreAlert.getInstance();
} else {
alert = DEFAULT_ALERT;
Expand All @@ -38,19 +44,23 @@ public static boolean isEnable() {
}

public static void alertSelf(String code, Alert.AlertLevel level, String desc, Map<String, String> labels) {
alert.alertSelf(code, level, desc, labels);
if (isEnable) {
alert.alertSelf(code, level, desc, labels);
}
}

public static void alert(String code, Alert.AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map<String, String> labels) {
alert.alert(code, level, desc, alertComponentType, alertComponentId, labels);
if (isEnable) {
alert.alert(code, level, desc, alertComponentType, alertComponentId, labels);
}
}

public static boolean alertResolve(String code, Alert.AlertLevel level, String alertComponentType, String alertComponentId, Map<String, String> labels) {
return alert.alertResolve(code, level, alertComponentType, alertComponentId, labels);
return isEnable ? alert.alertResolve(code, level, alertComponentType, alertComponentId, labels) : true;
}

public static boolean alertSelfResolve(String code, Alert.AlertLevel level, Map<String, String> labels) {
return alert.alertSelfResolve(code, level, labels);
return isEnable ? alert.alertSelfResolve(code, level, labels) : true;
}

public static Map<String, String> genSingleLabel(String key, String value) {
Expand Down
61 changes: 29 additions & 32 deletions src/main/java/com/actiontech/dble/alarm/UcoreAlert.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@

package com.actiontech.dble.alarm;

import com.actiontech.dble.cluster.ClusterGeneralConfig;
import com.actiontech.dble.cluster.ClusterHelper;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.ucoreprocess.ClusterUcoreSender;
import com.actiontech.dble.config.loader.ucoreprocess.UcoreConfig;
import com.actiontech.dble.cluster.bean.ClusterAlertBean;

import java.util.Map;

public final class UcoreAlert implements Alert {
private static final String SOURCE_COMPONENT_TYPE = "dble";
private static final String SERVER_ID = UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID);
private static final String SOURCE_COMPONENT_ID = UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID);

private static final UcoreAlert INSTANCE = new UcoreAlert();

Expand All @@ -28,50 +27,48 @@ private UcoreAlert() {

@Override
public void alertSelf(String code, AlertLevel level, String desc, Map<String, String> labels) {
alert(code, level, desc, SOURCE_COMPONENT_TYPE, SOURCE_COMPONENT_ID, labels);
alert(code, level, desc, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels);
}

@Override
public void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map<String, String> labels) {
UcoreInterface.AlertInput.Builder builder = UcoreInterface.AlertInput.newBuilder().
setCode(code).
setDesc(desc).
setLevel(level.toString()).
setSourceComponentType(SOURCE_COMPONENT_TYPE).
setSourceComponentId(SOURCE_COMPONENT_ID).
setAlertComponentId(alertComponentId).
setAlertComponentType(alertComponentType).
setServerId(SERVER_ID).
setTimestampUnix(System.currentTimeMillis() * 1000000);
ClusterAlertBean alert = new ClusterAlertBean();
alert.setCode(code);
alert.setDesc(desc);
alert.setLevel(level.toString());
alert.setSourceComponentType(SOURCE_COMPONENT_TYPE);
alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
alert.setAlertComponentId(alertComponentId);
alert.setAlertComponentType(alertComponentType);
alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID));
alert.setTimestampUnix(System.currentTimeMillis() * 1000000);
if (labels != null) {
builder.putAllLabels(labels);
alert.setLabels(labels);
}
UcoreInterface.AlertInput input = builder.build();
ClusterUcoreSender.alert(input);
ClusterHelper.alert(alert);
}

@Override
public boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map<String, String> labels) {
UcoreInterface.AlertInput.Builder builder = UcoreInterface.AlertInput.newBuilder().
setCode(code).
setDesc("").
setLevel(level.toString()).
setSourceComponentType(SOURCE_COMPONENT_TYPE).
setSourceComponentId(SOURCE_COMPONENT_ID).
setAlertComponentId(alertComponentId).
setAlertComponentType(alertComponentType).
setServerId(SERVER_ID).
setResolveTimestampUnix(System.currentTimeMillis() * 1000000);
ClusterAlertBean alert = new ClusterAlertBean();
alert.setCode(code);
alert.setDesc("");
alert.setLevel(level.toString());
alert.setSourceComponentType(SOURCE_COMPONENT_TYPE);
alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
alert.setAlertComponentId(alertComponentId);
alert.setAlertComponentType(alertComponentType);
alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID));
alert.setResolveTimestampUnix(System.currentTimeMillis() * 1000000);
if (labels != null) {
builder.putAllLabels(labels);
alert.setLabels(labels);
}
UcoreInterface.AlertInput input = builder.build();
return ClusterUcoreSender.alertResolve(input);
return ClusterHelper.alertResolve(alert);
}

@Override
public boolean alertSelfResolve(String code, AlertLevel level, Map<String, String> labels) {
return alertResolve(code, level, SOURCE_COMPONENT_TYPE, SOURCE_COMPONENT_ID, labels);
return alertResolve(code, level, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.ucoreprocess.ClusterUcoreSender;
import com.actiontech.dble.config.loader.ucoreprocess.UDistributeLock;
import com.actiontech.dble.config.loader.ucoreprocess.UcoreConfig;
import com.actiontech.dble.config.loader.ucoreprocess.UcorePathUtil;
import com.actiontech.dble.config.loader.ucoreprocess.bean.UKvBean;
import com.actiontech.dble.cluster.*;
import com.actiontech.dble.cluster.bean.KvBean;
import com.actiontech.dble.cluster.ClusterPathUtil;
import com.actiontech.dble.config.model.SchemaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -51,8 +48,8 @@ public CKVStoreRepository() {

@Override
public void init() {
List<UKvBean> allList = ClusterUcoreSender.getKeyTree(UcorePathUtil.getViewPath());
for (UKvBean bean : allList) {
List<KvBean> allList = ClusterHelper.getKVPath(ClusterPathUtil.getViewPath());
for (KvBean bean : allList) {
String[] key = bean.getKey().split("/");
if (key.length == 5) {
String[] value = key[key.length - 1].split(SCHEMA_VIEW_SPLIT);
Expand Down Expand Up @@ -81,14 +78,14 @@ public void terminate() {
public void put(String schemaName, String viewName, String createSql) {
Map<String, String> schemaMap = viewCreateSqlMap.get(schemaName);

StringBuffer sb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer sb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(schemaName).append(SCHEMA_VIEW_SPLIT).append(viewName);
StringBuffer lsb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer lsb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(LOCK).append(SEPARATOR).append(schemaName).append(SCHEMA_VIEW_SPLIT).append(viewName);
StringBuffer nsb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer nsb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(UPDATE).append(SEPARATOR).append(schemaName).append(SCHEMA_VIEW_SPLIT).append(viewName);
UDistributeLock distributeLock = new UDistributeLock(lsb.toString(),
UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + UPDATE);
DistributeLock distributeLock = new DistributeLock(lsb.toString(),
ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + UPDATE);


try {
Expand All @@ -102,14 +99,14 @@ public void put(String schemaName, String viewName, String createSql) {

ClusterDelayProvider.delayAfterGetLock();
schemaMap.put(viewName, createSql);
ClusterUcoreSender.sendDataToUcore(sb.toString(), createSql);
ClusterHelper.setKV(sb.toString(), createSql);
ClusterDelayProvider.delayAfterViewSetKey();
ClusterUcoreSender.sendDataToUcore(nsb.toString(), UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + UPDATE);
ClusterHelper.setKV(nsb.toString(), ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + UPDATE);
ClusterDelayProvider.delayAfterViewNotic();
//self reponse set
ClusterUcoreSender.sendDataToUcore(nsb.toString() + UcorePathUtil.SEPARATOR + UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), UcorePathUtil.SUCCESS);
ClusterHelper.setKV(nsb.toString() + ClusterPathUtil.SEPARATOR + ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), ClusterPathUtil.SUCCESS);

String errorMsg = ClusterUcoreSender.waitingForAllTheNode(UcorePathUtil.SUCCESS, nsb.toString() + SEPARATOR);
String errorMsg = ClusterHelper.waitingForAllTheNode(ClusterPathUtil.SUCCESS, nsb.toString() + SEPARATOR);

if (errorMsg != null) {
throw new RuntimeException(errorMsg);
Expand All @@ -124,7 +121,7 @@ public void put(String schemaName, String viewName, String createSql) {
throw new RuntimeException(e);
} finally {
ClusterDelayProvider.beforeDeleteViewNotic();
ClusterUcoreSender.deleteKVTree(nsb.toString() + SEPARATOR);
ClusterHelper.cleanPath(nsb.toString() + SEPARATOR);
ClusterDelayProvider.beforeReleaseViewLock();
distributeLock.release();
}
Expand All @@ -142,15 +139,15 @@ public void put(String schemaName, String viewName, String createSql) {
*/
@Override
public void delete(String schemaName, String view) {
StringBuffer sb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer sb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(schemaName).append(SCHEMA_VIEW_SPLIT).append(view);
StringBuffer nsb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer nsb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(UPDATE).append(SEPARATOR).
append(schemaName).append(SCHEMA_VIEW_SPLIT).append(view);
StringBuffer lsb = new StringBuffer().append(UcorePathUtil.getViewPath()).
StringBuffer lsb = new StringBuffer().append(ClusterPathUtil.getViewPath()).
append(SEPARATOR).append(LOCK).append(SEPARATOR).append(schemaName).append(SCHEMA_VIEW_SPLIT).append(view);
UDistributeLock distributeLock = new UDistributeLock(lsb.toString(),
UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + DELETE);
DistributeLock distributeLock = new DistributeLock(lsb.toString(),
ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + DELETE);

try {
viewCreateSqlMap.get(schemaName).remove(view);
Expand All @@ -162,15 +159,15 @@ public void delete(String schemaName, String view) {
}
}
ClusterDelayProvider.delayAfterGetLock();
ClusterUcoreSender.deleteKV(sb.toString());
ClusterHelper.cleanKV(sb.toString());
ClusterDelayProvider.delayAfterViewSetKey();
ClusterUcoreSender.sendDataToUcore(nsb.toString(), UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + DELETE);
ClusterHelper.setKV(nsb.toString(), ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID) + SCHEMA_VIEW_SPLIT + DELETE);
ClusterDelayProvider.delayAfterViewNotic();

//self reponse set
ClusterUcoreSender.sendDataToUcore(nsb.toString() + UcorePathUtil.SEPARATOR + UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), UcorePathUtil.SUCCESS);
ClusterHelper.setKV(nsb.toString() + ClusterPathUtil.SEPARATOR + ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), ClusterPathUtil.SUCCESS);

String errorMsg = ClusterUcoreSender.waitingForAllTheNode(UcorePathUtil.SUCCESS, nsb.toString() + SEPARATOR);
String errorMsg = ClusterHelper.waitingForAllTheNode(ClusterPathUtil.SUCCESS, nsb.toString() + SEPARATOR);

if (errorMsg != null) {
throw new RuntimeException(errorMsg);
Expand All @@ -184,7 +181,7 @@ public void delete(String schemaName, String view) {
throw new RuntimeException(e);
} finally {
ClusterDelayProvider.beforeDeleteViewNotic();
ClusterUcoreSender.deleteKVTree(nsb.toString() + SEPARATOR);
ClusterHelper.cleanPath(nsb.toString() + SEPARATOR);
ClusterDelayProvider.beforeReleaseViewLock();
distributeLock.release();
}
Expand Down
Loading

0 comments on commit 58c3aca

Please sign in to comment.