Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import com.google.gson.Gson;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Component;
import com.xiaomi.youpin.docean.common.StringUtils;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.Constant;
import org.apache.ozhera.log.model.LogtailConfig;
Expand All @@ -40,8 +40,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.ozhera.log.common.Constant.GSON;
Expand All @@ -68,10 +70,10 @@ public class MilogConfigListener {
private Map<Long, LogtailConfig> oldLogTailConfigMap = new ConcurrentHashMap<>();
private Map<Long, SinkConfig> oldSinkConfigMap = new ConcurrentHashMap<>();

private ReentrantLock buildDataLock = new ReentrantLock();

private StreamCommonExtension streamCommonExtension;

private volatile String originConfig;

public MilogConfigListener(Long spaceId, String dataId, String group, MilogSpaceData milogSpaceData, NacosConfig nacosConfig) {
this.spaceId = spaceId;
this.dataId = dataId;
Expand All @@ -89,38 +91,26 @@ private StreamCommonExtension getStreamCommonExtensionInstance() {
return Ioc.ins().getBean(factualServiceName);
}

private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) throws Exception {
boolean locked = false;
try {
locked = buildDataLock.tryLock(1, TimeUnit.MINUTES);
if (locked) {
if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) {
List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
stopUnusedOldStoreJobs(sinkConfigs);
for (SinkConfig sinkConfig : sinkConfigs) {
stopOldJobsForRemovedTailIds(sinkConfig);
if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
//Whether the submission store information changes, the change stops
if (!isStoreSame(sinkConfig, oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
restartPerTail(sinkConfig, milogSpaceData);
} else {
handlePerTailComparison(sinkConfig, milogSpaceData);
}
} else {
newStoreStart(sinkConfig, milogSpaceData);
}
private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) {
if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) {
List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
stopUnusedOldStoreJobs(sinkConfigs);
for (SinkConfig sinkConfig : sinkConfigs) {
stopOldJobsForRemovedTailIds(sinkConfig);
if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
//Whether the submission store information changes, the change stops
if (!isStoreSame(sinkConfig, oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
restartPerTail(sinkConfig, milogSpaceData);
} else {
handlePerTailComparison(sinkConfig, milogSpaceData);
}
} else {
// Restart all
initNewJob(newMilogSpaceData);
newStoreStart(sinkConfig, milogSpaceData);
}
} else {
log.warn("handleNacosConfigDataJob lock failed,data:{}", gson.toJson(newMilogSpaceData));
}
} finally {
if (locked) {
buildDataLock.unlock();
}
} else {
// Restart all
initNewJob(newMilogSpaceData);
}
}

Expand Down Expand Up @@ -283,15 +273,15 @@ private void stopOldJobsIfNeeded() {

private void startTailPer(SinkConfig sinkConfig, LogtailConfig logTailConfig, Long logSpaceId) {
if (null == logSpaceId || null == logTailConfig || null == logTailConfig.getLogtailId()) {
log.error("logSpaceId or logTailConfig or logTailId is null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig), gson.toJson(logTailConfig), spaceId);
log.error("logSpaceId or logTailConfig or logTailId is null,storeId:{},tailId:{},logSpaceId:{}", sinkConfig.getLogstoreId(), logTailConfig.getLogtailId(), spaceId);
return;
}
Boolean isStart = streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig, logSpaceId);
if (!isStart) {
log.warn("preCheckTaskExecution error,preCheckTaskExecution is false,LogTailConfig:{}", gson.toJson(logTailConfig));
return;
}
log.info("【Listen tail】Initialize the new task, tail configuration:{},index:{},cluster information:{},spaceId:{}", gson.toJson(logTailConfig), sinkConfig.getEsIndex(), gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
log.info("Initialize the new task, tail configuration:{},index:{},cluster information:{},spaceId:{}", gson.toJson(logTailConfig), sinkConfig.getEsIndex(), gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
jobManager.startJob(logTailConfig, sinkConfig, logSpaceId);
oldLogTailConfigMap.put(logTailConfig.getLogtailId(), logTailConfig);
}
Expand All @@ -309,7 +299,11 @@ public Executor getExecutor() {
@Override
public void receiveConfigInfo(String dataValue) {
try {
log.info("listen tail received a configuration request:{},a configuration that already exists:storeMap:{},tailMap:{}", dataValue, gson.toJson(oldSinkConfigMap), gson.toJson(oldLogTailConfigMap));
if (StringUtils.equals(originConfig, dataValue)) {
return;
}
originConfig = dataValue;
log.info("listen tail received a configuration request:{},origin config:{}", dataValue, originConfig);
if (StringUtils.isNotEmpty(dataValue) && !Constant.NULLVALUE.equals(dataValue)) {
dataValue = streamCommonExtension.dataPreProcess(dataValue);
MilogSpaceData newMilogSpaceData = GSON.fromJson(dataValue, MilogSpaceData.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;


Expand All @@ -59,10 +57,6 @@ public class JobManager {

private Gson gson = new Gson();

private ReentrantLock stopLock = new ReentrantLock();

private ReentrantLock startLock = new ReentrantLock();

public JobManager() {
sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
sinkChain = Ioc.ins().getBean(SinkChain.class);
Expand Down Expand Up @@ -101,30 +95,22 @@ private void sinkJobsShutDown(LogtailConfig logtailConfig) {
}

public void stopJob(LogtailConfig logtailConfig) {
boolean locked = false;
try {
locked = stopLock.tryLock(10, TimeUnit.SECONDS);
if (locked) {
List<Long> jobKeys = jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
List<Long> jobKeys = jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
if(CollectionUtils.isNotEmpty(jobKeys)){
log.info("【stop job】,all jobs:{}", jobKeys);
sinkJobsShutDown(logtailConfig);
} else {
log.warn("【stop job】,other job is running,wait 10s,tailConfig:{}", gson.toJson(logtailConfig));
}
} catch (Exception e) {
log.error(String.format("[JobManager.stopJob] stopJob err,logtailId:%s", logtailConfig.getLogtailId()), e);
} finally {
if (locked) {
stopLock.unlock();
}
}
}

private void startConsumerJob(String type, String ak, String sk, String clusterInfo, LogtailConfig
logtailConfig, SinkConfig sinkConfig, Long logSpaceId) {
try {
SinkJobConfig sinkJobConfig = buildSinkJobConfig(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId);
log.warn("##startConsumerJob## spaceId:{}, storeId:{}, tailId:{}", sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), sinkJobConfig.getLogTailId());
log.warn("startConsumerJob spaceId:{}, storeId:{}, tailId:{}", sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), sinkJobConfig.getLogTailId());

String sinkProviderBean = sinkJobConfig.getMqType() + LogStreamConstants.sinkJobProviderBeanSuffix;
SinkJobProvider sinkJobProvider = Ioc.ins().getBean(sinkProviderBean);
Expand All @@ -149,10 +135,9 @@ private void startConsumerJob(String type, String ak, String sk, String clusterI
startSinkJob(sinkJobProvider.getBackupJob(sinkJobConfig), SinkJobEnum.BACKUP_JOB,
logtailConfig.getLogtailId());
}

log.info(String.format("[JobManager.initJobs] startJob success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
log.info(String.format("startJob success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
} catch (Throwable e) {
log.error(String.format("[JobManager.initJobs] startJob err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new RuntimeException(e));
log.error(String.format("startJob err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new RuntimeException(e));
}
}

Expand Down Expand Up @@ -195,28 +180,18 @@ private SinkJobConfig buildSinkJobConfig(String type, String ak, String sk, Stri
}

public void startJob(LogtailConfig logtailConfig, SinkConfig sinkConfig, Long logSpaceId) {
boolean locked = false;
try {
locked = startLock.tryLock(10, TimeUnit.SECONDS);
if (locked) {
String ak = logtailConfig.getAk();
String sk = logtailConfig.getSk();
String clusterInfo = logtailConfig.getClusterInfo();
String type = logtailConfig.getType();
if (StringUtils.isEmpty(clusterInfo) || StringUtils.isEmpty(logtailConfig.getTopic())) {
log.info("start job error,ak or sk or logtailConfig null,ak:{},sk:{},logtailConfig:{}", ak, sk, new Gson().toJson(logtailConfig));
return;
}
startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId);
} else {
log.warn("start job error,lock timeout,tailConfig:{},sinkConfig:{}", gson.toJson(logtailConfig), gson.toJson(sinkConfig));
String ak = logtailConfig.getAk();
String sk = logtailConfig.getSk();
String clusterInfo = logtailConfig.getClusterInfo();
String type = logtailConfig.getType();
if (StringUtils.isEmpty(clusterInfo) || StringUtils.isEmpty(logtailConfig.getTopic())) {
log.info("start job error,ak or sk or logTailConfig null,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig));
return;
}
startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId);
} catch (Exception e) {
log.error(String.format("[JobManager.startJob] start job err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
} finally {
if (locked) {
startLock.unlock();
}
log.error(String.format("start job err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
}
}

Expand Down
Loading