diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java index e9bf51568..f948bcbe3 100644 --- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java +++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java @@ -23,11 +23,11 @@ import com.google.gson.Gson; import com.xiaomi.youpin.docean.Ioc; import com.xiaomi.youpin.docean.anno.Component; -import com.xiaomi.youpin.docean.common.StringUtils; import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.ozhera.log.common.Config; import org.apache.ozhera.log.common.Constant; import org.apache.ozhera.log.model.LogtailConfig; @@ -40,8 +40,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.*; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import static org.apache.ozhera.log.common.Constant.GSON; @@ -68,10 +70,10 @@ public class MilogConfigListener { private Map oldLogTailConfigMap = new ConcurrentHashMap<>(); private Map oldSinkConfigMap = new ConcurrentHashMap<>(); - private ReentrantLock buildDataLock = new ReentrantLock(); - private StreamCommonExtension streamCommonExtension; + private volatile String originConfig; + public MilogConfigListener(Long spaceId, String dataId, String group, MilogSpaceData milogSpaceData, NacosConfig nacosConfig) { this.spaceId = spaceId; this.dataId = dataId; @@ -89,38 +91,26 @@ private StreamCommonExtension 
getStreamCommonExtensionInstance() { return Ioc.ins().getBean(factualServiceName); } - private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) throws Exception { - boolean locked = false; - try { - locked = buildDataLock.tryLock(1, TimeUnit.MINUTES); - if (locked) { - if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) { - List sinkConfigs = newMilogSpaceData.getSpaceConfig(); - stopUnusedOldStoreJobs(sinkConfigs); - for (SinkConfig sinkConfig : sinkConfigs) { - stopOldJobsForRemovedTailIds(sinkConfig); - if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) { - //Whether the submission store information changes, the change stops - if (!isStoreSame(sinkConfig, oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) { - restartPerTail(sinkConfig, milogSpaceData); - } else { - handlePerTailComparison(sinkConfig, milogSpaceData); - } - } else { - newStoreStart(sinkConfig, milogSpaceData); - } + private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) { + if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) { + List sinkConfigs = newMilogSpaceData.getSpaceConfig(); + stopUnusedOldStoreJobs(sinkConfigs); + for (SinkConfig sinkConfig : sinkConfigs) { + stopOldJobsForRemovedTailIds(sinkConfig); + if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) { + //Whether the submission store information changes, the change stops + if (!isStoreSame(sinkConfig, oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) { + restartPerTail(sinkConfig, milogSpaceData); + } else { + handlePerTailComparison(sinkConfig, milogSpaceData); } } else { - // Restart all - initNewJob(newMilogSpaceData); + newStoreStart(sinkConfig, milogSpaceData); } - } else { - log.warn("handleNacosConfigDataJob lock failed,data:{}", gson.toJson(newMilogSpaceData)); - } - } finally { - if (locked) { - buildDataLock.unlock(); } + } else { + // Restart all + initNewJob(newMilogSpaceData); } } @@ -283,7 +273,7 @@ private void stopOldJobsIfNeeded() { 
private void startTailPer(SinkConfig sinkConfig, LogtailConfig logTailConfig, Long logSpaceId) { if (null == logSpaceId || null == logTailConfig || null == logTailConfig.getLogtailId()) { - log.error("logSpaceId or logTailConfig or logTailId is null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig), gson.toJson(logTailConfig), spaceId); + log.error("logSpaceId or logTailConfig or logTailId is null,storeId:{},tailId:{},logSpaceId:{}", null == sinkConfig ? null : sinkConfig.getLogstoreId(), null == logTailConfig ? null : logTailConfig.getLogtailId(), spaceId); return; } Boolean isStart = streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig, logSpaceId); @@ -291,7 +281,7 @@ private void startTailPer(SinkConfig sinkConfig, LogtailConfig logTailConfig, Lo log.warn("preCheckTaskExecution error,preCheckTaskExecution is false,LogTailConfig:{}", gson.toJson(logTailConfig)); return; } - log.info("【Listen tail】Initialize the new task, tail configuration:{},index:{},cluster information:{},spaceId:{}", gson.toJson(logTailConfig), sinkConfig.getEsIndex(), gson.toJson(sinkConfig.getEsInfo()), logSpaceId); + log.info("Initialize the new task, tail configuration:{},index:{},cluster information:{},spaceId:{}", gson.toJson(logTailConfig), sinkConfig.getEsIndex(), gson.toJson(sinkConfig.getEsInfo()), logSpaceId); jobManager.startJob(logTailConfig, sinkConfig, logSpaceId); oldLogTailConfigMap.put(logTailConfig.getLogtailId(), logTailConfig); } @@ -309,7 +299,11 @@ public Executor getExecutor() { @Override public void receiveConfigInfo(String dataValue) { try { - log.info("listen tail received a configuration request:{},a configuration that already exists:storeMap:{},tailMap:{}", dataValue, gson.toJson(oldSinkConfigMap), gson.toJson(oldLogTailConfigMap)); + if (StringUtils.equals(originConfig, dataValue)) { + return; + } + originConfig = dataValue; + log.info("listen tail received a configuration request:{},origin config:{}", dataValue, originConfig); if (StringUtils.isNotEmpty(dataValue) && 
!Constant.NULLVALUE.equals(dataValue)) { dataValue = streamCommonExtension.dataPreProcess(dataValue); MilogSpaceData newMilogSpaceData = GSON.fromJson(dataValue, MilogSpaceData.class); diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java index ac6fa56df..1a9227b1a 100644 --- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java +++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java @@ -38,8 +38,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -59,10 +57,6 @@ public class JobManager { private Gson gson = new Gson(); - private ReentrantLock stopLock = new ReentrantLock(); - - private ReentrantLock startLock = new ReentrantLock(); - public JobManager() { sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, ""); sinkChain = Ioc.ins().getBean(SinkChain.class); @@ -101,22 +95,14 @@ private void sinkJobsShutDown(LogtailConfig logtailConfig) { } public void stopJob(LogtailConfig logtailConfig) { - boolean locked = false; try { - locked = stopLock.tryLock(10, TimeUnit.SECONDS); - if (locked) { - List jobKeys = jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + List jobKeys = jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + if(CollectionUtils.isNotEmpty(jobKeys)){ log.info("【stop job】,all jobs:{}", jobKeys); sinkJobsShutDown(logtailConfig); - } else { - log.warn("【stop job】,other job is running,wait 10s,tailConfig:{}", gson.toJson(logtailConfig)); } } catch (Exception e) { log.error(String.format("[JobManager.stopJob] stopJob err,logtailId:%s", logtailConfig.getLogtailId()), e); - } finally { - if (locked) { - stopLock.unlock(); - } } } @@ -124,7 
+110,7 @@ private void startConsumerJob(String type, String ak, String sk, String clusterI logtailConfig, SinkConfig sinkConfig, Long logSpaceId) { try { SinkJobConfig sinkJobConfig = buildSinkJobConfig(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId); - log.warn("##startConsumerJob## spaceId:{}, storeId:{}, tailId:{}", sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), sinkJobConfig.getLogTailId()); + log.warn("startConsumerJob spaceId:{}, storeId:{}, tailId:{}", sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), sinkJobConfig.getLogTailId()); String sinkProviderBean = sinkJobConfig.getMqType() + LogStreamConstants.sinkJobProviderBeanSuffix; SinkJobProvider sinkJobProvider = Ioc.ins().getBean(sinkProviderBean); @@ -149,10 +135,9 @@ private void startConsumerJob(String type, String ak, String sk, String clusterI startSinkJob(sinkJobProvider.getBackupJob(sinkJobConfig), SinkJobEnum.BACKUP_JOB, logtailConfig.getLogtailId()); } - - log.info(String.format("[JobManager.initJobs] startJob success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex())); + log.info(String.format("startJob success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex())); } catch (Throwable e) { - log.error(String.format("[JobManager.initJobs] startJob err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new RuntimeException(e)); + log.error(String.format("startJob err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new RuntimeException(e)); } } @@ -195,28 +180,18 @@ private SinkJobConfig buildSinkJobConfig(String type, String ak, String sk, Stri } public void 
startJob(LogtailConfig logtailConfig, SinkConfig sinkConfig, Long logSpaceId) { - boolean locked = false; try { - locked = startLock.tryLock(10, TimeUnit.SECONDS); - if (locked) { - String ak = logtailConfig.getAk(); - String sk = logtailConfig.getSk(); - String clusterInfo = logtailConfig.getClusterInfo(); - String type = logtailConfig.getType(); - if (StringUtils.isEmpty(clusterInfo) || StringUtils.isEmpty(logtailConfig.getTopic())) { - log.info("start job error,ak or sk or logtailConfig null,ak:{},sk:{},logtailConfig:{}", ak, sk, new Gson().toJson(logtailConfig)); - return; - } - startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId); - } else { - log.warn("start job error,lock timeout,tailConfig:{},sinkConfig:{}", gson.toJson(logtailConfig), gson.toJson(sinkConfig)); + String ak = logtailConfig.getAk(); + String sk = logtailConfig.getSk(); + String clusterInfo = logtailConfig.getClusterInfo(); + String type = logtailConfig.getType(); + if (StringUtils.isEmpty(clusterInfo) || StringUtils.isEmpty(logtailConfig.getTopic())) { + log.info("start job error,clusterInfo or topic is empty,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig)); + return; } + startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId); } catch (Exception e) { - log.error(String.format("[JobManager.startJob] start job err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e); - } finally { - if (locked) { - startLock.unlock(); - } + log.error(String.format("start job err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e); } }