From 5e2169fd74619cf4b73ad7603068622ad3dc5de0 Mon Sep 17 00:00:00 2001 From: zanglei Date: Mon, 31 Dec 2018 15:12:35 +0800 Subject: [PATCH] MOD: update common config --- .gitignore | 1 + carrera-chronos/pom.xml | 12 +- carrera-common/pom.xml | 42 +++---- .../com/xiaojukeji/carrera/biz/ZkService.java | 24 +--- .../xiaojukeji/carrera/biz/ZkServiceImpl.java | 107 +--------------- .../carrera/config/v4/BrokerConfig.java | 20 +-- .../carrera/config/v4/CProxyConfig.java | 44 +------ .../carrera/config/v4/GroupConfig.java | 85 ++----------- .../carrera/config/v4/HostRegionConfig.java | 31 ----- .../config/v4/MonitorAssignedConfig.java | 23 ---- .../carrera/config/v4/PProxyConfig.java | 14 +-- .../carrera/config/v4/TopicConfig.java | 37 ------ .../config/v4/cproxy/UpstreamTopic.java | 39 +----- .../v4/pproxy/CarreraConfiguration.java | 24 ---- .../v4/pproxy/ParamLengthConfiguration.java | 48 -------- .../v4/pproxy/RateLimitConfiguration.java | 38 ------ .../config/v4/pproxy/TopicConfiguration.java | 21 +--- .../xiaojukeji/carrera/utils/HttpUtils.java | 114 +----------------- .../carrera/console/service/IdcService.java | 12 -- .../console/service/ZKV4ConfigService.java | 2 +- .../impl/ConsumeSubscriptionServiceImpl.java | 2 +- .../console/service/impl/IdcServiceImpl.java | 30 ----- .../service/impl/TopicServiceImpl.java | 58 +-------- .../service/impl/ZKV4ConfigServiceImpl.java | 101 ++-------------- carrera-consumer/pom.xml | 50 +------- .../cproxy/actions/hdfs/DataFileManager.java | 20 +-- .../actions/http/CarreraAsyncRequest.java | 45 +------ .../actions/util/CarreraProducerManager.java | 67 ---------- .../AbstractCarreraRocketMqConsumer.java | 10 +- .../cproxy/consumer/BaseCarreraConsumer.java | 5 +- .../cproxy/consumer/CarreraConsumer.java | 16 ++- .../cproxy/consumer/CarreraKafkaConsumer.java | 4 +- .../consumer/CarreraNewRocketMqConsumer.java | 4 +- .../consumer/CarreraRocketMqConsumer.java | 4 +- .../cproxy/consumer/ConfigManager.java | 25 +--- .../consumer/LowLevelKafkaConsumer.java | 2 +- .../carrera/cproxy/proxy/ProxyApp.java | 3 - .../carrera/cproxy/utils/ConfigUtils.java | 13 -- carrera-docker/Dockerfile | 7 +- carrera-docker/README.md | 3 +- carrera-docker/README_CN.md | 3 +- .../pproxy/producer/CarreraRequest.java | 8 +- .../carrera/pproxy/producer/ProducerPool.java | 2 +- .../SimpleCarreraConsumerExample.java | 2 +- .../java/carrera-producer-sdk-example/pom.xml | 2 +- .../java/carrera-producer-sdk/pom.xml | 2 +- 46 files changed, 120 insertions(+), 1106 deletions(-) delete mode 100644 carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/HostRegionConfig.java delete mode 100644 carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/MonitorAssignedConfig.java delete mode 100644 carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/ParamLengthConfiguration.java delete mode 100644 carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/RateLimitConfiguration.java delete mode 100644 carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/IdcService.java delete mode 100644 carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/IdcServiceImpl.java delete mode 100644 carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/util/CarreraProducerManager.java diff --git a/.gitignore b/.gitignore index 0b4f687..3de96fb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ **/output **/logs *.idea +*.DS_Store diff --git a/carrera-chronos/pom.xml b/carrera-chronos/pom.xml index 7b09819..4a9d527 100644 --- a/carrera-chronos/pom.xml +++ b/carrera-chronos/pom.xml @@ -35,6 +35,11 @@ 4.12 test + + com.lmax + disruptor + 3.3.6 + org.yaml snakeyaml @@ -45,11 +50,6 @@ slf4j-api 1.7.21 - - com.lmax - disruptor - 3.3.6 - org.apache.commons commons-lang3 @@ -103,7 +103,7 @@ org.apache.zookeeper zookeeper - 3.4.5 + 3.4.9 org.apache.curator diff --git a/carrera-common/pom.xml b/carrera-common/pom.xml index 1cb3882..2b20e8d 100644 --- a/carrera-common/pom.xml +++ b/carrera-common/pom.xml @@ -67,21 +67,21 @@ slf4j-api ${slf4j-version} - - org.apache.logging.log4j - log4j-api - ${log4j-version} - - - org.apache.logging.log4j - log4j-core - ${log4j-version} - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j-version} - + + + + + + + + + + + + + + + @@ -120,18 +120,6 @@ guava ${guava-version} - - junit - junit - 4.12 - test - - - org.apache.commons - commons-math3 - 3.6.1 - test - org.apache.thrift diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkService.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkService.java index 424d34e..6e19cf1 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkService.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkService.java @@ -1,12 +1,6 @@ package com.xiaojukeji.carrera.biz; -import com.xiaojukeji.carrera.config.v4.BrokerConfig; -import com.xiaojukeji.carrera.config.v4.CProxyConfig; -import com.xiaojukeji.carrera.config.v4.GroupConfig; -import com.xiaojukeji.carrera.config.v4.HostRegionConfig; -import com.xiaojukeji.carrera.config.v4.MonitorAssignedConfig; -import com.xiaojukeji.carrera.config.v4.PProxyConfig; -import com.xiaojukeji.carrera.config.v4.TopicConfig; +import com.xiaojukeji.carrera.config.v4.*; import com.xiaojukeji.carrera.dynamic.ParameterDynamicZookeeper; import java.util.List; @@ -45,26 +39,14 @@ public interface ZkService { void getAndWatchBroker(ParameterDynamicZookeeper.DataChangeCallback callback) throws Exception; - List getAllTopic(); - TopicConfig getTopic(String topic); - List getAllGroup(); - GroupConfig getGroup(String group); PProxyConfig getPProxy(String instance); - List getAllPProxy(); - CProxyConfig getCProxy(String instance); - List getAllCProxy(); - - List getAllBroker(); - - BrokerConfig getBroker(String brokerCluster); - boolean createOrUpdateTopic(TopicConfig config) throws Exception; boolean deleteTopic(String topic); @@ -85,10 +67,6 @@ public interface ZkService { boolean deleteBroker(String brokerCluster); - boolean createOrUpdateMonitorHost(String host, HostRegionConfig config); - - boolean createOrUpdateMonitorAssigned(String broker, MonitorAssignedConfig config); - List getChildren(String path); } \ No newline at end of file diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkServiceImpl.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkServiceImpl.java index e8677f0..eda9c18 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkServiceImpl.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/biz/ZkServiceImpl.java @@ -1,18 +1,10 @@ package com.xiaojukeji.carrera.biz; -import com.google.common.collect.Lists; -import com.xiaojukeji.carrera.config.v4.BrokerConfig; -import com.xiaojukeji.carrera.config.v4.CProxyConfig; -import com.xiaojukeji.carrera.config.v4.GroupConfig; -import com.xiaojukeji.carrera.config.v4.HostRegionConfig; -import com.xiaojukeji.carrera.config.v4.MonitorAssignedConfig; -import com.xiaojukeji.carrera.config.v4.PProxyConfig; -import com.xiaojukeji.carrera.config.v4.TopicConfig; +import com.xiaojukeji.carrera.config.v4.*; import com.xiaojukeji.carrera.dynamic.ParameterDynamicConfig; import com.xiaojukeji.carrera.dynamic.ParameterDynamicZookeeper; import com.xiaojukeji.carrera.utils.CommonFastJsonUtils; import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +20,7 @@ public class ZkServiceImpl implements ZkService { private ParameterDynamicZookeeper zkService; /** - * @param zkHost ip:port,ip:port + * @param zkHost ip:port,ip:port * @param isConfigCentre * @throws Exception */ @@ -101,101 +93,26 @@ public void getAndWatchBroker(ParameterDynamicZookeeper.DataChangeCallback getAllTopic() { - List allTopic = Lists.newArrayList(); - List topicList = zkService.getChildren(CARRERA_TOPIC); - if (CollectionUtils.isEmpty(topicList)) { - return allTopic; - } - for (String topicName : topicList) { - allTopic.add(getZkData(getTopicPath(topicName), TopicConfig.class)); - } - - return allTopic; - } - @Override public TopicConfig getTopic(String topic) { return getZkData(getTopicPath(topic), TopicConfig.class); } - @Override - public List getAllGroup() { - List allGroup = Lists.newArrayList(); - List groupList = zkService.getChildren(CARRERA_GROUP); - if (CollectionUtils.isEmpty(groupList)) { - return allGroup; - } - for (String groupName : groupList) { - allGroup.add(getZkData(getGroupPath(groupName), GroupConfig.class)); - } - - return allGroup; - } - @Override public GroupConfig getGroup(String group) { return getZkData(getGroupPath(group), GroupConfig.class); } - @Override - public List getAllPProxy() { - List allPProxy = Lists.newArrayList(); - List childList = zkService.getChildren(CARRERA_PPROXY); - if (CollectionUtils.isEmpty(childList)) { - return allPProxy; - } - for (String childPath : childList) { - allPProxy.add(getZkData(getProxyPath(CARRERA_PPROXY, childPath), PProxyConfig.class)); - } - - return allPProxy; - } - @Override public PProxyConfig getPProxy(String instance) { return getZkData(getProxyPath(CARRERA_PPROXY, instance), PProxyConfig.class); } - @Override - public List getAllCProxy() { - List allCProxy = Lists.newArrayList(); - List childList = zkService.getChildren(CARRERA_CPROXY); - if (CollectionUtils.isEmpty(childList)) { - return allCProxy; - } - for (String childPath : childList) { - allCProxy.add(getZkData(getProxyPath(CARRERA_CPROXY, childPath), CProxyConfig.class)); - } - - return allCProxy; - } - @Override public CProxyConfig getCProxy(String instance) { return getZkData(getProxyPath(CARRERA_CPROXY, instance), CProxyConfig.class); } - @Override - public List getAllBroker() { - List allBroker = Lists.newArrayList(); - List childList = zkService.getChildren(CARRERA_BROKER); - if (CollectionUtils.isEmpty(childList)) { - return allBroker; - } - for (String childPath : childList) { - allBroker.add(getZkData(getBrokerPath(childPath), BrokerConfig.class)); - } - - return allBroker; - } - - @Override - public BrokerConfig getBroker(String brokerCluster) { - return getZkData(getBrokerPath(brokerCluster), BrokerConfig.class); - } - private String getTopicPath(String topic) { return CARRERA_TOPIC + "/" + topic; } @@ -212,14 +129,6 @@ private String getBrokerPath(String broker) { return CARRERA_BROKER + "/" + broker; } - private String getMonitorHostPath(String host) { - return CARRERA_MONITHOR_HOST + "/" + host; - } - - private String getMonitorAssignedPath(String broken) { - return CARRERA_MONITOR_ASSIGNED + "/" + broken; - } - @Override public boolean createOrUpdateTopic(TopicConfig config) throws Exception { if (!config.validate()) { @@ -323,18 +232,6 @@ public boolean deleteBroker(String brokerCluster) { return true; } - @Override - public boolean createOrUpdateMonitorHost(String host, HostRegionConfig config) { - setZkData(getMonitorHostPath(host), config); - return true; - } - - @Override - public boolean createOrUpdateMonitorAssigned(String broker, MonitorAssignedConfig config) { - setZkData(getMonitorAssignedPath(broker), config); - return true; - } - @Override public List getChildren(String path) { return zkService.getChildren(path); diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/BrokerConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/BrokerConfig.java index 096f53c..11476fa 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/BrokerConfig.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/BrokerConfig.java @@ -1,16 +1,15 @@ package com.xiaojukeji.carrera.config.v4; -import java.util.Map; -import java.util.Set; - import com.alibaba.fastjson.TypeReference; import com.xiaojukeji.carrera.config.ConfigurationValidator; import com.xiaojukeji.carrera.utils.CommonFastJsonUtils; import org.apache.commons.lang3.StringUtils; +import java.util.Map; +import java.util.Set; + public class BrokerConfig implements ConfigurationValidator, Cloneable { - private String idc; private String brokerCluster; private String brokerClusterAddrs; private Map/*slave ip:port*/> brokers; @@ -18,14 +17,6 @@ public class BrokerConfig implements ConfigurationValidator, Cloneable { private Map> pproxies; private Map> cproxies; - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public String getBrokerCluster() { return brokerCluster; } @@ -69,7 +60,6 @@ public void setBrokers(Map> brokers) { @Override public String toString() { return "BrokerConfig{" + - "idc='" + idc + '\'' + ", brokerCluster='" + brokerCluster + '\'' + ", brokerClusterAddrs=" + brokerClusterAddrs + ", brokers=" + brokers + @@ -80,9 +70,7 @@ public String toString() { @Override public boolean validate() throws ConfigException { - if (StringUtils.isEmpty(this.idc)) { - throw new ConfigException("[BrokerConfig] idc empty, brokerCluster=" + brokerCluster); - } else if (StringUtils.isEmpty(this.brokerCluster)) { + if (StringUtils.isEmpty(this.brokerCluster)) { throw new ConfigException("[BrokerConfig] brokerCluster empty, brokerCluster=" + brokerCluster); } else if (StringUtils.isEmpty(this.brokerClusterAddrs)) { throw new ConfigException("[BrokerConfig] brokerClusterAddrs empty, brokerCluster=" + brokerCluster); diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/CProxyConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/CProxyConfig.java index 146860f..47c21b4 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/CProxyConfig.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/CProxyConfig.java @@ -1,10 +1,5 @@ package com.xiaojukeji.carrera.config.v4; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.xiaojukeji.carrera.config.ConfigurationValidator; @@ -15,20 +10,22 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class CProxyConfig implements ConfigurationValidator, Cloneable { private String instance; private String proxyCluster; - private String idc; private List brokerClusters; private Map kafkaConfigs; private Map rocketmqConfigs; private ConsumeServerConfiguration thriftServer; - private Map> pproxies; - - private Set groups = Collections.emptySet(); + private Set groups = Collections.emptySet(); /* group white list. */ @Override public boolean validate() throws ConfigException { @@ -37,8 +34,6 @@ public boolean validate() throws ConfigException { throw new ConfigException("[CProxyConfig] instance empty"); } else if (StringUtils.isEmpty(this.proxyCluster)) { throw new ConfigException("[CProxyConfig] proxyCluster empty"); - } else if (StringUtils.isEmpty(this.idc)) { - throw new ConfigException("[CProxyConfig] idc empty"); } else if (CollectionUtils.isEmpty(this.brokerClusters)) { throw new ConfigException("[CProxyConfig] brokerClusters empty"); } @@ -89,11 +84,6 @@ public CProxyConfig clone() { if (groups != null) { cProxyConfig.setGroups(Sets.newHashSet(groups)); } - if (this.pproxies != null) { - Map> pproxiesConfig = Maps.newHashMap(); - this.pproxies.forEach((bk, conf) -> pproxiesConfig.put(bk, Sets.newHashSet(conf))); - cProxyConfig.setPproxies(pproxiesConfig); - } return cProxyConfig; } @@ -106,14 +96,6 @@ public void setInstance(String instance) { this.instance = instance; } - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public List getBrokerClusters() { return brokerClusters; } @@ -162,25 +144,15 @@ public void setThriftServer(ConsumeServerConfiguration thriftServer) { this.thriftServer = thriftServer; } - public Map> getPproxies() { - return pproxies; - } - - public void setPproxies(Map> pproxies) { - this.pproxies = pproxies; - } - @Override public String toString() { return "CProxyConfig{" + "instance='" + instance + '\'' + ", proxyCluster='" + proxyCluster + '\'' + - ", idc='" + idc + '\'' + ", brokerClusters=" + brokerClusters + ", kafkaConfigs=" + kafkaConfigs + ", rocketmqConfigs=" + rocketmqConfigs + ", thriftServer=" + thriftServer + - ", pproxies=" + pproxies + ", groups=" + groups + '}'; } @@ -194,14 +166,12 @@ public boolean equals(Object o) { if (instance != null ? !instance.equals(that.instance) : that.instance != null) return false; if (proxyCluster != null ? !proxyCluster.equals(that.proxyCluster) : that.proxyCluster != null) return false; - if (idc != null ? !idc.equals(that.idc) : that.idc != null) return false; if (brokerClusters != null ? !brokerClusters.equals(that.brokerClusters) : that.brokerClusters != null) return false; if (kafkaConfigs != null ? !kafkaConfigs.equals(that.kafkaConfigs) : that.kafkaConfigs != null) return false; if (rocketmqConfigs != null ? !rocketmqConfigs.equals(that.rocketmqConfigs) : that.rocketmqConfigs != null) return false; if (thriftServer != null ? !thriftServer.equals(that.thriftServer) : that.thriftServer != null) return false; - if (pproxies != null ? !pproxies.equals(that.pproxies) : that.pproxies != null) return false; return groups != null ? groups.equals(that.groups) : that.groups == null; } @@ -209,12 +179,10 @@ public boolean equals(Object o) { public int hashCode() { int result = instance != null ? instance.hashCode() : 0; result = 31 * result + (proxyCluster != null ? proxyCluster.hashCode() : 0); - result = 31 * result + (idc != null ? idc.hashCode() : 0); result = 31 * result + (brokerClusters != null ? brokerClusters.hashCode() : 0); result = 31 * result + (kafkaConfigs != null ? kafkaConfigs.hashCode() : 0); result = 31 * result + (rocketmqConfigs != null ? rocketmqConfigs.hashCode() : 0); result = 31 * result + (thriftServer != null ? thriftServer.hashCode() : 0); - result = 31 * result + (pproxies != null ? pproxies.hashCode() : 0); result = 31 * result + (groups != null ? groups.hashCode() : 0); return result; } diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/GroupConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/GroupConfig.java index 1f40ea7..6dec503 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/GroupConfig.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/GroupConfig.java @@ -1,21 +1,19 @@ package com.xiaojukeji.carrera.config.v4; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.regex.Pattern; -import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.xiaojukeji.carrera.config.ConfigurationValidator; -import com.xiaojukeji.carrera.utils.PropertyUtils; import com.xiaojukeji.carrera.config.v4.cproxy.RedisConfiguration; import com.xiaojukeji.carrera.config.v4.cproxy.UpstreamTopic; +import com.xiaojukeji.carrera.utils.PropertyUtils; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + public class GroupConfig implements ConfigurationValidator, Cloneable { private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; @@ -26,26 +24,15 @@ public class GroupConfig implements ConfigurationValidator, Cloneable { private List topics; private int asyncThreads = 8; - // these configs are optional - private RedisConfiguration redisConfig; // can be set by redisConfigStr + private RedisConfiguration redisConfig; private List alarmGroup; private boolean enableAlarm = true; private long delayTimeThreshold = 300000L; private long committedLagThreshold = 10000L; - private String consoleAlarmAddr; - - private boolean broadcast = false; private int delayRequestHandlerThreads = -1; - private ConsumeMode consumeMode = ConsumeMode.CROSS_IDC; - private Map> consumeModeMapper; - - public enum ConsumeMode { - SAME_IDC, CROSS_IDC, OTHER - } - public GroupConfig() { } @@ -62,12 +49,6 @@ public void setRedisConfig(RedisConfiguration redisConfig) { this.redisConfig = redisConfig; } - public void setRedisConfigStr(String redisConfigStr) { - if (StringUtils.isNotBlank(redisConfigStr)) { - redisConfig = JSON.parseObject(redisConfigStr, RedisConfiguration.class); - } - } - public String getGroup() { return group; } @@ -124,38 +105,6 @@ public void setAlarmGroup(List alarmGroup) { this.alarmGroup = alarmGroup; } - public boolean isBroadcast() { - return broadcast; - } - - public void setBroadcast(boolean broadcast) { - this.broadcast = broadcast; - } - - public ConsumeMode getConsumeMode() { - return consumeMode; - } - - public void setConsumeMode(ConsumeMode consumeMode) { - this.consumeMode = consumeMode; - } - - public Map> getConsumeModeMapper() { - return consumeModeMapper; - } - - public void setConsumeModeMapper(Map> consumeModeMapper) { - this.consumeModeMapper = consumeModeMapper; - } - - public String getConsoleAlarmAddr() { - return consoleAlarmAddr; - } - - public void setConsoleAlarmAddr(String consoleAlarmAddr) { - this.consoleAlarmAddr = consoleAlarmAddr; - } - public int getDelayRequestHandlerThreads() { return delayRequestHandlerThreads; } @@ -173,15 +122,11 @@ public boolean equals(Object o) { enableAlarm == that.enableAlarm && delayTimeThreshold == that.delayTimeThreshold && committedLagThreshold == that.committedLagThreshold && - broadcast == that.broadcast && delayRequestHandlerThreads == that.delayRequestHandlerThreads && Objects.equals(group, that.group) && Objects.equals(topics, that.topics) && Objects.equals(redisConfig, that.redisConfig) && - Objects.equals(alarmGroup, that.alarmGroup) && - Objects.equals(consoleAlarmAddr, that.consoleAlarmAddr) && - consumeMode == that.consumeMode && - Objects.equals(consumeModeMapper, that.consumeModeMapper); + Objects.equals(alarmGroup, that.alarmGroup); } public boolean bizEquals(Object o) { @@ -189,12 +134,9 @@ public boolean bizEquals(Object o) { if (o == null || getClass() != o.getClass()) return false; GroupConfig that = (GroupConfig) o; boolean ret = asyncThreads == that.asyncThreads && - broadcast == that.broadcast && delayRequestHandlerThreads == that.delayRequestHandlerThreads && Objects.equals(group, that.group) && - Objects.equals(redisConfig, that.redisConfig) && - consumeMode == that.consumeMode && - Objects.equals(consumeModeMapper, that.consumeModeMapper); + Objects.equals(redisConfig, that.redisConfig); if (!ret) return false; if (that.topics.size() != topics.size()) @@ -216,7 +158,7 @@ public boolean bizEquals(Object o) { @Override public int hashCode() { - return Objects.hash(group, topics, asyncThreads, redisConfig, alarmGroup, enableAlarm, delayTimeThreshold, committedLagThreshold, consoleAlarmAddr, broadcast, delayRequestHandlerThreads, consumeMode, consumeModeMapper); + return Objects.hash(group, topics, asyncThreads, redisConfig, alarmGroup, enableAlarm, delayTimeThreshold, committedLagThreshold, delayRequestHandlerThreads); } @Override @@ -230,11 +172,7 @@ public String toString() { ", enableAlarm=" + enableAlarm + ", delayTimeThreshold=" + delayTimeThreshold + ", committedLagThreshold=" + committedLagThreshold + - ", consoleAlarmAddr='" + consoleAlarmAddr + '\'' + - ", broadcast=" + broadcast + ", delayRequestHandlerThreads=" + delayRequestHandlerThreads + - ", consumeMode=" + consumeMode + - ", consumeModeMapper=" + consumeModeMapper + '}'; } @@ -272,9 +210,6 @@ public boolean validate() throws ConfigurationValidator.ConfigException { if (delayTimeThreshold < 0 || committedLagThreshold < 0) { throw new ConfigException("[GroupConfig] delayTimeThreshold or committedLagThreshold error, delayTimeThreshold=" + this.delayTimeThreshold + ", committedLagThreshold=" + committedLagThreshold); } - - if (consumeMode == null || (consumeMode == ConsumeMode.OTHER && MapUtils.isEmpty(consumeModeMapper))) - throw new ConfigException("[GroupConfig] consumeMode error, consumeMode=" + this.consumeMode); } catch (ConfigException e) { throw new ConfigException(e.getMessage() + ", group=" + this.group); } diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/HostRegionConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/HostRegionConfig.java deleted file mode 100644 index f16e62e..0000000 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/HostRegionConfig.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.xiaojukeji.carrera.config.v4; - - -public class HostRegionConfig { - private String brokerCluster; - private String idc; - - public String getBrokerCluster() { - return brokerCluster; - } - - public void setBrokerCluster(String brokerCluster) { - this.brokerCluster = brokerCluster; - } - - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - - @Override - public String toString() { - return "HostRegionConfig{" + - "brokerCluster='" + brokerCluster + '\'' + - ", idc='" + idc + '\'' + - '}'; - } -} \ No newline at end of file diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/MonitorAssignedConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/MonitorAssignedConfig.java deleted file mode 100644 index 30aebec..0000000 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/MonitorAssignedConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.xiaojukeji.carrera.config.v4; - -import java.util.Set; - - -public class MonitorAssignedConfig { - private Set brokers; - - public Set getBrokers() { - return brokers; - } - - public void setBrokers(Set brokers) { - this.brokers = brokers; - } - - @Override - public String toString() { - return "MonitorAssignedConfig{" + - "brokers=" + brokers + - '}'; - } -} \ No newline at end of file diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/PProxyConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/PProxyConfig.java index 18d9123..a3da841 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/PProxyConfig.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/PProxyConfig.java @@ -15,10 +15,9 @@ public class PProxyConfig implements ConfigurationValidator, Cloneable { private String instance/*ip:port*/; private String proxyCluster; - private String idc; private List brokerClusters; - private Set topics = Collections.emptySet(); + private Set topics = Collections.emptySet(); /* topic white list. */ private CarreraConfiguration carreraConfiguration; @@ -29,8 +28,6 @@ public boolean validate() throws ConfigException { throw new ConfigException("[PProxyConfig] instance empty"); } else if (StringUtils.isEmpty(this.proxyCluster)) { throw new ConfigException("[PProxyConfig] proxyCluster empty"); - } else if (StringUtils.isEmpty(this.idc)) { - throw new ConfigException("[PProxyConfig] idc empty"); } else if (CollectionUtils.isEmpty(this.brokerClusters)) { throw new ConfigException("[PProxyConfig] brokerClusters empty"); } else if (carreraConfiguration == null || !carreraConfiguration.validate()) { @@ -51,14 +48,6 @@ public void setInstance(String instance) { this.instance = instance; } - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public List getBrokerClusters() { return brokerClusters; } @@ -102,7 +91,6 @@ public String toString() { return "PProxyConfig{" + "instance='" + instance + '\'' + ", proxyCluster='" + proxyCluster + '\'' + - ", idc='" + idc + '\'' + ", brokerClusters=" + brokerClusters + ", topics=" + topics + ", carreraConfiguration=" + carreraConfiguration + diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/TopicConfig.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/TopicConfig.java index e939e64..37a2b00 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/TopicConfig.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/TopicConfig.java @@ -8,11 +8,9 @@ import com.xiaojukeji.carrera.utils.CommonFastJsonUtils; import com.xiaojukeji.carrera.utils.ConfigUtils; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import java.util.List; -import java.util.Map; public class TopicConfig implements ConfigurationValidator, Cloneable { @@ -28,14 +26,10 @@ public class TopicConfig implements ConfigurationValidator, Cloneable { private String topic; - private String schema; private List alarmGroup; private List topicUnits; - private ProduceMode produceMode = ProduceMode.SAME_IDC; - private Map> produceModeMapper; - private boolean delayTopic = DEFAULT_DELAY_TOPIC; private boolean autoBatch = DEFAULT_AUTO_BATCH; @@ -54,14 +48,6 @@ public void setTopic(String topic) { this.topic = topic; } - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } - public List getAlarmGroup() { return alarmGroup; } @@ -78,22 +64,6 @@ public void setTopicUnits(List topicUnits) { this.topicUnits = topicUnits; } - public ProduceMode getProduceMode() { - return produceMode; - } - - public void setProduceMode(ProduceMode produceMode) { - this.produceMode = produceMode; - } - - public Map> getProduceModeMapper() { - return produceModeMapper; - } - - public void setProduceModeMapper(Map> produceModeMapper) { - this.produceModeMapper = produceModeMapper; - } - public boolean isDelayTopic() { return delayTopic; } @@ -148,11 +118,8 @@ public TopicConfig clone() { public String toString() { return "TopicConfig{" + "topic='" + topic + '\'' + - ", schema='" + schema + '\'' + ", alarmGroup=" + alarmGroup + ", topicUnits=" + topicUnits + - ", produceMode=" + produceMode + - ", produceModeMapper=" + produceModeMapper + ", delayTopic=" + delayTopic + ", autoBatch=" + autoBatch + ", strongOrder=" + strongOrder + @@ -167,10 +134,6 @@ public boolean validate() throws ConfigException { throw new ConfigException("[TopicConfig] topic empty, topic=" + topic); } else if (CollectionUtils.isEmpty(this.topicUnits)) { throw new ConfigException("[TopicConfig] topicUnits empty, topic=" + topic); - } else if (produceMode == null) { - throw new ConfigException("[TopicConfig] produceMode is null, topic=" + topic); - } else if (produceMode == ProduceMode.OTHER && MapUtils.isEmpty(produceModeMapper)) { - throw new ConfigException("[TopicConfig] produceModeMapper is empty, topic=" + topic); } else if (!topicUnits.stream().allMatch(TopicConfiguration::validate)) { throw new ConfigException("[TopicConfig] topicUnits error, topic=" + topic); } diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/cproxy/UpstreamTopic.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/cproxy/UpstreamTopic.java index 26a040b..2f0547d 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/cproxy/UpstreamTopic.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/cproxy/UpstreamTopic.java @@ -29,28 +29,26 @@ public class UpstreamTopic implements ConfigurationValidator, Serializable, Clon public static final List DEFAULT_RETRY_INTERVAL = Lists.newArrayList(100, 150, 300); public static final String HTTP_POST = "POST"; public static final String HTTP_GET = "GET"; - public static final String ORDER_BY_QID = "QID"; + public static final String ORDER_BY_QID = "QID"; public static final String ORDER_BY_KEY = "KEY"; + public static final int DEFAULT_MAX_CONSUME_LAG_FACTOR = 3; - private String idc; private String brokerCluster; private Map> proxies = Maps.newHashMap(); private boolean isEnabled; // is subscription enabled - //公共配置 private String topic; private List tags; private List actions; - private int fetchThreads = 1; // rmq / kafka client的消费线程数. - private double maxTps; // kafka/rmq client拉取消息的最大tps,console计算出来的 + private int fetchThreads = 1; // rmq / kafka client 的消费线程数. + private double maxTps; // kafka/rmq client拉取消息的最大tps private double totalMaxTps; // kafka/rmq 真实最大tps private int concurrency = 1024; // 同时未完成消费的消息数量. private long maxConsumeLag = -1; // 第一条未完成消费的消息的offset 与 最后一条已完成消费的消息的offset的 最大差值. 默认-1,表示不限制. 取值为0表示:concurrency*DEFAULT_MAX_CONSUME_LAG_FACTOR; private String groovyScript; - private boolean isBinlog = false; //是否消费的binlog,需要一些额外的日志。 private int timeout = 5000; //http 推送请求的超时时间; SDK消费时, 消息消费的超时时间.超时后,消息可以被重新消费. private boolean isPressureTraffic = false; @@ -90,7 +88,6 @@ public class UpstreamTopic implements ConfigurationValidator, Serializable, Clon private String tokenKey = ""; //在form parameters中添加token private double httpMaxTps = -1; //通过http推送的最大tps。默认-1,表示等于maxTps. - //SDK拉取模式相关配置 private int maxPullBatchSize = 8; //SDK消费时,client一次拉取的小大消息数量. // 写入 hdfs 相关配置 @@ -106,14 +103,6 @@ public class UpstreamTopic implements ConfigurationValidator, Serializable, Clon public UpstreamTopic() { } - public boolean isBinlog() { - return isBinlog; - } - - public void setBinlog(boolean binlog) { - isBinlog = binlog; - } - public String getGroovyScript() { return groovyScript; } @@ -130,14 +119,6 @@ public void setHttpMaxTps(double httpMaxTps) { this.httpMaxTps = httpMaxTps; } - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public String getBrokerCluster() { return brokerCluster; } @@ -367,8 +348,6 @@ public void setHbaseconfiguration(HBaseConfiguration hbaseconfiguration) { @Override public boolean validate() throws ConfigException { - if (StringUtils.isEmpty(this.idc)) - throw new ConfigException("[UpstreamTopic] idc empty"); if (StringUtils.isEmpty(this.brokerCluster)) throw new ConfigException("[UpstreamTopic] brokerClusters empty"); if (StringUtils.isEmpty(this.topic)) @@ -488,7 +467,6 @@ public boolean checkLowLevel() { @Override public String toString() { return "UpstreamTopic{" + - "idc='" + idc + '\'' + ", brokerCluster='" + brokerCluster + '\'' + ", proxies=" + proxies + ", isEnabled=" + isEnabled + @@ -501,7 +479,6 @@ public String toString() { ", concurrency=" + concurrency + ", maxConsumeLag=" + maxConsumeLag + ", groovyScript='" + groovyScript + '\'' + - ", isBinlog=" + isBinlog + ", timeout=" + timeout + ", isPressureTraffic=" + isPressureTraffic + ", orderKey='" + orderKey + '\'' + @@ -536,7 +513,6 @@ public boolean equals(Object o) { if (Double.compare(topic1.totalMaxTps, totalMaxTps) != 0) return false; if (concurrency != topic1.concurrency) return false; if (maxConsumeLag != topic1.maxConsumeLag) return false; - if (isBinlog != topic1.isBinlog) return false; if (timeout != topic1.timeout) return false; if (isPressureTraffic != topic1.isPressureTraffic) return false; if (maxRetry != topic1.maxRetry) return false; @@ -545,7 +521,6 @@ public boolean equals(Object o) { if (enableAlarm != topic1.enableAlarm) return false; if (delayTimeThreshold != topic1.delayTimeThreshold) return false; if (committedLagThreshold != topic1.committedLagThreshold) return false; - if (idc != null ? !idc.equals(topic1.idc) : topic1.idc != null) return false; if (brokerCluster != null ? !brokerCluster.equals(topic1.brokerCluster) : topic1.brokerCluster != null) return false; if (proxies != null ? !proxies.equals(topic1.proxies) : topic1.proxies != null) return false; @@ -578,13 +553,11 @@ public boolean bizEquals(Object o) { if (Double.compare(topic1.totalMaxTps, totalMaxTps) != 0) return false; if (concurrency != topic1.concurrency) return false; if (maxConsumeLag != topic1.maxConsumeLag) return false; - if (isBinlog != topic1.isBinlog) return false; if (timeout != topic1.timeout) return false; if (isPressureTraffic != topic1.isPressureTraffic) return false; if (maxRetry != topic1.maxRetry) return false; if (Double.compare(topic1.httpMaxTps, httpMaxTps) != 0) return false; if (maxPullBatchSize != topic1.maxPullBatchSize) return false; - if (idc != null ? !idc.equals(topic1.idc) : topic1.idc != null) return false; if (brokerCluster != null ? !brokerCluster.equals(topic1.brokerCluster) : topic1.brokerCluster != null) return false; if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false; @@ -607,9 +580,8 @@ public boolean bizEquals(Object o) { @Override public int hashCode() { - int result; + int result = 0; long temp; - result = idc != null ? idc.hashCode() : 0; result = 31 * result + (brokerCluster != null ? brokerCluster.hashCode() : 0); result = 31 * result + (proxies != null ? proxies.hashCode() : 0); result = 31 * result + (isEnabled ? 1 : 0); @@ -624,7 +596,6 @@ public int hashCode() { result = 31 * result + concurrency; result = 31 * result + (int) (maxConsumeLag ^ (maxConsumeLag >>> 32)); result = 31 * result + (groovyScript != null ? groovyScript.hashCode() : 0); - result = 31 * result + (isBinlog ? 1 : 0); result = 31 * result + timeout; result = 31 * result + (isPressureTraffic ? 1 : 0); result = 31 * result + (orderKey != null ? orderKey.hashCode() : 0); diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/CarreraConfiguration.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/CarreraConfiguration.java index 6a662c5..6262a33 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/CarreraConfiguration.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/CarreraConfiguration.java @@ -25,8 +25,6 @@ public class CarreraConfiguration implements ConfigurationValidator { private int maxTps = 100000; private double tpsWarningRatio = 0.9; - private ParamLengthConfiguration paramLength; - private RateLimitConfiguration rateLimit; // 静态添加的配置。优先级低于动态配置。会被覆盖。 private TopicInfoConfiguration defaultTopicInfoConf = new TopicInfoConfiguration(); @@ -146,22 +144,6 @@ public void setAutoBatch(BatchMQProducerConfiguration autoBatch) { this.autoBatch = autoBatch; } - public ParamLengthConfiguration getParamLength() { - return paramLength; - } - - public void setParamLength(ParamLengthConfiguration paramLength) { - this.paramLength = paramLength; - } - - public RateLimitConfiguration getRateLimit() { - return rateLimit; - } - - public void setRateLimit(RateLimitConfiguration rateLimit) { - this.rateLimit = rateLimit; - } - public Map getKafkaConfigurationMap() { return kafkaConfigurationMap; } @@ -212,10 +194,6 @@ public boolean validate() throws ConfigException { throw new ConfigException("[CarreraConfiguration] tpsWarningRatio <= 0"); } else if (defaultTopicInfoConf == null) { throw new ConfigException("[CarreraConfiguration] defaultTopicInfoConf is null"); - } else if (paramLength == null || !paramLength.validate()) { - throw new ConfigException("[CarreraConfiguration] paramLength error"); - } else if (rateLimit == null) { - throw new ConfigException("[CarreraConfiguration] rateLimit error"); } return true; @@ -239,8 +217,6 @@ public String toString() { ", warmUpConnection=" + warmUpConnection + ", maxTps=" + maxTps + ", tpsWarningRatio=" + tpsWarningRatio + - ", paramLength=" + paramLength + - ", rateLimit=" + rateLimit + ", defaultTopicInfoConf=" + defaultTopicInfoConf + ", warmUpFetchTopicRouteInfo=" + warmUpFetchTopicRouteInfo + ", limiterFailureRetryQueueSize=" + limiterFailureRetryQueueSize + diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/ParamLengthConfiguration.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/ParamLengthConfiguration.java deleted file mode 100644 index a0f4aa5..0000000 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/ParamLengthConfiguration.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.xiaojukeji.carrera.config.v4.pproxy; - -import com.xiaojukeji.carrera.config.ConfigurationValidator; - - -public class ParamLengthConfiguration implements ConfigurationValidator { - private boolean failWhenIllegal = true; - private int keyLenMax = 255; - private int tagLenMax = 255; - - public boolean isFailWhenIllegal() { - return failWhenIllegal; - } - - public void setFailWhenIllegal(boolean failWhenIllegal) { - this.failWhenIllegal = failWhenIllegal; - } - - public int getKeyLenMax() { - return keyLenMax; - } - - public void setKeyLenMax(int keyLenMax) { - this.keyLenMax = keyLenMax; - } - - public int getTagLenMax() { - return tagLenMax; - } - - public void setTagLenMax(int tagLenMax) { - this.tagLenMax = tagLenMax; - } - - @Override - public String toString() { - return "ParamLengthConfiguration{" + - "failWhenIllegal=" + failWhenIllegal + - ", keyLenMax=" + keyLenMax + - ", tagLenMax=" + tagLenMax + - '}'; - } - - @Override - public boolean validate() { - return keyLenMax > 0 && tagLenMax > 0; - } -} \ No newline at end of file diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/RateLimitConfiguration.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/RateLimitConfiguration.java deleted file mode 100644 index 741e7c9..0000000 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/RateLimitConfiguration.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.xiaojukeji.carrera.config.v4.pproxy; - -import com.xiaojukeji.carrera.config.ConfigurationValidator; - - -public class RateLimitConfiguration implements ConfigurationValidator { - private boolean staticMode = false; - private String zkPath; - - public boolean isStaticMode() { - return staticMode; - } - - public void setStaticMode(boolean staticMode) { - this.staticMode = staticMode; - } - - public String getZkPath() { - return zkPath; - } - - public void setZkPath(String zkPath) { - this.zkPath = zkPath; - } - - @Override - public String toString() { - return "RateLimitConfiguration{" + - "staticMode=" + staticMode + - ", zkPath='" + zkPath + '\'' + - '}'; - } - - @Override - public boolean validate() { - return true; - } -} \ No newline at end of file diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/TopicConfiguration.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/TopicConfiguration.java index 2512abc..049f334 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/TopicConfiguration.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/config/v4/pproxy/TopicConfiguration.java @@ -16,9 +16,7 @@ public class TopicConfiguration implements ConfigurationValidator { private static final int DEFAULT_TOTAL_MAX_TPS = ConfigUtils.getDefaultConfig( "com.xiaojukeji.carrera.config.v4.pproxy.TopicConfiguration.totalMaxTps", 1024); - private String idc; private String brokerCluster; - private String clusterName; private Map> proxies = Maps.newHashMap(); private int totalMaxTps = DEFAULT_TOTAL_MAX_TPS; @@ -41,14 +39,6 @@ public void setTotalMaxTps(int totalMaxTps) { this.totalMaxTps = totalMaxTps; } - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public String getBrokerCluster() { return brokerCluster; } @@ -65,25 +55,16 @@ public void setProxies(Map> proxies) { this.proxies = proxies; } - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } @Override public boolean validate() { - return StringUtils.isNotEmpty(idc) && StringUtils.isNotEmpty(brokerCluster) && totalMaxTps > 0; + return StringUtils.isNotEmpty(brokerCluster) && totalMaxTps > 0; } @Override public String toString() { return "TopicConfiguration{" + - "idc='" + idc + '\'' + ", brokerCluster='" + brokerCluster + '\'' + - ", clusterName='" + clusterName + '\'' + ", proxies=" + proxies + ", totalMaxTps=" + totalMaxTps + ", maxTps=" + maxTps + diff --git a/carrera-common/src/main/java/com/xiaojukeji/carrera/utils/HttpUtils.java b/carrera-common/src/main/java/com/xiaojukeji/carrera/utils/HttpUtils.java index 685b17a..902b972 100644 --- a/carrera-common/src/main/java/com/xiaojukeji/carrera/utils/HttpUtils.java +++ b/carrera-common/src/main/java/com/xiaojukeji/carrera/utils/HttpUtils.java @@ -1,32 +1,17 @@ package com.xiaojukeji.carrera.utils; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.NameValuePair; -import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class HttpUtils { @@ -57,103 +42,6 @@ static class InnerStaticClass { } } - /** - * 发送 GET 请求(HTTP),不带输入数据 - * - * @param url - * @return - */ - public static String doGet(String url) { - return doGet(url, new HashMap()); - } - - /** - * 发送 GET 请求(HTTP),K-V形式 - * - * @param url - * @param params - * @return - */ - public static String doGet(String url, Map params) { - String apiUrl = url; - StringBuffer param = new StringBuffer(); - int i = 0; - for (String key : params.keySet()) { - if (i == 0) - param.append("?"); - else - param.append("&"); - param.append(key).append("=").append(params.get(key)); - i++; - } - apiUrl += param; - String result = null; - HttpClient httpclient = new DefaultHttpClient(); - try { - HttpGet httpPost = new HttpGet(apiUrl); - HttpResponse response = httpclient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - LOGGER.debug("status code:" + statusCode); - HttpEntity entity = response.getEntity(); - if (entity != null) { - InputStream instream = entity.getContent(); - result = IOUtils.toString(instream, "UTF-8"); - } - } catch (IOException e) { - LOGGER.error("do get failed", e); - } - return result; - } - - /** - * 发送 POST 请求(HTTP),不带输入数据 - * - * @param apiUrl - * @return - */ - public static String doPost(String apiUrl) { - return doPost(apiUrl, new HashMap()); - } - - /** - * 发送 POST 请求(HTTP),K-V形式 - * - * @param apiUrl API接口URL - * @param params 参数map - * @return - */ - public static String doPost(String apiUrl, Map params) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - String httpStr = null; - HttpPost httpPost = new HttpPost(apiUrl); - CloseableHttpResponse response = null; - - try { - httpPost.setConfig(InnerStaticClass.requestConfig); - List pairList = new ArrayList<>(params.size()); - for (Map.Entry entry : params.entrySet()) { - NameValuePair pair = new BasicNameValuePair(entry.getKey(), entry.getValue().toString()); - pairList.add(pair); - } - httpPost.setEntity(new UrlEncodedFormEntity(pairList, Charset.forName("UTF-8"))); - response = httpClient.execute(httpPost); - System.out.println(response.toString()); - HttpEntity entity = response.getEntity(); - httpStr = EntityUtils.toString(entity, "UTF-8"); - } catch (IOException e) { - e.printStackTrace(); - } finally { - if (response != null) { - try { - EntityUtils.consume(response.getEntity()); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - return httpStr; - } - /** * 发送 POST 请求(HTTP),JSON形式 * @@ -168,7 +56,7 @@ public static CloseableHttpResponse doPost(String apiUrl, Object json) { try { httpPost.setConfig(InnerStaticClass.requestConfig); - StringEntity stringEntity = new StringEntity(json.toString(), "UTF-8");//解决中文乱码问题 + StringEntity stringEntity = new StringEntity(json.toString(), "UTF-8"); stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); httpPost.setEntity(stringEntity); diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/IdcService.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/IdcService.java deleted file mode 100644 index 64aeb5c..0000000 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/IdcService.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.didi.carrera.console.service; - -import com.didi.carrera.console.dao.model.Idc; - -import java.util.Map; - - -public interface IdcService { - - Map findMap(); - -} \ No newline at end of file diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/ZKV4ConfigService.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/ZKV4ConfigService.java index fe78be1..50adcb9 100644 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/ZKV4ConfigService.java +++ b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/ZKV4ConfigService.java @@ -42,7 +42,7 @@ public interface ZKV4ConfigService { ConsoleBaseResponse pushCProxyByCluster(String clusterName) throws Exception; - UpstreamTopic buildUpstreamTopic(GroupConfig groupConfig, ConsumeSubscription subscription, String brokerCluster, String idc) throws ZkConfigException; + UpstreamTopic buildUpstreamTopic(GroupConfig groupConfig, ConsumeSubscription subscription, String brokerCluster) throws ZkConfigException; void updatePProxyConfigByClusterId(String topic, Set clusterIdSet) throws Exception; void updateCProxyConfigByClusterId(String group, Set clusterIdSet) throws Exception; diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ConsumeSubscriptionServiceImpl.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ConsumeSubscriptionServiceImpl.java index 3203543..79cfeec 100644 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ConsumeSubscriptionServiceImpl.java +++ b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ConsumeSubscriptionServiceImpl.java @@ -156,7 +156,7 @@ private com.xiaojukeji.carrera.config.v4.cproxy.UpstreamTopic buildUpstreamTopic ConsumeSubscriptionConfig config = new ConsumeSubscriptionConfig(); subscription.setConsumeSubscriptionConfig(config); - return zkv4ConfigService.buildUpstreamTopic(new GroupConfig(), subscription, "0", "0"); + return zkv4ConfigService.buildUpstreamTopic(new GroupConfig(), subscription, "0"); } @Override diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/IdcServiceImpl.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/IdcServiceImpl.java deleted file mode 100644 index 8b55b65..0000000 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/IdcServiceImpl.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.didi.carrera.console.service.impl; - -import com.didi.carrera.console.dao.dict.IsDelete; -import com.didi.carrera.console.dao.mapper.IdcMapper; -import com.didi.carrera.console.dao.model.Idc; -import com.didi.carrera.console.dao.model.IdcCriteria; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - - -@Service -public class IdcServiceImpl implements com.didi.carrera.console.service.IdcService { - - @Autowired - private IdcMapper idcMapper; - - @Override - public Map findMap() { - IdcCriteria cc = new IdcCriteria(); - cc.createCriteria().andIsDeleteEqualTo(IsDelete.NO.getIndex()); - List idcList = idcMapper.selectByExample(cc); - - return idcList.stream().collect(Collectors.toMap(Idc::getId, Function.identity())); - } -} \ No newline at end of file diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/TopicServiceImpl.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/TopicServiceImpl.java index add918f..2244f2f 100644 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/TopicServiceImpl.java +++ b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/TopicServiceImpl.java @@ -5,37 +5,17 @@ import com.didi.carrera.console.dao.dict.IsEnable; import com.didi.carrera.console.dao.dict.MqServerType; import com.didi.carrera.console.dao.dict.NodeType; -import com.didi.carrera.console.dao.dict.TopicProduceMode; import com.didi.carrera.console.dao.mapper.TopicMapper; import com.didi.carrera.console.dao.mapper.custom.TopicCustomMapper; -import com.didi.carrera.console.dao.model.Cluster; -import com.didi.carrera.console.dao.model.Idc; -import com.didi.carrera.console.dao.model.MqServer; -import com.didi.carrera.console.dao.model.Node; -import com.didi.carrera.console.dao.model.Topic; -import com.didi.carrera.console.dao.model.TopicConf; -import com.didi.carrera.console.dao.model.TopicCriteria; +import com.didi.carrera.console.dao.model.*; import com.didi.carrera.console.dao.model.custom.CustomConsumeSubscription; import com.didi.carrera.console.dao.model.custom.CustomTopicConf; import com.didi.carrera.console.dao.model.custom.TopicConfConfig; import com.didi.carrera.console.dao.model.custom.TopicConfig; import com.didi.carrera.console.data.Message; -import com.didi.carrera.console.service.ClusterService; -import com.didi.carrera.console.service.ConsumeSubscriptionService; -import com.didi.carrera.console.service.IdcService; -import com.didi.carrera.console.service.MqServerService; -import com.didi.carrera.console.service.NodeService; -import com.didi.carrera.console.service.RmqAdminService; -import com.didi.carrera.console.service.TopicConfService; -import com.didi.carrera.console.service.TopicService; -import com.didi.carrera.console.service.ZKV4ConfigService; +import com.didi.carrera.console.service.*; import com.didi.carrera.console.service.bean.PageModel; -import com.didi.carrera.console.service.vo.TopicConfVo; -import com.didi.carrera.console.service.vo.TopicListGroupVo; -import com.didi.carrera.console.service.vo.TopicMessageVo; -import com.didi.carrera.console.service.vo.TopicOrderVo; -import com.didi.carrera.console.service.vo.TopicSimpleVo; -import com.didi.carrera.console.service.vo.TopicStateVo; +import com.didi.carrera.console.service.vo.*; import com.didi.carrera.console.web.ConsoleBaseResponse; import com.didi.carrera.console.web.controller.bo.AcceptTopicConfBo; import com.didi.carrera.console.web.controller.bo.NodeBo; @@ -60,11 +40,7 @@ import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -101,9 +77,6 @@ public class TopicServiceImpl implements TopicService { @Resource(name = "didiNodeServiceImpl") private NodeService nodeService; - @Autowired - private IdcService idcService; - public ConsoleBaseResponse validateTopicBo(TopicOrderBo topicInfo) { if (!topicInfo.isModify() && CollectionUtils.isNotEmpty(findByTopicNameWithDelete(topicInfo.getTopicName()))) { return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "Topic existed"); @@ -146,7 +119,6 @@ public ConsoleBaseResponse validateTopicBo(TopicOrder } } - Map idcMap = idcService.findMap(); Map> confIdcMap = Maps.newHashMap(); for (T bo : topicInfo.getConf()) { Cluster cluster = clusterService.findById(bo.getClusterId()); @@ -156,34 +128,12 @@ public ConsoleBaseResponse validateTopicBo(TopicOrder bo.setClusterName(cluster.getName()); } - if (bo.getServerIdcId() == null || bo.getServerIdcId() == 0L) { - bo.setServerIdcId(cluster.getIdcId()); - bo.setServerIdcName(cluster.getIdc()); - } else { - if (!cluster.getIdcId().equals(bo.getServerIdcId())) { - return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "serverIdc<" + bo.getServerIdcId() + ">param error"); - } - } - if (confIdcMap.containsKey(bo.getServerIdcId())) { return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群 " + confIdcMap.get(bo.getServerIdcId()).get(0) + " 和 " + bo.getClusterName() + " 同属于一个IDC, 不允许重复添加"); } else { confIdcMap.put(bo.getServerIdcId(), Lists.newArrayList(bo.getClusterName())); } - if (topicInfo.getProduceMode() == TopicProduceMode.OTHER.getIndex()) { - if (MapUtils.isEmpty(bo.getClientIdcMap())) { - return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "clientIdc不能为空"); - } - for (Long idcId : bo.getClientIdcMap().values()) { - if (!idcMap.containsKey(idcId)) { - return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "clientIdc不存在"); - } else { - bo.getClientIdcMap().put(idcMap.get(idcId).getName(), idcId); - } - } - } - if (bo instanceof AcceptTopicConfBo) { AcceptTopicConfBo acceptBo = (AcceptTopicConfBo) bo; if (acceptBo.getMqServerId() == null || acceptBo.getMqServerId().equals(0L)) { diff --git a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ZKV4ConfigServiceImpl.java b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ZKV4ConfigServiceImpl.java index 44e840b..0634685 100644 --- a/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ZKV4ConfigServiceImpl.java +++ b/carrera-console/carrera-console/src/main/java/com/didi/carrera/console/service/impl/ZKV4ConfigServiceImpl.java @@ -5,39 +5,10 @@ import com.didi.carrera.console.common.util.FastJsonUtils; import com.didi.carrera.console.common.util.HostUtils; import com.didi.carrera.console.config.ConsoleConfig; -import com.didi.carrera.console.dao.dict.ClusterMqServerRelationType; -import com.didi.carrera.console.dao.dict.ConsumeSubscriptionAlarmType; -import com.didi.carrera.console.dao.dict.ConsumeSubscriptionBigDataType; -import com.didi.carrera.console.dao.dict.ConsumeSubscriptionConsumeType; -import com.didi.carrera.console.dao.dict.ConsumeSubscriptionHttpMethod; -import com.didi.carrera.console.dao.dict.ConsumeSubscriptionPressureTraffic; -import com.didi.carrera.console.dao.dict.IsDelete; -import com.didi.carrera.console.dao.dict.IsEnable; -import com.didi.carrera.console.dao.dict.MqServerType; -import com.didi.carrera.console.dao.dict.NodeType; -import com.didi.carrera.console.dao.dict.TopicCompressionType; -import com.didi.carrera.console.dao.dict.TopicDelayTopic; -import com.didi.carrera.console.dao.dict.TopicProduceMode; -import com.didi.carrera.console.dao.model.Cluster; -import com.didi.carrera.console.dao.model.ClusterMqserverRelation; -import com.didi.carrera.console.dao.model.ConsumeGroup; -import com.didi.carrera.console.dao.model.ConsumeSubscription; -import com.didi.carrera.console.dao.model.Idc; -import com.didi.carrera.console.dao.model.MqServer; -import com.didi.carrera.console.dao.model.Node; -import com.didi.carrera.console.dao.model.Topic; -import com.didi.carrera.console.dao.model.TopicConf; +import com.didi.carrera.console.dao.dict.*; +import com.didi.carrera.console.dao.model.*; import com.didi.carrera.console.dao.model.custom.ConsumeGroupConfig; -import com.didi.carrera.console.service.ClusterMqserverRelationService; -import com.didi.carrera.console.service.ClusterService; -import com.didi.carrera.console.service.ConsumeGroupService; -import com.didi.carrera.console.service.ConsumeSubscriptionService; -import com.didi.carrera.console.service.IdcService; -import com.didi.carrera.console.service.MqServerService; -import com.didi.carrera.console.service.NodeService; -import com.didi.carrera.console.service.TopicConfService; -import com.didi.carrera.console.service.TopicService; -import com.didi.carrera.console.service.ZKV4ConfigService; +import com.didi.carrera.console.service.*; import com.didi.carrera.console.service.exception.ZkConfigException; import com.didi.carrera.console.web.ConsoleBaseResponse; import com.didi.carrera.console.web.controller.bo.ConsumeSubscriptionOrderBo; @@ -48,16 +19,8 @@ import com.xiaojukeji.carrera.config.Actions; import com.xiaojukeji.carrera.config.CompressType; import com.xiaojukeji.carrera.config.ConfigurationValidator; -import com.xiaojukeji.carrera.config.v4.BrokerConfig; -import com.xiaojukeji.carrera.config.v4.CProxyConfig; -import com.xiaojukeji.carrera.config.v4.GroupConfig; -import com.xiaojukeji.carrera.config.v4.PProxyConfig; -import com.xiaojukeji.carrera.config.v4.TopicConfig; -import com.xiaojukeji.carrera.config.v4.cproxy.ConsumeServerConfiguration; -import com.xiaojukeji.carrera.config.v4.cproxy.HBaseConfiguration; -import com.xiaojukeji.carrera.config.v4.cproxy.HdfsConfiguration; -import com.xiaojukeji.carrera.config.v4.cproxy.KafkaConfiguration; -import com.xiaojukeji.carrera.config.v4.cproxy.UpstreamTopic; +import com.xiaojukeji.carrera.config.v4.*; +import com.xiaojukeji.carrera.config.v4.cproxy.*; import com.xiaojukeji.carrera.config.v4.pproxy.CarreraConfiguration; import com.xiaojukeji.carrera.config.v4.pproxy.RocketmqConfiguration; import com.xiaojukeji.carrera.config.v4.pproxy.TopicConfiguration; @@ -119,9 +82,6 @@ public class ZKV4ConfigServiceImpl implements ZKV4ConfigService { @Autowired private ZkService zkService; - @Autowired - private IdcService idcService; - private Map getMqServerMap() { Map mqServerMap = Maps.newHashMap(); mqServerService.findAll().forEach(mqServer -> mqServerMap.put(mqServer.getId(), mqServer)); @@ -427,11 +387,6 @@ private TopicConfig buildTopicConfig(Topic topic, List topicConfList) TopicConfig topicConfig = new TopicConfig(); topicConfig.setTopic(topic.getTopicName()); topicConfig.setAlarmGroup(topic.getTopicAlarmGroup()); - if (IsEnable.isEnable(topic.getEnableSchemaVerify())) { - topicConfig.setSchema(topic.getTopicSchema()); - } - topicConfig.setProduceMode(TopicProduceMode.getZkMode(topic.getProduceMode())); - topicConfig.setDelayTopic(topic.getDelayTopic() == TopicDelayTopic.DELAY_TOPIC.getIndex()); topicConfig.setAutoBatch(topic.getTopicConfig().isAutoBatch()); @@ -446,9 +401,7 @@ private TopicConfig buildTopicConfig(Topic topic, List topicConfList) Map mqServerTypeTable = getMqServerMap(); Map clusterTable = getClusterMap(); - Map idcMap = idcService.findMap(); - Map> produceModeMapper = Maps.newHashMap(); for (TopicConf conf : topicConfList) { if (!mqServerTypeTable.containsKey(conf.getMqServerId())) { throw new ZkConfigException(String.format("[Topic] topicConfId(%s) not found mqserver(%s)", conf.getId(), conf.getMqServerId())); @@ -458,21 +411,8 @@ private TopicConfig buildTopicConfig(Topic topic, List topicConfList) throw new ZkConfigException(String.format("[Topic] topicConfId(%s) not found cluster(%s)", conf.getId(), conf.getClusterId())); } - if (topic.getProduceMode() == TopicProduceMode.OTHER.getIndex()) { - if (MapUtils.isEmpty(conf.getTopicConfClientIdc())) { - throw new ZkConfigException(String.format("[Topic] topicConfId(%s) not found clientIdc, clusterId=(%s), ", conf.getId(), conf.getClusterId())); - } - for (Long clientIdc : conf.getTopicConfClientIdc().values()) { - if (conf.getServerIdcId() != null && conf.getServerIdcId() > 0) { - produceModeMapper.computeIfAbsent(idcMap.get(clientIdc).getName(), s -> Lists.newArrayList()).add(idcMap.get(clusterTable.get(conf.getClusterId()).getIdcId()).getName()); - } - } - } - TopicConfiguration topicConfiguration = new TopicConfiguration(); - topicConfiguration.setIdc(clusterTable.get(conf.getClusterId()).getIdc()); topicConfiguration.setBrokerCluster(mqServerTypeTable.get(conf.getMqServerId()).getName()); - topicConfiguration.setClusterName(conf.getClusterName()); if (conf.getTopicConfConfig() != null) { if (MapUtils.isNotEmpty(conf.getTopicConfConfig().getProxies())) { @@ -493,12 +433,6 @@ private TopicConfig buildTopicConfig(Topic topic, List topicConfList) confList.add(topicConfiguration); } - if (topic.getProduceMode() == TopicProduceMode.OTHER.getIndex()) { - if (MapUtils.isEmpty(produceModeMapper)) { - throw new ZkConfigException(String.format("[Topic] ProduceModeMapper is empty, topicId=%s", topic.getId())); - } - topicConfig.setProduceModeMapper(produceModeMapper); - } return topicConfig; } @@ -577,7 +511,6 @@ private GroupConfig buildGroupConfig(ConsumeGroup group, List upstreamTopics = Lists.newArrayList(); groupConfig.setTopics(upstreamTopics); @@ -597,7 +530,7 @@ private GroupConfig buildGroupConfig(ConsumeGroup group, List mqServerTypeTable = getMqServerNameMap(); @@ -868,12 +799,10 @@ private PProxyConfig buildPProxyConfig(Node node, String host, Cluster cluster, return pProxyConfig; } - private String getPProxyCluster(String clusterName) { return "P_" + clusterName; } - private String getCProxyCluster(String clusterName) { return "C_" + clusterName; } @@ -934,11 +863,9 @@ private CProxyConfig buildCProxyConfig(String host, Cluster cluster, List> pproxies = Maps.newHashMap(); - cProxyConfig.setPproxies(pproxies); List pNodes = nodeService.findByClusterIdNodeType(cluster.getId(), NodeType.PRODUCER_PROXY); if (CollectionUtils.isNotEmpty(pNodes)) { @@ -1037,7 +964,6 @@ public void updateBrokerConfig(Long mqServerId) throws Exception { } BrokerConfig brokerConfig = new BrokerConfig(); - brokerConfig.setIdc(getIdcFromMqServerName(mqServer.getName())); brokerConfig.setBrokerCluster(mqServer.getName()); brokerConfig.setBrokerClusterAddrs(mqServer.getAddr()); Map> brokers = Maps.newHashMap(); @@ -1108,15 +1034,4 @@ public void updateBrokerConfig(Long mqServerId) throws Exception { zkService.createOrUpdateBroker(brokerConfig); } } - - private String getIdcFromMqServerName(String mqServerName) { - String idc = mqServerName.split("_")[1]; - if (idc.contains("test")) { - idc = "test"; - } else { - idc = "default"; - } - - return idc; - } } \ No newline at end of file diff --git a/carrera-consumer/pom.xml b/carrera-consumer/pom.xml index fb50cb6..0be1381 100644 --- a/carrera-consumer/pom.xml +++ b/carrera-consumer/pom.xml @@ -22,6 +22,11 @@ snakeyaml 1.17 + + com.lmax + disruptor + 3.3.6 + commons-io commons-io @@ -48,37 +53,16 @@ log4j-slf4j-impl ${log4j2.version} - - - com.lmax - disruptor - 3.3.4 - - - - org.apache.httpcomponents - httpclient - 4.5.2 - - com.google.guava guava 21.0 - - junit - junit - 4.12 - test - - org.codehaus.groovy groovy 2.4.7 - org.asynchttpclient async-http-client @@ -93,12 +77,6 @@ com.xiaojukeji.ddmq carrera-common 1.0.0-SNAPSHOT - - - org.apache.httpcomponents - httpclient - - com.xiaojukeji.rocketmq @@ -119,13 +97,6 @@ - - - com.google.protobuf - protobuf-java - 2.5.0 - - redis.clients jedis @@ -147,17 +118,6 @@ commons-beanutils 1.9.2 - - org.mockito - mockito-all - 1.10.19 - test - - - com.xiaojukeji.ddmq - carrera-producer-sdk - 1.0.0-SNAPSHOT - org.apache.hbase hbase-client diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/hdfs/DataFileManager.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/hdfs/DataFileManager.java index bfe6020..f95dd5c 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/hdfs/DataFileManager.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/hdfs/DataFileManager.java @@ -3,7 +3,6 @@ import com.xiaojukeji.carrera.config.v4.cproxy.HdfsConfiguration; import com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob; import com.xiaojukeji.carrera.cproxy.utils.MetricUtils; -import com.xiaojukeji.carrera.utils.TimeUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -14,22 +13,8 @@ import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -173,7 +158,6 @@ private void flushDataFiles() { LOGGER.error("Failed to get result ", e); } executor.shutdown(); - LOGGER.info("FileFlushScanner sync time is {} ms, group {}", TimeUtils.getElapseTime(start), group); } public void flush(boolean stop) { diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/http/CarreraAsyncRequest.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/http/CarreraAsyncRequest.java index 8a7237b..a3c7413 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/http/CarreraAsyncRequest.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/http/CarreraAsyncRequest.java @@ -5,18 +5,10 @@ import com.xiaojukeji.carrera.chronos.enums.MsgTypes; import com.xiaojukeji.carrera.chronos.model.InternalKey; import com.xiaojukeji.carrera.config.v4.cproxy.UpstreamTopic; -import com.xiaojukeji.carrera.cproxy.consumer.limiter.LimiterMgr; -import com.xiaojukeji.carrera.producer.CarreraProducer; -import com.xiaojukeji.carrera.producer.CarreraReturnCode; -import com.xiaojukeji.carrera.thrift.DelayResult; -import com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob; import com.xiaojukeji.carrera.cproxy.actions.FormParamsExtractAction; -import com.xiaojukeji.carrera.cproxy.actions.util.CarreraProducerManager; -import com.xiaojukeji.carrera.cproxy.utils.JsonUtils; -import com.xiaojukeji.carrera.cproxy.utils.LogUtils; -import com.xiaojukeji.carrera.cproxy.utils.MetricUtils; -import com.xiaojukeji.carrera.cproxy.utils.StringUtils; -import com.xiaojukeji.carrera.cproxy.utils.TimeUtils; +import com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob; +import com.xiaojukeji.carrera.cproxy.consumer.limiter.LimiterMgr; +import com.xiaojukeji.carrera.cproxy.utils.*; import org.apache.commons.lang3.RandomUtils; import org.apache.http.HttpStatus; import org.asynchttpclient.AsyncCompletionHandler; @@ -28,15 +20,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import static com.xiaojukeji.carrera.thrift.consumer.consumerProxyConstants.CARRERA_HEADERS; import static com.xiaojukeji.carrera.cproxy.actions.FormParamsExtractAction.CARRERA_PROPERTIES; +import static com.xiaojukeji.carrera.thrift.consumer.consumerProxyConstants.CARRERA_HEADERS; import static org.slf4j.LoggerFactory.getLogger; @@ -191,29 +179,6 @@ ProcessResult proceedErrCode(int code, UpstreamJob job) { slowDown(); return ProcessResult.OK; } else if (code == HttpErrNo.IN_PROCESSABLE.code()) { - if (job.isFromChronos()) { - InternalKey internalKey = new InternalKey(job.getCommonMessage().getKey()); - if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) { - CarreraProducer producer = CarreraProducerManager.getProducer(); - - if (producer != null) { - DelayResult delayResult = producer.cancelDelayMessageBuilder() - .setTopic(job.getTopic()).setUniqDelayMsgId(job.getCommonMessage().getKey()).send(); - if (delayResult.getCode() == CarreraReturnCode.OK) { - LOGGER.info("succ cancel delay msg, delayResult:{}, topic:{}, uniqDelayMsgId:{}", delayResult, - job.getTopic(), job.getCommonMessage().getKey()); - } else { - LOGGER.info("fail to cancel delay msg, delayResult:{}, topic:{}, uniqDelayMsgId:{}", delayResult, - job.getTopic(), job.getCommonMessage().getKey()); - } - } else { - LogUtils.logErrorInfo("CarreraAsyncRequest_error", "fail to cancel delay msg for carrera producer is null, topic:{}, uniqDelayMsgId:{}", - job.getTopic(), job.getCommonMessage().getKey()); - } - } - } - - // skip this msg return ProcessResult.OK; } diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/util/CarreraProducerManager.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/util/CarreraProducerManager.java deleted file mode 100644 index 1806b5d..0000000 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/util/CarreraProducerManager.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.xiaojukeji.carrera.cproxy.actions.util; - -import com.xiaojukeji.carrera.config.CarreraConfig; -import com.xiaojukeji.carrera.producer.CarreraProducer; -import com.xiaojukeji.carrera.cproxy.concurrent.CarreraExecutors; -import com.xiaojukeji.carrera.cproxy.config.ProducerProxyConfiguration; -import org.slf4j.Logger; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.slf4j.LoggerFactory.getLogger; - - -public class CarreraProducerManager { - private static final Logger LOGGER = getLogger(CarreraProducerManager.class); - - private static volatile CarreraProducer producer = null; - private static final ScheduledExecutorService scheduler = CarreraExecutors.newScheduledThreadPool(1, "CarreraProducerManagerScheduler"); - - public static synchronized void startCarreraProducer(ProducerProxyConfiguration producerProxyConfiguration) { - if (producerProxyConfiguration.getCarreraProxyList() == null || producerProxyConfiguration.getCarreraProxyList().size() == 0) { - LOGGER.error("error while start carrera producer for no pproxy list, producerProxyConfiguration:{}", producerProxyConfiguration); - return; - } - final long start = System.currentTimeMillis(); - CarreraProducer oldProducer = producer; - - final CarreraConfig config = new CarreraConfig(); - config.setCarreraProxyList(producerProxyConfiguration.getCarreraProxyList()); - config.setCarreraProxyTimeout(producerProxyConfiguration.getCarreraProxyTimeout()); - config.setCarreraClientRetry(producerProxyConfiguration.getCarreraClientRetry()); - config.setCarreraClientTimeout(producerProxyConfiguration.getCarreraClientTimeout()); - config.setCarreraPoolSize(producerProxyConfiguration.getCarreraPoolSize()); - config.setBatchSendThreadNumber(producerProxyConfiguration.getBatchSendThreadNumber()); - CarreraProducer newProducer = new CarreraProducer(config); - try { - newProducer.start(); - LOGGER.info("start carrera producer, cost:{}ms, producerProxyConfiguration:{}", System.currentTimeMillis() - start, producerProxyConfiguration); - } catch (Exception e) { - LOGGER.error("error while start carrera producer, producerProxyConfiguration:{}, err:{}", producerProxyConfiguration, e.getMessage(), e); - return; - } - - producer = newProducer; - - // old producer可能正在被使用,延迟一定时间之后再关闭 - scheduler.schedule(() -> stopCarreraProducer(oldProducer), 2, TimeUnit.MINUTES); - } - - public static void stopCarreraProducer(CarreraProducer producer) { - final long start = System.currentTimeMillis(); - if (producer != null) { - producer.shutdown(); - LOGGER.info("stop carrera producer, cost:{}ms", System.currentTimeMillis() - start); - } - } - - public static synchronized void shutdown() { - stopCarreraProducer(producer); - scheduler.shutdown(); - } - - public static CarreraProducer getProducer() { - return producer; - } -} \ No newline at end of file diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/AbstractCarreraRocketMqConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/AbstractCarreraRocketMqConsumer.java index 7effdf5..7a6aada 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/AbstractCarreraRocketMqConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/AbstractCarreraRocketMqConsumer.java @@ -45,10 +45,10 @@ abstract class AbstractCarreraRocketMqConsumer extends BaseCarreraConsumer { private int totalThreads; - public AbstractCarreraRocketMqConsumer(String idc, String brokerCluster, String group, GroupConfig groupConfig, - CProxyConfig cProxyConfig, RocketmqConfiguration rocketmqConfiguration, - AsyncMessageHandler handler, Map maxCommitLagMap, int totalThreads) { - super(idc, brokerCluster, group, groupConfig, cProxyConfig, handler, maxCommitLagMap); + public AbstractCarreraRocketMqConsumer(String brokerCluster, String group, GroupConfig groupConfig, + CProxyConfig cProxyConfig, RocketmqConfiguration rocketmqConfiguration, + AsyncMessageHandler handler, Map maxCommitLagMap, int totalThreads) { + super(brokerCluster, group, groupConfig, cProxyConfig, handler, maxCommitLagMap); this.rocketmqConfiguration = rocketmqConfiguration; this.totalThreads = totalThreads; } @@ -179,7 +179,7 @@ public synchronized void shutdown() { super.shutdown(); isRunning = false; - LOGGER.info("consumer shutdown, group:{},idc:{},brokerCluster:{}.", group, idc, brokerCluster); + LOGGER.info("consumer shutdown, group:{}, brokerCluster:{}.", group, brokerCluster); } @Override diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/BaseCarreraConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/BaseCarreraConsumer.java index 6732d90..5c75778 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/BaseCarreraConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/BaseCarreraConsumer.java @@ -25,8 +25,6 @@ public abstract class BaseCarreraConsumer { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCarreraConsumer.class); - protected String idc; - protected String brokerCluster; protected String group; @@ -47,9 +45,8 @@ public abstract class BaseCarreraConsumer { protected volatile boolean isRunning; - public BaseCarreraConsumer(String idc, String brokerCluster, String group, GroupConfig groupConfig, + public BaseCarreraConsumer(String brokerCluster, String group, GroupConfig groupConfig, CProxyConfig cProxyConfig, AsyncMessageHandler handler, Map maxCommitLagMap) { - this.idc = idc; this.brokerCluster = brokerCluster; this.group = group; this.groupConfig = groupConfig; diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraConsumer.java index ed0a36b..0c0f48a 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraConsumer.java @@ -40,8 +40,8 @@ public CarreraConsumer(ConsumerGroupConfig config) { } public void start() throws CarreraClientException { - LogUtils.logMainInfo("CarreraConsumer.start, group:{}, idc:{}, brokerCluster:{}.", - config.getGroup(), config.getcProxyConfig().getIdc(), config.getBrokerCluster()); + LogUtils.logMainInfo("CarreraConsumer.start, group:{}, brokerCluster:{}.", + config.getGroup(), config.getBrokerCluster()); workingJobs = Sets.newConcurrentHashSet(); buildActionMap(); @@ -52,11 +52,11 @@ public void start() throws CarreraClientException { if (ConfigUtils.satisfyNewRmqConsumer(config.getGroupConfig())) { LOGGER.debug("open a CarreraNewRocketMqConsumer client. group:{}.", config.getGroup()); - consumer = new CarreraNewRocketMqConsumer(config.getcProxyConfig().getIdc(), config.getBrokerCluster(), + consumer = new CarreraNewRocketMqConsumer(config.getBrokerCluster(), config.getGroup(), config.getGroupConfig(), config.getcProxyConfig(), rocketmqConfiguration, this, config.getMaxConsumeLagMap(), config.getTotalThreads()); } else { - consumer = new CarreraRocketMqConsumer(config.getcProxyConfig().getIdc(), config.getBrokerCluster(), + consumer = new CarreraRocketMqConsumer(config.getBrokerCluster(), config.getGroup(), config.getGroupConfig(), config.getcProxyConfig(), rocketmqConfiguration, this, config.getMaxConsumeLagMap(), config.getTotalThreads()); } @@ -65,7 +65,7 @@ public void start() throws CarreraClientException { //kafka if (config.getcProxyConfig().getKafkaConfigs().containsKey(config.getBrokerCluster())) { KafkaConfiguration kafkaConfiguration = config.getcProxyConfig().getKafkaConfigs().get(config.getBrokerCluster()); - consumer = new CarreraKafkaConsumer(config.getcProxyConfig().getIdc(), config.getBrokerCluster(), + consumer = new CarreraKafkaConsumer(config.getBrokerCluster(), config.getGroup(), config.getGroupConfig(), config.getcProxyConfig(), kafkaConfiguration, this, config.getMaxConsumeLagMap(), config.getTopicCount(), config.getTopicMap()); } @@ -82,8 +82,7 @@ public void buildActionMap() { } public void stop() { - LogUtils.logMainInfo("CarreraConsumer.stop;group:{}, idc:{}, brokerCluster:{}", config.getGroup(), - config.getcProxyConfig().getIdc(), config.getBrokerCluster()); + LogUtils.logMainInfo("CarreraConsumer.stop;group:{}, brokerCluster:{}", config.getGroup(), config.getBrokerCluster()); if (consumer != null) { consumer.shutdown(); } @@ -137,8 +136,7 @@ public BaseCarreraConsumer getConsumer() { @Override public String toString() { - return "CarreraConsumer@" + config.getcProxyConfig().getIdc() + "@" + - config.getBrokerCluster() + "@" + config.getGroup(); + return "CarreraConsumer@" + config.getBrokerCluster() + "@" + config.getGroup(); } public String getBrokerCluster() { diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraKafkaConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraKafkaConsumer.java index 6e501a2..818acc9 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraKafkaConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraKafkaConsumer.java @@ -51,10 +51,10 @@ public class CarreraKafkaConsumer extends BaseCarreraConsumer { private Map topicCountMap; private Map topicMap; - public CarreraKafkaConsumer(String idc, String brokerCluster, String group, GroupConfig groupConfig, + public CarreraKafkaConsumer(String brokerCluster, String group, GroupConfig groupConfig, CProxyConfig cProxyConfig, KafkaConfiguration kafkaConfiguration, AsyncMessageHandler handler, Map maxCommitLagMap, Map topicCountMap, Map topicMap) { - super(idc, brokerCluster, group, groupConfig, cProxyConfig, handler, maxCommitLagMap); + super(brokerCluster, group, groupConfig, cProxyConfig, handler, maxCommitLagMap); this.kafkaConfiguration = kafkaConfiguration; this.topicCountMap = topicCountMap; diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraNewRocketMqConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraNewRocketMqConsumer.java index f950568..0274b9c 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraNewRocketMqConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraNewRocketMqConsumer.java @@ -21,10 +21,10 @@ public class CarreraNewRocketMqConsumer extends AbstractCarreraRocketMqConsumer { - public CarreraNewRocketMqConsumer(String idc, String brokerCluster, String group, GroupConfig groupConfig, + public CarreraNewRocketMqConsumer(String brokerCluster, String group, GroupConfig groupConfig, CProxyConfig cProxyConfig, RocketmqConfiguration rocketmqConfiguration, AsyncMessageHandler handler, Map maxCommitLagMap, int totalThreads) { - super(idc, brokerCluster, group, groupConfig, cProxyConfig, rocketmqConfiguration, handler, maxCommitLagMap, totalThreads); + super(brokerCluster, group, groupConfig, cProxyConfig, rocketmqConfiguration, handler, maxCommitLagMap, totalThreads); } @Override diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraRocketMqConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraRocketMqConsumer.java index 567a089..dc3441f 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraRocketMqConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/CarreraRocketMqConsumer.java @@ -21,10 +21,10 @@ public class CarreraRocketMqConsumer extends AbstractCarreraRocketMqConsumer { - public CarreraRocketMqConsumer(String idc, String brokerCluster, String group, GroupConfig groupConfig, + public CarreraRocketMqConsumer(String brokerCluster, String group, GroupConfig groupConfig, CProxyConfig cProxyConfig, RocketmqConfiguration rocketmqConfiguration, AsyncMessageHandler handler, Map maxCommitLagMap, int totalThreads) { - super(idc, brokerCluster, group, groupConfig, cProxyConfig, rocketmqConfiguration, handler, maxCommitLagMap, totalThreads); + super(brokerCluster, group, groupConfig, cProxyConfig, rocketmqConfiguration, handler, maxCommitLagMap, totalThreads); } @Override diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/ConfigManager.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/ConfigManager.java index 29a0876..f4f7214 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/ConfigManager.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/ConfigManager.java @@ -4,29 +4,25 @@ import com.xiaojukeji.carrera.config.v4.CProxyConfig; import com.xiaojukeji.carrera.config.v4.GroupConfig; import com.xiaojukeji.carrera.config.v4.cproxy.ConsumeServerConfiguration; -import com.xiaojukeji.carrera.cproxy.consumer.offset.CarreraOffsetManager; -import com.xiaojukeji.carrera.cproxy.utils.ConfigUtils; -import com.xiaojukeji.carrera.dynamic.ParameterDynamicZookeeper; -import com.xiaojukeji.carrera.metric.MetricFactory; -import com.xiaojukeji.carrera.utils.CommonUtils; import com.xiaojukeji.carrera.cproxy.concurrent.CarreraExecutors; import com.xiaojukeji.carrera.cproxy.config.ConfigurationLoader; import com.xiaojukeji.carrera.cproxy.config.ConsumeProxyConfiguration; import com.xiaojukeji.carrera.cproxy.config.ConsumerGroupConfig; import com.xiaojukeji.carrera.cproxy.config.LocalModeConfig; -import com.xiaojukeji.carrera.cproxy.config.ProducerProxyConfiguration; -import com.xiaojukeji.carrera.cproxy.actions.util.CarreraProducerManager; +import com.xiaojukeji.carrera.cproxy.consumer.offset.CarreraOffsetManager; import com.xiaojukeji.carrera.cproxy.proxy.ConsumerProxyMain; import com.xiaojukeji.carrera.cproxy.proxy.ProxyApp; +import com.xiaojukeji.carrera.cproxy.utils.ConfigUtils; import com.xiaojukeji.carrera.cproxy.utils.LogUtils; import com.xiaojukeji.carrera.cproxy.utils.MixAll; import com.xiaojukeji.carrera.cproxy.utils.StringUtils; +import com.xiaojukeji.carrera.dynamic.ParameterDynamicZookeeper; +import com.xiaojukeji.carrera.metric.MetricFactory; +import com.xiaojukeji.carrera.utils.CommonUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; -import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -152,12 +148,6 @@ private void startWatchConfig() throws Exception { } - private void initCarreraProducer(List proxyList) { - ProducerProxyConfiguration producerProxyConfiguration = new ProducerProxyConfiguration(); - producerProxyConfiguration.setCarreraProxyList(proxyList); - CarreraProducerManager.startCarreraProducer(producerProxyConfiguration); - } - public ConsumeServerConfiguration getConsumeServerConfiguration() { return Optional.ofNullable(curCproxyConfig) .map(CProxyConfig::getThriftServer) @@ -282,11 +272,6 @@ private void handleCproxyConfigChanged(CProxyConfig oldConf, CProxyConfig newCon ConsumerManager.getInstance().updateCproxyConfig(newConf); CarreraOffsetManager.getInstance().update(newConf); } - - // pproxy addr was changed - if (ConfigUtils.pProxyListIsUpdated(oldConf, newConf)) { - initCarreraProducer(newConf.getPproxies().values().stream().flatMap(Collection::stream).collect(Collectors.toList())); - } } private void handleGroupConfigChanged(GroupConfig newConf) { diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/LowLevelKafkaConsumer.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/LowLevelKafkaConsumer.java index 31ccef5..6b2a9f9 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/LowLevelKafkaConsumer.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/consumer/LowLevelKafkaConsumer.java @@ -9,7 +9,7 @@ public class LowLevelKafkaConsumer extends CarreraKafkaConsumer { public LowLevelKafkaConsumer(ConsumerGroupConfig config, AsyncMessageHandler handler) { - super(config.getcProxyConfig().getIdc(), config.getBrokerCluster(), config.getGroup(), config.getGroupConfig(), + super(config.getBrokerCluster(), config.getGroup(), config.getGroupConfig(), config.getcProxyConfig(), config.getcProxyConfig().getKafkaConfigs().get(config.getBrokerCluster()), handler, config.getMaxConsumeLagMap(), config.getTopicCount(), config.getTopicMap()); commitLagLimiter = null; diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/proxy/ProxyApp.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/proxy/ProxyApp.java index eab7341..d9adc63 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/proxy/ProxyApp.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/proxy/ProxyApp.java @@ -4,7 +4,6 @@ import com.xiaojukeji.carrera.cproxy.consumer.ConfigManager; import com.xiaojukeji.carrera.cproxy.consumer.ConsumerManager; import com.xiaojukeji.carrera.cproxy.consumer.SharedThreadPool; -import com.xiaojukeji.carrera.cproxy.actions.util.CarreraProducerManager; import com.xiaojukeji.carrera.cproxy.server.ConsumeServer; import com.xiaojukeji.carrera.cproxy.utils.LogUtils; import com.xiaojukeji.carrera.cproxy.utils.StringUtils; @@ -87,8 +86,6 @@ public void stop() { SharedThreadPool.shutdown(); - CarreraProducerManager.shutdown(); - if (waitForShutdown != null) { waitForShutdown.countDown(); waitForShutdown = null; diff --git a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/utils/ConfigUtils.java b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/utils/ConfigUtils.java index 479fa02..19ec12b 100644 --- a/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/utils/ConfigUtils.java +++ b/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/utils/ConfigUtils.java @@ -85,17 +85,4 @@ public static boolean brokerClusterIsUpdated(CProxyConfig oldcProxyConf, CProxyC return !oldcProxyConf.getRocketmqConfigs().equals(newcProxyConf.getRocketmqConfigs()); } - - public static boolean pProxyListIsUpdated(CProxyConfig oldcProxyConf, CProxyConfig newcProxyConf) { - if (oldcProxyConf == null) { - return newcProxyConf != null; - } - if (newcProxyConf == null) { - return false; - } - - return !oldcProxyConf.getPproxies().equals(newcProxyConf.getPproxies()); - - } - } \ No newline at end of file diff --git a/carrera-docker/Dockerfile b/carrera-docker/Dockerfile index 57f14a4..1d8a67b 100644 --- a/carrera-docker/Dockerfile +++ b/carrera-docker/Dockerfile @@ -4,6 +4,7 @@ FROM centos:7 EXPOSE 9613 EXPOSE 9713 EXPOSE 8080 +EXPOSE 2181 # Env ENV HOME_DIR /root @@ -14,9 +15,9 @@ WORKDIR ${HOME_DIR} RUN yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel unzip gettext nmap-ncat openssl wget telnet\ && yum clean all -y -RUN curl http://mirror.bit.edu.cn/apache/tomcat/tomcat-9/v9.0.13/bin/apache-tomcat-9.0.13.zip -o tomcat.zip \ +RUN curl http://mirror.bit.edu.cn/apache/tomcat/tomcat-9/v9.0.14/bin/apache-tomcat-9.0.14.zip -o tomcat.zip \ && unzip tomcat.zip \ - && mv apache-tomcat-9.0.13 tomcat \ + && mv apache-tomcat-9.0.14 tomcat \ && rm tomcat.zip \ && chmod +x tomcat/bin/*.sh @@ -43,4 +44,4 @@ COPY namesvr ${HOME_DIR}/namesvr COPY start.sh ${HOME_DIR}/start.sh # cmd -CMD bash -C '/root/start.sh';'bash' \ No newline at end of file +CMD bash -C '/root/start.sh';'bash' diff --git a/carrera-docker/README.md b/carrera-docker/README.md index 81e0aca..5d83d6c 100644 --- a/carrera-docker/README.md +++ b/carrera-docker/README.md @@ -17,6 +17,7 @@ Note:DDMQ Container requires a MySQL 5.7 container. ### Usage ### * Install Docker * Install MySQL Client(recommend 5.7.x) +* run ```build.sh``` to build packages * run ```play-ddmq.sh``` (will download centos7,mysql,tomcat,zookeeper in first run) * Visit DDMQ Web Console @@ -27,4 +28,4 @@ Note:DDMQ Container requires a MySQL 5.7 container. > > - >*Note:producer-proxy port: 9613、consumer-proxy port: 9713* \ No newline at end of file + >*Note:producer-proxy port: 9613、consumer-proxy port: 9713* diff --git a/carrera-docker/README_CN.md b/carrera-docker/README_CN.md index e18767c..f325193 100644 --- a/carrera-docker/README_CN.md +++ b/carrera-docker/README_CN.md @@ -17,6 +17,7 @@ DDMQ 涉及到的模块较多,集群部署步骤比较复杂。为了方便开 ### 使用方式 ### * 安装 Docker * 安装 MySQL 客户端(建议使用 5.7.x版本) +* 运行 ```build.sh``` 构建打包 * 运行 ```play-ddmq.sh``` (首次执行将下载 centos7,mysql,tomcat,zookeeper 等依赖,大约20分钟,具体情况视网络情况) * 打开 DDMQ 用户控制台 @@ -27,4 +28,4 @@ DDMQ 涉及到的模块较多,集群部署步骤比较复杂。为了方便开 -*备注:producer-proxy port: 9613、consumer-proxy port: 9713* \ No newline at end of file +*备注:producer-proxy port: 9613、consumer-proxy port: 9713* diff --git a/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/CarreraRequest.java b/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/CarreraRequest.java index 0a717b4..be30747 100644 --- a/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/CarreraRequest.java +++ b/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/CarreraRequest.java @@ -108,15 +108,15 @@ public boolean checkValid() { protected boolean checkValidExceptBody(Message msg) { // check msg key. - if (StringUtils.isNotEmpty(msg.getKey()) && msg.getKey().length() > producerPool.getConfigManager().getCarreraConfig().getParamLength().getKeyLenMax()) { + if (StringUtils.isNotEmpty(msg.getKey()) && msg.getKey().length() > 255) { LOGGER.warn("key is too long, topic={}, key={}", msg.getTopic(), msg.getKey()); - return !producerPool.getConfigManager().getCarreraConfig().getParamLength().isFailWhenIllegal(); + return false; } // check msg tags. - if (StringUtils.isNotEmpty(msg.getTags()) && msg.getTags().length() > producerPool.getConfigManager().getCarreraConfig().getParamLength().getTagLenMax()) { + if (StringUtils.isNotEmpty(msg.getTags()) && msg.getTags().length() > 255) { LOGGER.warn("tag is too long, topic={}, tag={}", msg.getTopic(), msg.getTags()); - return !producerPool.getConfigManager().getCarreraConfig().getParamLength().isFailWhenIllegal(); + return false; } //check msg properties diff --git a/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/ProducerPool.java b/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/ProducerPool.java index c8fcb73..e159e24 100644 --- a/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/ProducerPool.java +++ b/carrera-producer/src/main/java/com/xiaojukeji/carrera/pproxy/producer/ProducerPool.java @@ -103,7 +103,7 @@ private void initMetric() throws Exception { private void initRateLimiter() throws Exception { requestLimiter = new TpsLimiter(configManager.getCarreraConfig().getTpsWarningRatio(), configManager.getCarreraConfig().getMaxTps(), configManager.getTopicConfigManager()); - LOGGER.info("initRateLimiter finished, rateLimit config info:{}", configManager.getCarreraConfig().getRateLimit()); + LOGGER.info("initRateLimiter finished"); } public void close() { diff --git a/carrera-sdk/consumer/java/carrera-consumer-sdk-example/src/main/java/com/xiaojukeji/carrera/consumer/examples/SimpleCarreraConsumerExample.java b/carrera-sdk/consumer/java/carrera-consumer-sdk-example/src/main/java/com/xiaojukeji/carrera/consumer/examples/SimpleCarreraConsumerExample.java index bfef770..3663ac5 100644 --- a/carrera-sdk/consumer/java/carrera-consumer-sdk-example/src/main/java/com/xiaojukeji/carrera/consumer/examples/SimpleCarreraConsumerExample.java +++ b/carrera-sdk/consumer/java/carrera-consumer-sdk-example/src/main/java/com/xiaojukeji/carrera/consumer/examples/SimpleCarreraConsumerExample.java @@ -21,7 +21,7 @@ public class SimpleCarreraConsumerExample { public static void main(String[] args) throws TTransportException, InterruptedException { CarreraConfig config = - new CarreraConfig("test-thrift-client", "127.0.0.1:9713"); + new CarreraConfig("cg_test", "127.0.0.1:9713"); config.setRetryInterval(1000); final SimpleCarreraConsumer consumer = new SimpleCarreraConsumer(config); diff --git a/carrera-sdk/producer/java/carrera-producer-sdk-example/pom.xml b/carrera-sdk/producer/java/carrera-producer-sdk-example/pom.xml index 304b91a..fe72d10 100644 --- a/carrera-sdk/producer/java/carrera-producer-sdk-example/pom.xml +++ b/carrera-sdk/producer/java/carrera-producer-sdk-example/pom.xml @@ -23,7 +23,7 @@ com.lmax disruptor - 3.3.0 + 3.3.6 org.apache.logging.log4j diff --git a/carrera-sdk/producer/java/carrera-producer-sdk/pom.xml b/carrera-sdk/producer/java/carrera-producer-sdk/pom.xml index 67225d6..4710086 100644 --- a/carrera-sdk/producer/java/carrera-producer-sdk/pom.xml +++ b/carrera-sdk/producer/java/carrera-producer-sdk/pom.xml @@ -87,7 +87,7 @@ com.lmax disruptor - 3.3.4 + 3.3.6 test